Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Microservices Authorization using Open Policy Agent and Traefik (API Gateway)

A Study in Serverless Authorization with Open Policy Agent

OpenFaaS OPA

Godaddy Opa Lambda Extension Plugin

Extending OPA

OPA Ecosystem

GO

Go provides a library for opa.

...

Code Block
languagetext
titleGO OPA
package main

import (
        "context"
        "encoding/json"
        "fmt"
        "github.com/open-policy-agent/opa/rego"
        "io/ioutil"
        "net/http"
        "net/url"
        "os"
)

// Jwttoken mirrors the JSON document returned by the Keycloak token endpoint.
//
// The JSON keys are spelled out explicitly. encoding/json only falls back to a
// case-insensitive match on the field name, which cannot match a key such as
// "not-before-policy" (hyphens) against Not_before_policy (underscores).
type Jwttoken struct {
	Access_token       string `json:"access_token"`
	Expires_in         int    `json:"expires_in"`
	Refresh_expires_in int    `json:"refresh_expires_in"`
	Refresh_token      string `json:"refresh_token"`
	Token_type         string `json:"token_type"`
	Not_before_policy  int    `json:"not-before-policy"`
	Session_state      string `json:"session_state"`
	Scope              string `json:"scope"`
}

// token holds the most recently decoded token response (filled in by getToken).
var token Jwttoken

// opaPolicy is the Rego module evaluated by traceOpa and evaluateOpa.
//
// The policy input is the raw JWT string. The module fetches the realm's JWKS
// from Keycloak over HTTP, keeps only the signing keys, verifies the token
// against them and allows the request when the token is valid, currently
// within its iat/exp window and carries the expected clientRole claim.
//
// NOTE: the issuer URL must stay on a single line; a line break inside the
// Rego string literal makes the module unparsable.
var opaPolicy string = `
package authz

import future.keywords.in

default allow = false

jwks := jwks_request("http://keycloak:8080/auth/realms/opa/protocol/openid-connect/certs").body
filtered_jwks := [ key |
      some key in jwks.keys
      key.use == "sig"
    ]
token_cert := json.marshal({"keys": filtered_jwks})

token = { "isValid": isValid, "header": header, "payload": payload } {
        [isValid, header, payload] := io.jwt.decode_verify(input, { "cert": token_cert, "aud": "account", "iss": "http://keycloak:8080/auth/realms/opa"})
}

allow {
    is_token_valid
}

is_token_valid {
        token.isValid
        now := time.now_ns() / 1000000000
        token.payload.iat <= now
        now < token.payload.exp
        token.payload.clientRole == "[opa-client-role]"
}

jwks_request(url) = http.send({
      "url": url,
      "method": "GET",
      "force_cache": true,
      "force_json_decode": true,
      "force_cache_duration_seconds": 3600 # Cache response for an hour
})
`

func getToken() string {
        clientSecret := "63wkv0RUXkp01pbqtNTSwghhTxeMW55I"
        clientId := "opacli"
        realmName := "opa"
        keycloakHost := "keycloak"
        keycloakPort := "8080"
        keycloakUrl := "http://" + keycloakHost + ":" + keycloakPort + "/auth/realms/" + realmName + "/protocol/openid-connect/token"        resp, err := http.PostForm(keycloakUrl,
                url.Values{"client_secret": {clientSecret}, "grant_type": {"client_credentials"}, "client_id": {clientId}})
        if err != nil {
                fmt.Println(err)
                panic("Something wrong with the credentials or url ")
        }
        defer resp.Body.Close()
        body, err := ioutil.ReadAll(resp.Body)
        json.Unmarshal([]byte(body), &token)
        return token.Access_token
}

// traceOpa evaluates data.authz.allow for the given input with tracing
// enabled and writes the evaluation trace (with source locations) to stdout.
//
// input is passed to the policy as-is; opaPolicy expects it to be the raw JWT.
// An evaluation error is reported on stdout, and the trace collected so far
// is still printed because it is the most useful aid for diagnosing it.
func traceOpa(input string) {
	ctx := context.TODO()

	test := rego.New(
		rego.Query("x = data.authz.allow"),
		rego.Trace(true),
		rego.Module("example.rego", opaPolicy),
		rego.Input(input),
	)

	if _, err := test.Eval(ctx); err != nil {
		fmt.Println("Error: " + err.Error())
	}
	rego.PrintTraceWithLocation(os.Stdout, test)
}

// evaluateOpa compiles opaPolicy, evaluates data.authz.allow against input
// and prints the outcome to stdout.
//
// input is handed to the policy unchanged; opaPolicy expects the raw JWT.
// One of three things is printed: the evaluation error, a note that the
// result set is empty (the query was undefined), or the result set itself.
func evaluateOpa(input string) {
	ctx := context.TODO()

	query, err := rego.New(
		rego.Query("x = data.authz.allow"),
		rego.Module("example.rego", opaPolicy),
	).PrepareForEval(ctx)
	if err != nil {
		// The prepared query is unusable after a preparation error, so stop
		// here instead of evaluating it.
		fmt.Println(err.Error())
		return
	}

	results, err := query.Eval(ctx, rego.EvalInput(input))
	switch {
	case err != nil:
		// Evaluation itself failed.
		fmt.Println("Error: " + err.Error())
	case len(results) == 0:
		// The query was undefined for this input.
		fmt.Println("Results are empty")
	default:
		// A decision was produced.
		fmt.Printf("Results = %+v\n", results) //=> [{Expressions:[true] Bindings:map[x:true]}]
	}
}

// main obtains a token from Keycloak, prints the policy evaluation trace for
// it and then prints the resulting decision.
func main() {
	jwt := getToken()

	traceOpa(jwt)
	evaluateOpa(jwt)
}

...

Code Block
languagetext
titlepolicy.rego
# Denies a request unless its Authorization header carries a JWT that verifies
# against the Keycloak realm's signing keys, is inside its iat/exp window and
# has the expected clientRole claim.
package policies.rappopaprovider.policy

# HTTP attributes of the request being authorized.
# NOTE(review): the input.attributes.request.http path looks like the Envoy
# ext_authz input shape - confirm against the caller.
import input.attributes.request.http as http_request
import future.keywords.in

# Keycloak realm and the URLs derived from it.
realm_name := "opa"
realm_url := sprintf("http://keycloak:8080/auth/realms/%v", [realm_name])
certs_url := sprintf("%v/protocol/openid-connect/certs", [realm_url])


# Body of the JWKS document fetched from the realm's certs endpoint.
jwks := jwks_request(certs_url).body
# Only keys whose "use" is "sig" are kept for verification.
filtered_jwks := [ key |
      some key in jwks.keys
      key.use == "sig"
    ]
# io.jwt.decode_verify takes the key set as a JSON string.
token_cert := json.marshal({"keys": filtered_jwks})

# Decoded token: the second space-separated part of the Authorization header
# is verified against token_cert with audience "account" and issuer realm_url.
token = { "isValid": isValid, "header": header, "payload": payload } {
        [_, encoded] := split(http_request.headers.authorization, " ")
        [isValid, header, payload] := io.jwt.decode_verify(encoded, { "cert": token_cert, "aud": "account", "iss": realm_url})
 }

# Produces a denial message whenever is_token_valid does not hold.
deny[msg] {
    not is_token_valid
    msg = "denied by rappopaprovider.policy: not a valid token"
}

# True when the token verified, the current time (seconds) lies in
# [iat, exp) and the clientRole claim equals the literal "[opa-client-role]".
is_token_valid {
  token.isValid
  now := time.now_ns() / 1000000000
  token.payload.iat <= now
  now < token.payload.exp
  token.payload.clientRole == "[opa-client-role]"
}

# Issues a GET for url with JSON decoding forced and the response cached.
jwks_request(url) = http.send({
      "url": url,
      "method": "GET",
      "force_cache": true,
      "force_json_decode": true,
      "force_cache_duration_seconds": 3600 # Cache response for an hour
})

...

Code Block
languagetext
titleopa_test.sh

#!/bin/bash
# Script-wide state for the test run.
# INGRESS_HOST is the minikube IP; INGRESS_PORT is the nodePort of the port
# named "http2" on the istio-ingressgateway service in istio-system.
INGRESS_HOST=$(minikube ip)
INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
# Counters updated by run_test and printed in the summary.
TESTS=0
PASSED=0
FAILED=0
# Timestamp of the run, printed in the summary.
TEST_TS=$(date +%F-%T)
# TOKEN and ACCESS_TOKEN are filled in by get_token.
# REFRESH_TOKEN is not assigned anywhere in the visible script.
TOKEN=""
ACCESS_TOKEN=""
REFRESH_TOKEN=""

# get_token PREFIX
# Requests a client-credentials token for client "opacli" from the Keycloak
# realm "opa" and stores the raw JSON response in TOKEN and its access_token
# field in ACCESS_TOKEN.
# PREFIX is accepted for symmetry with run_test but is not used.
# KEYCLOAK_HOST and KEYCLOAK_PORT are read from the environment; this script
# does not set them itself.
function get_token
{
    local prefix="${1}"
    url="http://${KEYCLOAK_HOST}:${KEYCLOAK_PORT}/auth/realms"
    # Every expansion is quoted so the URL and the JSON response are not
    # word-split or glob-expanded by the shell.
    TOKEN=$(curl -s -X POST "${url}/opa/protocol/openid-connect/token" \
        -H "Content-Type: application/x-www-form-urlencoded" \
        -d client_secret=63wkv0RUXkp01pbqtNTSwghhTxeMW55I \
        -d 'grant_type=client_credentials' -d client_id=opacli)
    ACCESS_TOKEN=$(echo "$TOKEN" | jq -r '.access_token')
}

# run_test PREFIX METHOD EXPECTED [DATA]
# Fetches a fresh token, calls METHOD on http://INGRESS_HOST:INGRESS_PORT/PREFIX
# with it as a Bearer token and compares the response body with EXPECTED.
# Updates TESTS, PASSED and FAILED and prints PASS or FAIL.
# DATA is accepted but not sent with the request.
function run_test
{
    local prefix="${1}" type="${2}" msg="${3}" data="${4}"
    TESTS=$((TESTS+1))
    echo "Test ${TESTS}: Testing $type /${prefix}"
    get_token "$prefix"
    url="${INGRESS_HOST}:${INGRESS_PORT}/${prefix}"
    # Quoted expansions keep the method, token and URL as single arguments.
    result=$(curl -s -X "${type}" -H "Content-type: application/json" -H "Authorization: Bearer $ACCESS_TOKEN" "$url")
    # Quoted so the response is shown exactly as it was compared.
    echo "$result"
    if [ "$result" != "$msg" ]; then
            echo "FAIL"
            FAILED=$((FAILED+1))
    else
            echo "PASS"
            PASSED=$((PASSED+1))
    fi
    echo ""
}


# Single test case: GET /rapp-opa-provider must return exactly this body.
run_test "rapp-opa-provider" "GET" "Hello OPA World!" ""

# Summary of the counters maintained by run_test.
echo
echo "-----------------------------------------------------------------------"
echo "Number of Tests: $TESTS, Tests Passed: $PASSED, Tests Failed: $FAILED"
echo "Date: $TEST_TS"
echo "-----------------------------------------------------------------------"

...

Code Block
languagetext
titleApplication rules
# Ingress rule for the rapp-opa-provider path.
package policy.services.rappopaprovider.ingress

# Shared request helper; its fields (token, method, path, iat, exp,
# clientRole) are defined in data.policy.common.request, which is not shown
# here.
import data.policy.common.request

# Allows a GET on /rapp-opa-provider when the token is valid, the current time
# (seconds) lies in [iat, exp) and clientRole equals "[opa-client-role]".
allow = true {
request.token.isValid
request.method == "GET"
request.path == [ "rapp-opa-provider" ]
now := time.now_ns() / 1000000000
request.iat <= now
now < request.exp
request.clientRole == "[opa-client-role]"
}


Lastly, create the parent rules file that will call the appropriate policy based on the HTTP request path

...

opa bench --data rbactest.rego 'data.rbactest.allow'
+-------------------------------------------------+------------+
| samples | 22605 |
| ns/op | 47760 |
| B/op | 6269 |
| allocs/op | 112 |
| histogram_timer_rego_external_resolve_ns_75% | 400 |
| histogram_timer_rego_external_resolve_ns_90% | 500 |
| histogram_timer_rego_external_resolve_ns_95% | 500 |
| histogram_timer_rego_external_resolve_ns_99% | 871 |
| histogram_timer_rego_external_resolve_ns_99.9% | 29394 |
| histogram_timer_rego_external_resolve_ns_99.99% | 29800 |
| histogram_timer_rego_external_resolve_ns_count | 22605 |
| histogram_timer_rego_external_resolve_ns_max | 29800 |
| histogram_timer_rego_external_resolve_ns_mean | 434 |
| histogram_timer_rego_external_resolve_ns_median | 400 |
| histogram_timer_rego_external_resolve_ns_min | 200 |
| histogram_timer_rego_external_resolve_ns_stddev | 1045 |
| histogram_timer_rego_query_eval_ns_75% | 31100 |
| histogram_timer_rego_query_eval_ns_90% | 37210 |
| histogram_timer_rego_query_eval_ns_95% | 47160 |
| histogram_timer_rego_query_eval_ns_99% | 91606 |
| histogram_timer_rego_query_eval_ns_99.9% | 630561 |
| histogram_timer_rego_query_eval_ns_99.99% | 631300 |
| histogram_timer_rego_query_eval_ns_count | 22605 |
| histogram_timer_rego_query_eval_ns_max | 631300 |
| histogram_timer_rego_query_eval_ns_mean | 29182 |
| histogram_timer_rego_query_eval_ns_median | 25300 |
| histogram_timer_rego_query_eval_ns_min | 15200 |
| histogram_timer_rego_query_eval_ns_stddev | 32411 |
+-------------------------------------------------+------------+

OPA & Minio

OPA & MinIO's Access Management Plugin

OPA Sidecar injection

First create a namespace for your apps and enable istio and opa 

...

Code Block
languagetext
titleVerify Certificate
package main

import (
        "bytes"
        "context"
        "crypto"
        "crypto/x509"
        "crypto/x509/pkix"
        "encoding/json"
        "encoding/pem"
        "flag"
        "fmt"
        "golang.org/x/crypto/ocsp"
        "io/ioutil"
        "log"
        "net/http"
        "net/url"
        "os"
        "time"
)

// Cfinfo is the response envelope decoded by getIssuer from the issuer
// ("info") endpoint.
//
// Success is a bool, matching Crlinfo: the endpoint sends a JSON boolean, and
// decoding a boolean into a string field makes json.Unmarshal report a type
// error.
type Cfinfo struct {
	Success bool `json:"success,omitempty"`
	Result  struct {
		Certificate string   `json:"certificate"` // PEM text, passed to decodeCert by main
		Usages      []string `json:"usages"`
		Expiry      string   `json:"expiry"`
	}
	Errors   []string `json:"errors"`
	Messages []string `json:"messages"`
}

// cfinfo receives the decoded issuer response in getIssuer.
var cfinfo Cfinfo

// Crlinfo is the response envelope decoded by getCrl from the CRL endpoint.
type Crlinfo struct {
        Success  bool     `json:"success,omitempty"`
        // Result is the CRL payload; getCrl wraps it in X509 CRL PEM markers.
        // NOTE(review): presumably base64-encoded DER - confirm against the server.
        Result   string   `json:"result,omitempty"`
        Errors   []string `json:"errors"`
        Messages []string `json:"messages"`
}

// crlinfo receives the decoded CRL response in getCrl.
var crlinfo Crlinfo

// ServerParameters holds the command-line options parsed in main.
type ServerParameters struct {
        certFile  string // path to the x509 cert
        issuerUrl string // URL for retrieving the issuer certificate
        crlUrl    string // URL for retrieving the CRL
}

// parameters is populated from the command-line flags in main.
var parameters ServerParameters

func main() {
        flag.StringVar(¶meters.certFile, "certFile", "", "File containing the x509 certificate")
        flag.StringVar(¶meters.issuerUrl, "issuerUrl", "http://127.0.0.1:8888/api/v1/cfssl/info", "Url for retrieving the issuer certificate")
        flag.StringVar(¶meters.crlUrl, "crlUrl", "http://127.0.0.1:8888/api/v1/cfssl/crl", "Url for retrieving the crl")
        flag.Parse()
        if parameters.certFile == "" {
                flag.Usage()
                os.Exit(1)
        }
        // read x509 certificate from PEM encoded file
        cert_bytes := readFile(parameters.certFile)
        cert, err := decodeCert(cert_bytes)
        if err != nil {
                log.Fatal(err)
        }

        issuer_bytes, err := getIssuer(parameters.issuerUrl) //readCert(os.Args[2])
        if err != nil {
                log.Fatal(err)
        }

        issuer, err := decodeCert(issuer_bytes)
        if err != nil {
                log.Fatal(err)
        }

        // Perform OCSP Check
        fmt.Println("Checing OCSP")
        status, err := checkOCSPStatus(cert, issuer)
        if err != nil {
                fmt.Println(err)
        } else {
                switch status {
                case ocsp.Good:
                        fmt.Printf("[+] Certificate status is Good\n")
                case ocsp.Revoked:
                        fmt.Printf("[-] Certificate status is Revoked\n")
                case ocsp.Unknown:
                        fmt.Printf("[-] Certificate status is Unknown\n")
                }

        crl_bytes, err := getCrl(parameters.crlUrl) //readCert(os.Args[2])
        if err != nil {
                log.Fatal(err)
        }

        crl, err := decodeCrl(crl_bytes)
        if err != nil {
                log.Fatal(err)
        }

        fmt.Println("\nChecing CRL")
        _, err = checkCRLStatus(cert, issuer, crl)
        if err != nil {
                log.Fatal(err)
        }else{
                fmt.Println("[+] Certificate status is not Revoked\n")
        }
}
// checkCRLStatus reports whether cert is listed in crl.
//
// The CRL must carry a valid signature from issuer and must not be past its
// NextUpdate time; otherwise (false, err) is returned. When cert's serial
// number is found among the revoked entries the result is true together with
// a non-nil error describing the revocation. A certificate that is not listed
// yields (false, nil).
func checkCRLStatus(cert *x509.Certificate, issuer *x509.Certificate, crl *pkix.CertificateList) (bool, error) {
	// The list is only trustworthy if the issuer signed it.
	if sigErr := issuer.CheckCRLSignature(crl); sigErr != nil {
		return false, sigErr
	}

	// A list past its NextUpdate time may be missing recent revocations.
	if now := time.Now(); crl.TBSCertList.NextUpdate.Before(now) {
		return false, fmt.Errorf("CRL is outdated")
	}

	// Look for the certificate's serial number among the revoked entries.
	entries := crl.TBSCertList.RevokedCertificates
	for i := range entries {
		if entries[i].SerialNumber.Cmp(cert.SerialNumber) != 0 {
			continue
		}
		return true, fmt.Errorf("[-] Certificate status is Revoked\n")
	}

	return false, nil
}

// checkOCSPStatus queries the first OCSP responder listed in cert and returns
// the status from the response (ocsp.Good, ocsp.Revoked or ocsp.Unknown).
//
// issuer is used both to build the request and to verify the response. An
// error is returned, with status ocsp.Unknown, when the certificate lists no
// responder, when the request cannot be built or sent, when the responder
// answers with a non-200 status, or when the response cannot be parsed.
// A revoked certificate is reported through the status, not through the error.
func checkOCSPStatus(cert *x509.Certificate, issuer *x509.Certificate) (int, error) {
	// Indexing an empty OCSPServer list would panic.
	if len(cert.OCSPServer) == 0 {
		return ocsp.Unknown, fmt.Errorf("certificate does not list an ocsp responder")
	}
	ocspURL := cert.OCSPServer[0]

	// Build OCSP request
	buffer, err := ocsp.CreateRequest(cert, issuer, &ocsp.RequestOptions{
		Hash: crypto.SHA256,
	})
	if err != nil {
		return ocsp.Unknown, fmt.Errorf("creating ocsp request body: %w", err)
	}

	req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, ocspURL, bytes.NewReader(buffer))
	if err != nil {
		return ocsp.Unknown, fmt.Errorf("creating http request: %w", err)
	}

	ocspUrl, err := url.Parse(ocspURL)
	if err != nil {
		return ocsp.Unknown, fmt.Errorf("parsing ocsp url: %w", err)
	}

	req.Header.Set("Content-Type", "application/ocsp-request")
	req.Header.Set("Accept", "application/ocsp-response")
	// The client takes the Host header from req.Host; an entry named "host"
	// in req.Header is ignored.
	req.Host = ocspUrl.Host

	// Make OCSP request
	httpResponse, err := http.DefaultClient.Do(req)
	if err != nil {
		return ocsp.Unknown, fmt.Errorf("making ocsp request: %w", err)
	}
	defer httpResponse.Body.Close()

	if httpResponse.StatusCode != http.StatusOK {
		return ocsp.Unknown, fmt.Errorf("ocsp responder returned %s", httpResponse.Status)
	}

	output, err := ioutil.ReadAll(httpResponse.Body)
	if err != nil {
		return ocsp.Unknown, fmt.Errorf("reading response body: %w", err)
	}

	// Parse response
	ocspResponse, err := ocsp.ParseResponse(output, issuer)
	if err != nil {
		return ocsp.Unknown, fmt.Errorf("parsing ocsp response: %w", err)
	}
	return ocspResponse.Status, nil
}

// readFile returns the complete contents of file.
// A read failure is fatal: it is logged and the program exits.
func readFile(file string) []byte {
	contents, readErr := ioutil.ReadFile(file)
	if readErr == nil {
		return contents
	}
	log.Fatalln(readErr)
	return nil // not reached: log.Fatalln exits the process
}

// decodeCert parses the first PEM block in cert_bytes as an X.509 certificate.
//
// It returns an error when the input contains no PEM block or when the block
// does not hold a parsable certificate; the returned certificate is nil in
// both cases.
func decodeCert(cert_bytes []byte) (*x509.Certificate, error) {
	// pem.Decode returns a nil block when no PEM data is found; using it
	// unchecked would panic.
	block, _ := pem.Decode(cert_bytes)
	if block == nil {
		return nil, fmt.Errorf("decoding certificate: no PEM data found")
	}

	cert, err := x509.ParseCertificate(block.Bytes)
	if err != nil {
		return nil, fmt.Errorf("parsing certificate: %w", err)
	}
	return cert, nil
}
// decodeCrl parses the first PEM block in cert_bytes as a certificate
// revocation list.
//
// It returns an error when the input contains no PEM block or when the block
// does not hold a parsable CRL; the returned list is nil in both cases.
func decodeCrl(cert_bytes []byte) (*pkix.CertificateList, error) {
	// pem.Decode returns a nil block when no PEM data is found; using it
	// unchecked would panic.
	block, _ := pem.Decode(cert_bytes)
	if block == nil {
		return nil, fmt.Errorf("decoding crl: no PEM data found")
	}

	crl, err := x509.ParseCRL(block.Bytes)
	if err != nil {
		return nil, fmt.Errorf("parsing crl: %w", err)
	}
	return crl, nil
}

// getIssuer fetches the issuer certificate from issuerUrl and returns it as
// PEM bytes.
//
// It POSTs {"label": "intermediate"} as JSON, decodes the response into the
// package-level cfinfo and returns its Result.Certificate. Every failure is
// returned as an error so the caller decides how to react.
func getIssuer(issuerUrl string) ([]byte, error) {
	jsonValue, err := json.Marshal(map[string]string{"label": "intermediate"})
	if err != nil {
		return nil, fmt.Errorf("encoding issuer request: %w", err)
	}

	resp, err := http.Post(issuerUrl, "application/json", bytes.NewBuffer(jsonValue))
	if err != nil {
		return nil, fmt.Errorf("requesting issuer certificate: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode >= 300 {
		return nil, fmt.Errorf("failed to retrieve issuer certificate: %s", resp.Status)
	}

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("reading issuer response: %w", err)
	}

	// Start from a clean value so a certificate from an earlier call cannot
	// leak into this result.
	cfinfo = Cfinfo{}
	decodeErr := json.Unmarshal(body, &cfinfo)
	// json.Unmarshal keeps decoding the remaining fields after a type
	// mismatch on one of them, so the response is rejected only when the
	// certificate itself is missing.
	if cfinfo.Result.Certificate == "" {
		if decodeErr != nil {
			return nil, fmt.Errorf("decoding issuer response: %w", decodeErr)
		}
		return nil, fmt.Errorf("issuer response from %s contains no certificate", issuerUrl)
	}
	return []byte(cfinfo.Result.Certificate), nil
}

// getCrl downloads the CRL from crlUrl and returns it PEM-armoured.
//
// The response is decoded into the package-level crlinfo; its Result is
// wrapped in "X509 CRL" PEM markers so that decodeCrl can consume it. An
// error is returned when the request fails, the server answers with a status
// of 300 or above, the body cannot be read, or the response carries no CRL.
func getCrl(crlUrl string) ([]byte, error) {
	resp, err := http.Get(crlUrl)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode >= 300 {
		return nil, fmt.Errorf("failed to retrieve CRL")
	}

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("reading crl response: %w", err)
	}

	// Start from a clean value so a result from an earlier call cannot leak
	// into this one.
	crlinfo = Crlinfo{}
	decodeErr := json.Unmarshal(body, &crlinfo)
	// json.Unmarshal keeps decoding the remaining fields after a type
	// mismatch on one of them, so the response is rejected only when the CRL
	// payload itself is missing.
	if crlinfo.Result == "" {
		if decodeErr != nil {
			return nil, fmt.Errorf("decoding crl response: %w", decodeErr)
		}
		return nil, fmt.Errorf("crl response from %s contains no crl", crlUrl)
	}

	crlString := "-----BEGIN X509 CRL-----\n" + crlinfo.Result + "\n-----END X509 CRL-----"
	return []byte(crlString), nil
}
                                                                                                                                                                                                                                                                                                                                             


Kafka

Strimzi

You can install strimzi kafka using the quick start guide: Strimzi Quickstarts

You'll need to overwrite the kafka cluster with your own definition: my-cluster.yaml

This file includes an authorization section:

Code Block
languagetext
titleopa
      authorization:
        type: opa
        url: http://opa.default:8181/v1/data/policy/kafka/authz/allow
        allowOnError: false
        initialCacheCapacity: 1000
        maximumCacheSize: 10000
        expireAfterMs: 10 #60000
        superUsers:
          - CN=henri
          - anwar
          - CN=wesley
          - CN=my-user

This file includes a listeners section:

Code Block
languagetext
titlelisteners
      listeners:
      - name: plain
        port: 9092
        tls: false
        type: internal
      - name: tls
        port: 9093
        tls: true
        type: internal
      - name: plain3
        port: 9097
        tls: false
        type: internal
        authentication:
          type: oauth
          checkIssuer: false
          checkAccessTokenType: true
          accessTokenIsJwt: true
          enableOauthBearer: true
          introspectionEndpointUri: http://keycloak.default:8080/auth/realms/opa/protocol/openid-connect/token/introspect
          clientId: opacli
          clientSecret:
            secretName: my-cluster-oauth
            key: clientSecret
          validIssuerUri: http://keycloak.default:8080/auth/realms/opa
          clientAudience: account
          customClaimCheck: "@.resource_access['opacli'].roles[0] == 'opa-client-role'"

Where opa and keycloak are configured.

If you are publishing or subscribing using this listener you will need to provide a JWT with your request.

The token will be verified using Keycloak, along with the audience field and the resource_access['opacli'].roles field.

Kafka will then pass the authorization on to OPA, where other checks will take place.

Below is a sample opa policy you might use to check the kafka input:

Code Block
languagetext
titleopa rules
# Authorization decisions for Kafka requests.
# Access is denied unless one of the allow rules below matches the input.
package policy.kafka.authz

default allow = false

# Client "kowl" may describe and read anything.
allow = true {
    allowedActions :=  ["DESCRIBE", "DESCRIBE_CONFIGS", "READ"]
    input.action.operation == allowedActions[_]
    input.requestContext.header.name.clientId == "kowl"
}

# Anyone may describe the internal topics listed below.
# The operation check is required: without it allowedActions is unused and
# every operation on these topics would be allowed.
allow = true {
    allowedActions :=  ["DESCRIBE", "DESCRIBE_CONFIGS"]
    input.action.operation == allowedActions[_]
    allowedTopics :=  ["__consumer_offsets","__strimzi_store_topic", "__strimzi-topic-operator-kstreams-topic-store-changelog"]
    input.action.resourcePattern.name == allowedTopics[_]
}

# The service account may describe and read consumer groups.
allow = true {
    allowedActions :=  ["DESCRIBE", "READ"]
    input.action.operation == allowedActions[_]
    allowedResourceTypes := ["GROUP"]
    input.action.resourcePattern.resourceType == allowedResourceTypes[_]
    input.requestContext.principal.name == "service-account-opacli"
}

# The consumer client may perform CREATE on the cluster resource.
allow = true {
    input.action.operation == "CREATE"
    input.action.resourcePattern.name == "kafka-cluster"
    input.action.resourcePattern.resourceType == "CLUSTER"
    input.requestContext.principal.name == "service-account-opacli"
    input.requestContext.header.name.clientId == "consumer-client"
}


# The consumer client may describe and read my-topic.
allow = true {
    allowedActions :=  ["DESCRIBE","READ"]
    input.action.operation == allowedActions[_]
    input.action.resourcePattern.name == "my-topic"
    input.action.resourcePattern.resourceType == "TOPIC"
    input.requestContext.principal.name == "service-account-opacli"
    input.requestContext.header.name.clientId == "consumer-client"
}


# The producer client may describe and write my-topic.
allow = true {
    allowedActions :=  ["DESCRIBE","WRITE"]
    input.action.operation == allowedActions[_]
    input.action.resourcePattern.name == "my-topic"
    input.action.resourcePattern.resourceType == "TOPIC"
    input.requestContext.principal.name == "service-account-opacli"
    input.requestContext.header.name.clientId == "producer-client"
}

The default value of the sub field (principal name) in keycloak is the user id, this can be changed to the user name using a property mapper:

kube-mgmt

By default the entire policy and data cache are defined by the opa bundle.

If you need to add data from other sources you need to define the bundle root directories in the .manifest file

e.g.

Code Block
languagejs
titlemanifest
{
  "revision" : "1",
  "roots": ["policy", "roles"]
}

You can then add the kube-mgmt sidecar to your OPA deployment; it will pull data from ConfigMaps in the specified namespace

e.g.

Code Block
languagetext
        - name: kube-mgmt
          image: openpolicyagent/kube-mgmt:latest
          args:
            - "--namespaces=opa"


Code Block
languageyml
titlehello-data.yaml
kind: ConfigMap
apiVersion: v1
metadata:
  name: hello-data
  namespace: opa
  labels:
    openpolicyagent.org/data: opa
data:
  x.json: |
    {"a": [1,2,3,4]}


Code Block
languagejs
titletoken.json
    {
      "access_token": "eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJDamJ4a2FONjRVcUdYNThWU2R3WjBxQTdWRmN1TGdEQWhnUWJTVG55UE9JIn0.eyJleHAiOjE2Njg1MjgwMjEsImlhdCI6MTY2ODUyNzcyMSwianRpIjoiMjc1NzkzNjktOWExMi00NjA4LTk5ZjMtNTI2MmE4MzI5NGViIiwiaXNzIjoiaHR0cDovL2tleWNsb2FrLmRlZmF1bHQ6ODA4MC9hdXRoL3JlYWxtcy9vcGEiLCJhdWQiOiJhY2NvdW50Iiwic3ViIjoic2VydmljZS1hY2NvdW50LW9wYWNsaSIsInR5cCI6IkJlYXJlciIsImF6cCI6Im9wYWNsaSIsImFjciI6IjEiLCJyZWFsbV9hY2Nlc3MiOnsicm9sZXMiOlsib2ZmbGluZV9hY2Nlc3MiLCJ1bWFfYXV0aG9yaXphdGlvbiIsImRlZmF1bHQtcm9sZXMtb3BhIl19LCJyZXNvdXJjZV9hY2Nlc3MiOnsib3BhY2xpIjp7InJvbGVzIjpbIm9wYS1jbGllbnQtcm9sZSJdfSwiYWNjb3VudCI6eyJyb2xlcyI6WyJtYW5hZ2UtYWNjb3VudCIsIm1hbmFnZS1hY2NvdW50LWxpbmtzIiwidmlldy1wcm9maWxlIl19fSwic2NvcGUiOiJwcm9maWxlIGVtYWlsIiwiY2xpZW50SWQiOiJvcGFjbGkiLCJjbGllbnRIb3N0IjoiMTI3LjAuMC42IiwiZW1haWxfdmVyaWZpZWQiOmZhbHNlLCJjbGllbnRSb2xlIjoiW29wYS1jbGllbnQtcm9sZV0iLCJwcmVmZXJyZWRfdXNlcm5hbWUiOiJzZXJ2aWNlLWFjY291bnQtb3BhY2xpIiwiY2xpZW50QWRkcmVzcyI6IjEyNy4wLjAuNiJ9.B0X0zhAintHAXAtjiOngzTM0Wdv_aWM8qdyF4MXEZmE6AxoaMpaNQ6B18k1pkMz60qmTUUcYJOm-xHAdipnxptAf44_YmaybVE9_otjO59RBLonUlFtWDnNV4EqixLDaXN33Z54S1iWCPjBvI58sH_jQyMiXM7ur1FfjCCEeQwP2D9CgWY_3-Vg4Wy4cMAbsxOhZZog-QmKV8nkeyELqUS24xk_vpPkYOdG1ztc5SxHnLLsT4iwXgzYjAveiXyKyyYOC0oJP5zgCYEkQCvRORmuU0whN_5g4oTSIFcqN88KsrYb-R0I2RS7mQI8LXHrNIqMlNN2j5PoHZmgvMrpO3w"
    }

You can also add extra data using curl e.g. curl -X PUT $host:31182/v1/data/jwt -d @token.json

You can view all the data available to opa using curl: curl <opa host>:<port>/v1/data

You should see something similar to the following output:

Code Block
languagejs
titleopa data
{
   "decision_id":"067f7d2d-0e5d-4ca8-8cb0-6d38543c6b2f",
   "result":{
      "jwt":{      "access_token":"eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJDamJ4a2FONjRVcUdYNThWU2R3WjBxQTdWRmN1TGdEQWhnUWJTVG55UE9JIn0.eyJleHAiOjE2Njg1MjgwMjEsImlhdCI6MTY2ODUyNzcyMSwianRpIjoiMjc1NzkzNjktOWExMi00NjA4LTk5ZjMtNTI2MmE4MzI5NGViIiwiaXNzIjoiaHR0cDovL2tleWNsb2FrLmRlZmF1bHQ6ODA4MC9hdXRoL3JlYWxtcy9vcGEiLCJhdWQiOiJhY2NvdW50Iiwic3ViIjoic2VydmljZS1hY2NvdW50LW9wYWNsaSIsInR5cCI6IkJlYXJlciIsImF6cCI6Im9wYWNsaSIsImFjciI6IjEiLCJyZWFsbV9hY2Nlc3MiOnsicm9sZXMiOlsib2ZmbGluZV9hY2Nlc3MiLCJ1bWFfYXV0aG9yaXphdGlvbiIsImRlZmF1bHQtcm9sZXMtb3BhIl19LCJyZXNvdXJjZV9hY2Nlc3MiOnsib3BhY2xpIjp7InJvbGVzIjpbIm9wYS1jbGllbnQtcm9sZSJdfSwiYWNjb3VudCI6eyJyb2xlcyI6WyJtYW5hZ2UtYWNjb3VudCIsIm1hbmFnZS1hY2NvdW50LWxpbmtzIiwidmlldy1wcm9maWxlIl19fSwic2NvcGUiOiJwcm9maWxlIGVtYWlsIiwiY2xpZW50SWQiOiJvcGFjbGkiLCJjbGllbnRIb3N0IjoiMTI3LjAuMC42IiwiZW1haWxfdmVyaWZpZWQiOmZhbHNlLCJjbGllbnRSb2xlIjoiW29wYS1jbGllbnQtcm9sZV0iLCJwcmVmZXJyZWRfdXNlcm5hbWUiOiJzZXJ2aWNlLWFjY291bnQtb3BhY2xpIiwiY2xpZW50QWRkcmVzcyI6IjEyNy4wLjAuNiJ9.B0X0zhAintHAXAtjiOngzTM0Wdv_aWM8qdyF4MXEZmE6AxoaMpaNQ6B18k1pkMz60qmTUUcYJOm-xHAdipnxptAf44_YmaybVE9_otjO59RBLonUlFtWDnNV4EqixLDaXN33Z54S1iWCPjBvI58sH_jQyMiXM7ur1FfjCCEeQwP2D9CgWY_3-Vg4Wy4cMAbsxOhZZog-QmKV8nkeyELqUS24xk_vpPkYOdG1ztc5SxHnLLsT4iwXgzYjAveiXyKyyYOC0oJP5zgCYEkQCvRORmuU0whN_5g4oTSIFcqN88KsrYb-R0I2RS7mQI8LXHrNIqMlNN2j5PoHZmgvMrpO3w"
      },
      "opa":{
         "hello-data":{
            "x.json":{
               "a":[
                  1,
                  2,
                  3,
                  4
               ]
            }
         }
      },
      "servers":{
         "id":"s1",
         "name":"app",
         "ports":[
            "p1",
            "p2",
            "p3"
         ],
         "protocols":[
            "https",
            "ssh"
         ]
      }
   }
}

You can get individual values like this: curl <opa host>:<port>/v1/data/jwt/access_token or refer to them in your code like this: data.opa["hello-data"]["x.json"].a[0]

Strimzi Bridge

You can communicate with kafka over http by adding strimzi bridge:

Code Block
languageyml
titleStrimzi Bridge
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaBridge
metadata:
  name: strimzi-bridge
  namespace: kafka
spec:
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  http:
    port: 8080



See also:

Using Open Policy Agent with Strimzi and Apache Kafka

Using Strimzi

kube-mgmt

Styra opa-kafka-plugin

Secure Kafka with Keycloak: SASL OAuth Bearer

Enforce Custom Resource policies with Open Policy Agent Gatekeeper

How to integrate Kafka with Istio

Example with JWT payload passed from Envoy

Kafka Broker filter

Kafka mesh filter in Envoy

Kafka authentication using OAuth 2.0

A deep dive into Strimzi

Using HTTP Bridge as a Kubernetes sidecar

Kafka SSL : Setup with self signed certificate

Kafka & Istio

Integration of Istio with Kafka is limited.

The Kafka Protocol is a TCP level protocol (Layer 4)

Kafka is a binary wrapper over TCP protocol.

Istio mainly supports HTTP/HTTPS on layer 7. 

Higher level protocols such as HTTP have access to metadata so they can properly route requests, this is not the case with the Kafka protocol.

Service Mesh and Proxies: Examples for Kafka


Setting up Strimzi bridge with Istio Authorization and ACLs

Image Added



The Kiali screenshot above shows both a producer and a consumer connecting to Kafka through the Strimzi bridge.

Access to the bridge is controlled using JWT which is checked using an Istio AuthorizationPolicy.

Code Block
languageyml
titleAuthorizationPolicy
apiVersion: "security.istio.io/v1beta1"
kind: "AuthorizationPolicy"
metadata:
  name: "kafka-bridge-policy"
  namespace: kafka
spec:
  selector:
    matchLabels:
      app.kubernetes.io/name: kafka-bridge
  action: ALLOW
  rules:
  - from:
    - source:
        requestPrincipals: ["http://keycloak.default:8080/auth/realms/opa", "http://istio-ingressgateway.istio-system:80/auth/realms/opa"]
  - to:
    - operation:
        methods: ["POST", "GET"]
        paths: ["/topics", "/topics/*", "/consumers/*"]
    when:
    - key: request.auth.claims[clientRole]
      values: ["opa-client-role"]


By enabling tls on one of our bootstrap listeners user certificates are automatically created when we create a new KafkaUser

      - name: tlsauth
        type: internal
        port: 9096
        tls: true
        authentication:
          type: tls

Setting the authentication type to tls also ensures secure communication between Kafka and ZooKeeper.

We also set authorization to simple for the Kafka cluster, this means ACLs will be used.

     kafka:
      authorization:
        type: simple
        superUsers:
          - CN=kowl

Any users you want to by-pass ACL checks should be listed under superUsers.

We can then setup our bridge user and include the ACL permissions as part of the KafkaUser definition:

Code Block
languageyml
titleKafkaUser
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
  name: bridge
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls
  authorization:
    type: simple
    acls:
      # access to the topic
      - resource:
          type: topic
          name: my-topic
        operations:
          - Create
          - Describe
          - Read
          - Write
          - AlterConfigs
        host: "*"
      # access to the group
      - resource:
          type: group
          name: my-group
        operations:
          - Describe
          - Read
        host: "*"
      # access to the cluster
      - resource:
          type: cluster
        operations:
          - Alter
          - AlterConfigs
        host: "*"


Authorization between the bridge and Kafka is done using user certificates.


Code Block
languageyml
titleKafkaBridge
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaBridge
metadata:
  name: strimzi-bridge
  namespace: kafka
spec:
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9096
  http:
    port: 8080
  logging:
    type: inline
    loggers:
      logger.bridge.level: "DEBUG"
      logger.send.name: "http.openapi.operation.send"
      logger.send.level: "DEBUG"
  tls:
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
  authentication:
    type: tls
    certificateAndKey:
      secretName: bridge
      certificate: user.crt
      key: user.key


Records can then be posted to a topic with a call like this:

Endpoint:

http://istio-ingressgateway.istio-system:80/topics/my-topic

Payload: {"records":[{"key":"key-366","value":"value-366"}]}

and read from a topic using the following syntax:

i) Create a consumer group

Endpoint: http://istio-ingressgateway.istio-system:80/consumers/my-group

Payload: {"name":"my-consumer","format":"json","auto.offset.reset":"earliest","enable.auto.commit":false}

ii) subscribe to a topic

Endpoint: http://istio-ingressgateway.istio-system:80/consumers/my-group/instances/my-consumer/subscription

Payload: {"topics":["my-topic"]}

iii) Read records from a topic

Endpoint: http://istio-ingressgateway.istio-system:80/consumers/my-group/instances/my-consumer/records


These calls will be redirected to the bridge using a gateway and a virtual service

Code Block
languageyml
titleGateway/VirtualService
apiVersion: networking.istio.io/v1beta1
kind: Gateway
metadata:
  name: strimzi-bridge-gateway
  namespace: kafka
spec:
  selector:
    istio: ingressgateway # use Istio gateway implementation
  servers:
  - port:
      number: 80
      name: http
      protocol: HTTP
    hosts:
    - "*"
---
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: strimzi-bridge-vs
  namespace: kafka
spec:
  hosts:
  - "*"
  gateways:
  - strimzi-bridge-gateway
  http:
  - name: "strimzi-bridge-routes"
    match:
    - uri:
        prefix: "/topics"
    - uri:
        prefix: "/consumers"
    route:
    - destination:
        port:
          number: 8080
        host: strimzi-bridge-bridge-service.kafka.svc.cluster.local


Note: To use Kowl with this setup you'll need to create a kowl user and configure kowl to use tls

Code Block
languageyml
titleKowl
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
  name: kowl
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls


Code Block
languageyml
titleKowl configmap
apiVersion: v1
kind: ConfigMap
metadata:
  name: kowl-config-cm
  namespace: kafka
data:
  config.yaml: |
    kafka:
      brokers:
        - my-cluster-kafka-0.my-cluster-kafka-brokers.kafka.svc:9093
      tls:
        enabled: true
        caFilepath: /etc/strimzi/ca/ca.crt
        certFilepath: /etc/strimzi/user-crt/crt/user.crt
        keyFilepath: /etc/strimzi/user-key/key/user.key


  • ca.crt is obtained from the my-cluster-cluster-ca-cert secret, user.crt and user.key are obtained from the kowl secret.


Admission Controllers

An admission controller is a piece of code that intercepts requests to the Kubernetes API server prior to persistence of the object, but after the request is authenticated and authorized.

Admission Controllers Reference

Kyverno

https://kyverno.io/

Opa Gatekeeper

Open-policy-agent Gatekeeper

OPA Gatekeeper: Policy and Governance for Kubernetes