Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Godaddy Opa Lambda Extension Plugin

Extending OPA

OPA Ecosystem

GO

Go provides a library for opa.

...

Code Block
languagetext
titleopa_test.sh

#!/bin/bash
# opa_test.sh - calls an endpoint through the Istio ingress gateway with a
# Keycloak bearer token and compares the response body with an expected string.
#
# NOTE(review): get_token reads KEYCLOAK_HOST and KEYCLOAK_PORT, which are not
# assigned anywhere in this script - they presumably have to be exported by the
# caller before running it; confirm.

# Address of the ingress gateway: the minikube node IP plus the nodePort of the
# port named "http2" on the istio-ingressgateway service.
INGRESS_HOST=$(minikube ip)
INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
# Counters updated by run_test and printed in the summary at the end.
TESTS=0
PASSED=0
FAILED=0
# Timestamp taken at start-up, printed in the summary.
TEST_TS=$(date +%F-%T)
# TOKEN holds the raw token response, ACCESS_TOKEN the extracted access token
# (both written by get_token). REFRESH_TOKEN is only initialised here.
TOKEN=""
ACCESS_TOKEN=""
REFRESH_TOKEN=""

#######################################
# Request a client-credentials token from the Keycloak "opa" realm.
# Globals:
#   KEYCLOAK_HOST, KEYCLOAK_PORT  (read)  address of the Keycloak server
#   KEYCLOAK_CLIENT_ID            (read, optional) defaults to "opacli"
#   KEYCLOAK_CLIENT_SECRET        (read, optional) defaults to the demo secret
#   TOKEN                         (written) raw JSON response from Keycloak
#   ACCESS_TOKEN                  (written) value of .access_token, "" on failure
# Arguments:
#   $1 - accepted for compatibility with existing callers; not used
# Returns:
#   0 when curl succeeded, 1 when the token request itself failed
#######################################
function get_token
{
    local token_url="http://${KEYCLOAK_HOST}:${KEYCLOAK_PORT}/auth/realms/opa/protocol/openid-connect/token"
    local client_id="${KEYCLOAK_CLIENT_ID:-opacli}"
    # NOTE(review): the fallback secret is kept only so existing invocations keep
    # working; prefer supplying KEYCLOAK_CLIENT_SECRET from the environment.
    local client_secret="${KEYCLOAK_CLIENT_SECRET:-63wkv0RUXkp01pbqtNTSwghhTxeMW55I}"

    # Never leave a token from an earlier call in place if this request fails.
    ACCESS_TOKEN=""
    if ! TOKEN=$(curl -s -X POST "${token_url}" \
                 -H "Content-Type: application/x-www-form-urlencoded" \
                 -d "client_secret=${client_secret}" \
                 -d 'grant_type=client_credentials' \
                 -d "client_id=${client_id}"); then
        echo "get_token: token request to ${token_url} failed" >&2
        return 1
    fi
    # printf with a quoted argument passes the JSON to jq unmodified
    # (an unquoted echo would word-split and glob-expand it).
    ACCESS_TOKEN=$(printf '%s' "${TOKEN}" | jq -r '.access_token')
}

#######################################
# Run one HTTP test against the ingress gateway and record the outcome.
# Globals:
#   INGRESS_HOST, INGRESS_PORT  (read)    address of the ingress gateway
#   ACCESS_TOKEN                (read)    bearer token, refreshed via get_token
#   TESTS, PASSED, FAILED       (written) running counters
# Arguments:
#   $1 - URL path (without the leading slash) to request
#   $2 - HTTP method, e.g. GET
#   $3 - exact response body expected for the test to pass
#   $4 - optional request body; sent with -d only when non-empty
# Outputs:
#   Test header, the response body, PASS or FAIL, and a blank line on stdout
#######################################
function run_test
{
    local prefix="${1}" type="${2}" msg="${3}" data="${4:-}"
    local target result
    local -a curl_args

    TESTS=$((TESTS+1))
    echo "Test ${TESTS}: Testing $type /${prefix}"
    get_token "$prefix"
    target="${INGRESS_HOST}:${INGRESS_PORT}/${prefix}"
    # Build the argument list as an array so values with spaces stay intact.
    curl_args=(-s -X "${type}"
               -H "Content-type: application/json"
               -H "Authorization: Bearer ${ACCESS_TOKEN}")
    if [[ -n "${data}" ]]; then
        curl_args+=(-d "${data}")
    fi
    result=$(curl "${curl_args[@]}" "${target}")
    # Print the body verbatim; an unquoted echo would glob-expand and re-space it.
    printf '%s\n' "$result"
    if [[ "$result" != "$msg" ]]; then
            echo "FAIL"
            FAILED=$((FAILED+1))
    else
            echo "PASS"
            PASSED=$((PASSED+1))
    fi
    echo ""
}


# Test suite: one GET against the rapp-opa-provider route.
run_test "rapp-opa-provider" "GET" "Hello OPA World!" ""

# Summary of the run, emitted in one go (leading blank line included).
cat <<EOF

-----------------------------------------------------------------------
Number of Tests: $TESTS, Tests Passed: $PASSED, Tests Failed: $FAILED
Date: $TEST_TS
-----------------------------------------------------------------------
EOF

...

opa bench --data rbactest.rego 'data.rbactest.allow'
+-------------------------------------------------+------------+
| samples | 22605 |
| ns/op | 47760 |
| B/op | 6269 |
| allocs/op | 112 |
| histogram_timer_rego_external_resolve_ns_75% | 400 |
| histogram_timer_rego_external_resolve_ns_90% | 500 |
| histogram_timer_rego_external_resolve_ns_95% | 500 |
| histogram_timer_rego_external_resolve_ns_99% | 871 |
| histogram_timer_rego_external_resolve_ns_99.9% | 29394 |
| histogram_timer_rego_external_resolve_ns_99.99% | 29800 |
| histogram_timer_rego_external_resolve_ns_count | 22605 |
| histogram_timer_rego_external_resolve_ns_max | 29800 |
| histogram_timer_rego_external_resolve_ns_mean | 434 |
| histogram_timer_rego_external_resolve_ns_median | 400 |
| histogram_timer_rego_external_resolve_ns_min | 200 |
| histogram_timer_rego_external_resolve_ns_stddev | 1045 |
| histogram_timer_rego_query_eval_ns_75% | 31100 |
| histogram_timer_rego_query_eval_ns_90% | 37210 |
| histogram_timer_rego_query_eval_ns_95% | 47160 |
| histogram_timer_rego_query_eval_ns_99% | 91606 |
| histogram_timer_rego_query_eval_ns_99.9% | 630561 |
| histogram_timer_rego_query_eval_ns_99.99% | 631300 |
| histogram_timer_rego_query_eval_ns_count | 22605 |
| histogram_timer_rego_query_eval_ns_max | 631300 |
| histogram_timer_rego_query_eval_ns_mean | 29182 |
| histogram_timer_rego_query_eval_ns_median | 25300 |
| histogram_timer_rego_query_eval_ns_min | 15200 |
| histogram_timer_rego_query_eval_ns_stddev | 32411 |
+-------------------------------------------------+------------+

OPA & Minio

OPA & MinIO's Access Management Plugin

OPA Sidecar injection

First create a namespace for your apps and enable istio and opa 

...

Code Block
languagetext
titleVerify Certificate
package main

import (
        "bytes"
        "context"
        "crypto"
        "crypto/x509"
        "crypto/x509/pkix"
        "encoding/json"
        "encoding/pem"
        "flag"
        "fmt"
        "golang.org/x/crypto/ocsp"
        "io/ioutil"
        "log"
        "net/http"
        "net/url"
        "os"
        "time"
)

type Cfinfo struct {
        Success string `json:"success,omitempty"`
        Result  struct {
                Certificate string   `json:"certificate"`
                Usages      []string `json:"usages"`
                Expiry      string   `json:"expiry"`
        }
        Errors   []string `json:"errors"`
        Messages []string `json:"messages"`
}

var cfinfo Cfinfo

// Crlinfo is the target for json.Unmarshal of the body returned by the CRL
// endpoint (see getCrl).
type Crlinfo struct {
        Success  bool     `json:"success,omitempty"`
        Result   string   `json:"result,omitempty"` // CRL body; getCrl wraps it in "X509 CRL" PEM armor
        Errors   []string `json:"errors"`
        Messages []string `json:"messages"`
}

// crlinfo holds the most recently decoded CRL response; written by getCrl.
var crlinfo Crlinfo

// ServerParameters holds the command-line options; main binds each field to
// the flag of the same name.
type ServerParameters struct {
        certFile  string // path to the file containing the x509 certificate (-certFile)
        issuerUrl string // URL for retrieving the issuer certificate (-issuerUrl)
        crlUrl    string // URL for retrieving the CRL (-crlUrl)
}

// parameters is populated by flag.Parse in main.
var parameters ServerParameters

func main() {
        flag.StringVar(¶meters.certFile, "certFile", "", "File containing the x509 certificate")
        flag.StringVar(¶meters.issuerUrl, "issuerUrl", "http://127.0.0.1:8888/api/v1/cfssl/info", "Url for retrieving the issuer certificate")
        flag.StringVar(¶meters.crlUrl, "crlUrl", "http://127.0.0.1:8888/api/v1/cfssl/crl", "Url for retrieving the crl")
        flag.Parse()
        if parameters.certFile == "" {
                flag.Usage()
                os.Exit(1)
        }
        // read x509 certificate from PEM encoded file
        cert_bytes := readFile(parameters.certFile)
        cert, err := decodeCert(cert_bytes)
        if err != nil {
                log.Fatal(err)
        }

        issuer_bytes, err := getIssuer(parameters.issuerUrl) //readCert(os.Args[2])
        if err != nil {
                log.Fatal(err)
        }

        issuer, err := decodeCert(issuer_bytes)
        if err != nil {
                log.Fatal(err)
        }

        // Perform OCSP Check
        fmt.Println("Checing OCSP")
        status, err := checkOCSPStatus(cert, issuer)
        if err != nil {
                fmt.Println(err)
        } else {
                switch status {
                case ocsp.Good:
                        fmt.Printf("[+] Certificate status is Good\n")
                case ocsp.Revoked:
                        fmt.Printf("[-] Certificate status is Revoked\n")
                case ocsp.Unknown:
                        fmt.Printf("[-] Certificate status is Unknown\n")
                }

        crl_bytes, err := getCrl(parameters.crlUrl) //readCert(os.Args[2])
        if err != nil {
                log.Fatal(err)
        }

        crl, err := decodeCrl(crl_bytes)
        if err != nil {
                log.Fatal(err)
        }

        fmt.Println("\nChecing CRL")
        _, err = checkCRLStatus(cert, issuer, crl)
        if err != nil {
                log.Fatal(err)
        }else{
                fmt.Println("[+] Certificate status is not Revoked\n")
        }
}
func checkCRLStatus(cert *x509.Certificate, issuer *x509.Certificate, crl *pkix.CertificateList) (bool, error) {
        var revoked = false
        // Check CRL signature
        err := issuer.CheckCRLSignature(crl)
        if err != nil {
                return revoked, err
        }

        // Check CRL validity
        if crl.TBSCertList.NextUpdate.Before(time.Now()) {
                return revoked, fmt.Errorf("CRL is outdated")
        }
         
        // Searching for our certificate in CRL
        for _, revokedCertificate := range crl.TBSCertList.RevokedCertificates {
                if revokedCertificate.SerialNumber.Cmp(cert.SerialNumber) == 0 {
                        //Found validated certificate in list of revoked ones
                        revoked = true
                        return revoked, fmt.Errorf("[-] Certificate status is Revoked\n")
                }
        }

        return revoked, nil
}

// checkOCSPStatus makes an OCSP request for the provided certificate against
// the first responder URL listed in the certificate.
//
// It returns the status from the parsed OCSP response (compared by main with
// ocsp.Good, ocsp.Revoked and ocsp.Unknown). On any failure - no responder URL
// in the certificate, request construction, transport, non-200 HTTP status or
// an unparsable response - it returns ocsp.Unknown and a non-nil error.
func checkOCSPStatus(cert *x509.Certificate, issuer *x509.Certificate) (int, error) {
        // Indexing OCSPServer[0] on a certificate without responders would panic.
        if len(cert.OCSPServer) == 0 {
                return ocsp.Unknown, fmt.Errorf("certificate does not list an OCSP responder")
        }
        var (
                ctx     = context.Background()
                ocspURL = cert.OCSPServer[0]
        )

        // Build OCSP request
        buffer, err := ocsp.CreateRequest(cert, issuer, &ocsp.RequestOptions{
                Hash: crypto.SHA256,
        })
        if err != nil {
                return ocsp.Unknown, fmt.Errorf("creating ocsp request body: %w", err)
        }

        ocspUrl, err := url.Parse(ocspURL)
        if err != nil {
                return ocsp.Unknown, fmt.Errorf("parsing ocsp url: %w", err)
        }

        req, err := http.NewRequestWithContext(ctx, http.MethodPost, ocspURL, bytes.NewBuffer(buffer))
        if err != nil {
                return ocsp.Unknown, fmt.Errorf("creating http request: %w", err)
        }

        req.Header.Add("Content-Type", "application/ocsp-request")
        req.Header.Add("Accept", "application/ocsp-response")
        // The client takes the Host header from req.Host, not from req.Header.
        req.Host = ocspUrl.Host

        // Make OCSP request
        httpResponse, err := http.DefaultClient.Do(req)
        if err != nil {
                return ocsp.Unknown, fmt.Errorf("making ocsp request: %w", err)
        }

        defer httpResponse.Body.Close()

        if httpResponse.StatusCode != http.StatusOK {
                return ocsp.Unknown, fmt.Errorf("ocsp responder returned %s", httpResponse.Status)
        }

        output, err := ioutil.ReadAll(httpResponse.Body)
        if err != nil {
                return ocsp.Unknown, fmt.Errorf("reading response body: %w", err)
        }

        // Parse response
        ocspResponse, err := ocsp.ParseResponse(output, issuer)
        if err != nil {
                return ocsp.Unknown, fmt.Errorf("parsing ocsp response: %w", err)
        }
        return ocspResponse.Status, nil
}

func readFile(file string) []byte {
        cert, err := ioutil.ReadFile(file)
        if err != nil {
                log.Fatalln(err)
        }
        return cert
}

// decodeCert parses the first PEM block in cert_bytes as an x509 certificate.
// It returns an error when the input contains no PEM block or when the block
// is not a valid certificate.
func decodeCert(cert_bytes []byte) (*x509.Certificate, error) {
        b, _ := pem.Decode(cert_bytes)
        // pem.Decode returns a nil block when no PEM data is found; using
        // b.Bytes without this check would dereference nil.
        if b == nil {
                return nil, fmt.Errorf("parsing certificate: no PEM data found")
        }

        cert, err := x509.ParseCertificate(b.Bytes)
        if err != nil {
                fmt.Println("Parse Error")
                return nil, fmt.Errorf("parsing certificate: %w", err)
        }

        return cert, nil
}
func decodeCrl(cert_bytes []byte) (*pkix.CertificateList, error) {
        b, _ := pem.Decode(cert_bytes)

        crl, err := x509.ParseCRL(b.Bytes)
        if err != nil {
                return nil, fmt.Errorf("parsing crl: %w", err)
        }
        return crl, err
}

// getIssuer POSTs {"label": "intermediate"} as JSON to issuerUrl and returns
// the certificate text found in the "result.certificate" field of the reply.
//
// The decoded reply is stored in the package-level cfinfo. An error is
// returned when the request fails, the server answers with a status >= 300,
// the body cannot be read, or the reply carries no certificate.
func getIssuer(issuerUrl string) ([]byte, error) {
        jsonValue, err := json.Marshal(map[string]string{"label": "intermediate"})
        if err != nil {
                return nil, fmt.Errorf("encoding issuer request: %w", err)
        }

        resp, err := http.Post(issuerUrl, "application/json", bytes.NewBuffer(jsonValue))
        if err != nil {
                return nil, fmt.Errorf("requesting issuer certificate: %w", err)
        }
        defer resp.Body.Close()

        if resp.StatusCode >= 300 {
                return nil, fmt.Errorf("failed to retrieve issuer certificate: %s", resp.Status)
        }

        body, err := ioutil.ReadAll(resp.Body)
        if err != nil {
                return nil, fmt.Errorf("reading issuer response: %w", err)
        }

        // Start from a clean value so a previous reply cannot leak through.
        cfinfo = Cfinfo{}
        // json.Unmarshal keeps decoding the remaining fields after a type
        // mismatch on one of them, so a decode error is only fatal when the
        // certificate itself did not come through.
        decodeErr := json.Unmarshal(body, &cfinfo)
        if cfinfo.Result.Certificate == "" {
                if decodeErr != nil {
                        return nil, fmt.Errorf("decoding issuer response: %w", decodeErr)
                }
                return nil, fmt.Errorf("issuer response contains no certificate")
        }
        return []byte(cfinfo.Result.Certificate), nil
}

// getCrl GETs crlUrl, decodes the JSON reply into the package-level crlinfo and
// returns its "result" field wrapped in "X509 CRL" PEM armor.
//
// An error is returned when the request fails, the server answers with a
// status >= 300, the body cannot be read, or the reply carries no CRL.
func getCrl(crlUrl string) ([]byte, error) {
        resp, err := http.Get(crlUrl)
        if err != nil {
                return nil, err
        }
        defer resp.Body.Close()

        if resp.StatusCode >= 300 {
                return nil, fmt.Errorf("failed to retrieve CRL")
        }

        body, err := ioutil.ReadAll(resp.Body)
        if err != nil {
                return nil, fmt.Errorf("reading crl response: %w", err)
        }

        // Start from a clean value so a previous reply cannot leak through.
        crlinfo = Crlinfo{}
        // json.Unmarshal keeps decoding the remaining fields after a type
        // mismatch on one of them, so a decode error is only fatal when the
        // CRL body itself did not come through.
        decodeErr := json.Unmarshal(body, &crlinfo)
        if crlinfo.Result == "" {
                if decodeErr != nil {
                        return nil, fmt.Errorf("decoding crl response: %w", decodeErr)
                }
                return nil, fmt.Errorf("crl response contains no CRL")
        }
        crlString := "-----BEGIN X509 CRL-----\n" + crlinfo.Result + "\n-----END X509 CRL-----"

        return []byte(crlString), nil
}
                                                                                                                                                                                                                                                                                                                                             


Kafka

Strimzi

You can install strimzi kafka using the quick start guide: Strimzi Quickstarts

...

Code Block
languagetext
titleopa rules
# Kafka authorization policy: `allow` defaults to false and becomes true when
# any one of the rule bodies below is satisfied by the input.
#
# NOTE(review): this listing comes from a page diff and several tokens look like
# the old and new revision fused together (flagged individually below); those
# spots need to be restored from the real policy file before this can be loaded.
package policy.kafka.authz 

default allow = false

# Requests whose client id is "kowl" may perform the listed operations.
allow = true {
    # NOTE(review): "CREATEDESCRIBE_CONFIGS" is probably "CREATE" and
    # "DESCRIBE_CONFIGS" fused by the diff - confirm which one is current.
    allowedActions :=  ["DESCRIBE", "CREATEDESCRIBE_CONFIGS", "READ"]
    input.action.operation == allowedActions[_]
    input.requestContext.header.name.clientId == "kowl" 
}

# NOTE(review): the three lines of this rule are diff residue (two revisions
# interleaved) and are not valid Rego as shown; the intended rule cannot be
# recovered from this listing.
allow = true {
    allowedResourceTypesallowedActions :=  ["GROUPDESCRIBE", "CLUSTERDESCRIBE_CONFIGS"]
    input.action.resourcePattern.resourceType == allowedResourceTypes[_allowedTopics :=  ["__consumer_offsets","__strimzi_store_topic", "__strimzi-topic-operator-kstreams-topic-store-changelog",]
    input.requestContextaction.principalresourcePattern.name == "service-account-opacli"  allowedTopics[_]
}

# Principal "service-account-opacli" may DESCRIBE and READ resources of type GROUP.
allow = true {
    allowedActions :=  ["DESCRIBE", "READ"]
    input.action.operation == allowedActions[_]
    allowedResourceTypes := ["GROUP"]
    input.action.resourcePattern.resourceType == allowedResourceTypes[_]
    input.requestContext.principal.name == "service-account-opacli"  
}

# CREATE for principal "service-account-opacli" using client id "consumer-client".
allow = true {
    input.action.operation == "CREATE"
    # NOTE(review): "mykafka-topiccluster" and "TOPICCLUSTER" look like two
    # values fused by the diff (e.g. a topic name / "TOPIC" and a cluster name /
    # "CLUSTER") - confirm against the real policy.
    input.action.resourcePattern.name == "mykafka-topiccluster"
    input.action.resourcePattern.resourceType == "TOPICCLUSTER"   
    input.requestContext.principal.name == "service-account-opacli"
    input.requestContext.header.name.clientId == "consumer-client"
}


# Access to topic "my-topic" for principal "service-account-opacli".
allow = true {
    # NOTE(review): "WRITEREAD" and "producerconsumer-client" look like fused
    # diff tokens; given the producer rule that follows, this rule is presumably
    # the consumer one (READ / "consumer-client") - confirm.
    allowedActions :=  ["DESCRIBE","WRITEREAD"]
    input.action.operation == allowedActions[_]
    input.action.resourcePattern.name == "my-topic"
    input.action.resourcePattern.resourceType == "TOPIC"   
    input.requestContext.principal.name == "service-account-opacli"
    input.requestContext.header.name.clientId == "producerconsumer-client"
}  
# NOTE(review): the closing brace below has no matching opening brace.
}


# Client id "producer-client" with principal "service-account-opacli" may
# DESCRIBE and WRITE topic "my-topic".
allow = true {
    allowedActions :=  ["DESCRIBE","WRITE"]
    input.action.operation == allowedActions[_]
    input.action.resourcePattern.name == "my-topic"
    input.action.resourcePattern.resourceType == "TOPIC"   
    input.requestContext.principal.name == "service-account-opacli"
    input.requestContext.header.name.clientId == "producer-client"
}

The default value of the sub field (principal name) in Keycloak is the user id; this can be changed to the user name using a property mapper:

...

If you need to include data from other sources, you need to define the bundle root directories in the .manifest file

...

Code Block
languagejs
titlemanifest
{
  "revision" : "1",
  "roots": ["policy" , "serversroles"]
}

You can then add the kube-mgmt sidecar to your opa deployment, and this will pull data from configmaps in the namespace specified

...

Code Block
languageyml
titlehello-data.yaml
kind: ConfigMap
apiVersion: v1
metadata:
  name: hello-data
  namespace: opa
  labels:
    openpolicyagent.org/data: opa
data:
  x.json: |
    {"a": [1,2,3,4]}


Code Block
languagejs
titletoken.json
    {
      "access_token": "eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJDamJ4a2FONjRVcUdYNThWU2R3WjBxQTdWRmN1TGdEQWhnUWJTVG55UE9JIn0.eyJleHAiOjE2Njg1MjgwMjEsImlhdCI6MTY2ODUyNzcyMSwianRpIjoiMjc1NzkzNjktOWExMi00NjA4LTk5ZjMtNTI2MmE4MzI5NGViIiwiaXNzIjoiaHR0cDovL2tleWNsb2FrLmRlZmF1bHQ6ODA4MC9hdXRoL3JlYWxtcy9vcGEiLCJhdWQiOiJhY2NvdW50Iiwic3ViIjoic2VydmljZS1hY2NvdW50LW9wYWNsaSIsInR5cCI6IkJlYXJlciIsImF6cCI6Im9wYWNsaSIsImFjciI6IjEiLCJyZWFsbV9hY2Nlc3MiOnsicm9sZXMiOlsib2ZmbGluZV9hY2Nlc3MiLCJ1bWFfYXV0aG9yaXphdGlvbiIsImRlZmF1bHQtcm9sZXMtb3BhIl19LCJyZXNvdXJjZV9hY2Nlc3MiOnsib3BhY2xpIjp7InJvbGVzIjpbIm9wYS1jbGllbnQtcm9sZSJdfSwiYWNjb3VudCI6eyJyb2xlcyI6WyJtYW5hZ2UtYWNjb3VudCIsIm1hbmFnZS1hY2NvdW50LWxpbmtzIiwidmlldy1wcm9maWxlIl19fSwic2NvcGUiOiJwcm9maWxlIGVtYWlsIiwiY2xpZW50SWQiOiJvcGFjbGkiLCJjbGllbnRIb3N0IjoiMTI3LjAuMC42IiwiZW1haWxfdmVyaWZpZWQiOmZhbHNlLCJjbGllbnRSb2xlIjoiW29wYS1jbGllbnQtcm9sZV0iLCJwcmVmZXJyZWRfdXNlcm5hbWUiOiJzZXJ2aWNlLWFjY291bnQtb3BhY2xpIiwiY2xpZW50QWRkcmVzcyI6IjEyNy4wLjAuNiJ9.B0X0zhAintHAXAtjiOngzTM0Wdv_aWM8qdyF4MXEZmE6AxoaMpaNQ6B18k1pkMz60qmTUUcYJOm-xHAdipnxptAf44_YmaybVE9_otjO59RBLonUlFtWDnNV4EqixLDaXN33Z54S1iWCPjBvI58sH_jQyMiXM7ur1FfjCCEeQwP2D9CgWY_3-Vg4Wy4cMAbsxOhZZog-QmKV8nkeyELqUS24xk_vpPkYOdG1ztc5SxHnLLsT4iwXgzYjAveiXyKyyYOC0oJP5zgCYEkQCvRORmuU0whN_5g4oTSIFcqN88KsrYb-R0I2RS7mQI8LXHrNIqMlNN2j5PoHZmgvMrpO3w"
    }

You can also add extra data using curl, e.g. curl -X PUT $host:31182/v1/data/jwt -d @token.json

You can view all the data available to opa using curl: curl <opa host>:<port>/v1/data

...

Code Block
languagejs
titleopa data
{
             "decision_id":"067f7d2d-0e5d-4ca8-8cb0-6d38543c6b2f",
   "Topicresult":[{
      "jwt":{       "access_token":"eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJDamJ4a2FONjRVcUdYNThWU2R3WjBxQTdWRmN1TGdEQWhnUWJTVG55UE9JIn0.eyJleHAiOjE2Njg1MjgwMjEsImlhdCI6MTY2ODUyNzcyMSwianRpIjoiMjc1NzkzNjktOWExMi00NjA4LTk5ZjMtNTI2MmE4MzI5NGViIiwiaXNzIjoiaHR0cDovL2tleWNsb2FrLmRlZmF1bHQ6ODA4MC9hdXRoL3JlYWxtcy9vcGEiLCJhdWQiOiJhY2NvdW50Iiwic3ViIjoic2VydmljZS1hY2NvdW50LW9wYWNsaSIsInR5cCI6IkJlYXJlciIsImF6cCI6Im9wYWNsaSIsImFjciI6IjEiLCJyZWFsbV9hY2Nlc3MiOnsicm9sZXMiOlsib2ZmbGluZV9hY2Nlc3MiLCJ1bWFfYXV0aG9yaXphdGlvbiIsImRlZmF1bHQtcm9sZXMtb3BhIl19LCJyZXNvdXJjZV9hY2Nlc3MiOnsib3BhY2xpIjp7InJvbGVzIjpbIm9wYS1jbGllbnQtcm9sZSJdfSwiYWNjb3VudCI6eyJyb2xlcyI6WyJtYW5hZ2UtYWNjb3VudCIsIm1hbmFnZS1hY2NvdW50LWxpbmtzIiwidmlldy1wcm9maWxlIl19fSwic2NvcGUiOiJwcm9maWxlIGVtYWlsIiwiY2xpZW50SWQiOiJvcGFjbGkiLCJjbGllbnRIb3N0IjoiMTI3LjAuMC42IiwiZW1haWxfdmVyaWZpZWQiOmZhbHNlLCJjbGllbnRSb2xlIjoiW29wYS1jbGllbnQtcm9sZV0iLCJwcmVmZXJyZWRfdXNlcm5hbWUiOiJzZXJ2aWNlLWFjY291bnQtb3BhY2xpIiwiY2xpZW50QWRkcmVzcyI6IjEyNy4wLjAuNiJ9.B0X0zhAintHAXAtjiOngzTM0Wdv_aWM8qdyF4MXEZmE6AxoaMpaNQ6B18k1pkMz60qmTUUcYJOm-xHAdipnxptAf44_YmaybVE9_otjO59RBLonUlFtWDnNV4EqixLDaXN33Z54S1iWCPjBvI58sH_jQyMiXM7ur1FfjCCEeQwP2D9CgWY_3-Vg4Wy4cMAbsxOhZZog-QmKV8nkeyELqUS24xk_vpPkYOdG1ztc5SxHnLLsT4iwXgzYjAveiXyKyyYOC0oJP5zgCYEkQCvRORmuU0whN_5g4oTSIFcqN88KsrYb-R0I2RS7mQI8LXHrNIqMlNN2j5PoHZmgvMrpO3w"
     "Write", },
      "opa":{
         "hello-data":{
            "Describex.json":{
               ]
"a":[
              }
    1,
          }
        }2,
         "services":{
         "rappopaprovider":{
3,
                  "ingress":{4
               ]
            }
         }
      },
   },
   "servers":{
         "id":"s1",
         "name":"app",
         "ports":[
            "p1",
            "p2",
            "p3"
         ],
         "protocols":[
            "https",
            "ssh"
         ]
      }
   }
}

See also:

Using Open Policy Agent with Strimzi and Apache Kafka

Using Strimzi

kube-mgmt

Styra opa-kafka-plugin

Secure Kafka with Keycloak: SASL OAuth Bearer

You can get individual values like this: curl <opa host>:<port>/v1/data/jwt/access_token or refer to them in your code like this: data.opa["hello-data"]["x.json"].a[0]

Strimzi Bridge

You can communicate with kafka over http by adding strimzi bridge:

Code Block
languageyml
titleStrimzi Bridge
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaBridge
metadata:
  name: strimzi-bridge
  namespace: kafka
spec:
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  http:
    port: 8080



See also:

Using Open Policy Agent with Strimzi and Apache Kafka

Using Strimzi

kube-mgmt

Styra opa-kafka-plugin

Secure Kafka with Keycloak: SASL OAuth Bearer

Enforce Custom Resource policies with Open Policy Agent Gatekeeper

How to integrate Kafka with Istio

Example with JWT payload passed from Envoy

Kafka Broker filter

Kafka mesh filter in Envoy

Kafka authentication using OAuth 2.0

A deep dive into Strimzi

Using HTTP Bridge as a Kubernetes sidecar

Kafka SSL : Setup with self signed certificate

Kafka & Istio

Integration of Istio with Kafka is limited.

The Kafka Protocol is a TCP level protocol (Layer 4)

Kafka is a binary wrapper over TCP protocol.

Istio mainly supports HTTP/HTTPS on layer 7. 

Higher level protocols such as HTTP have access to metadata so they can properly route requests, this is not the case with the Kafka protocol.

Service Mesh and Proxies: Examples for Kafka


Setting up Strimzi bridge with Istio Authorization and ACLs

Image Added



The Kiali screenshot above represents both a producer and a consumer connecting to Kafka through the Strimzi bridge.

Access to the bridge is controlled using JWT which is checked using an Istio AuthorizationPolicy.

Code Block
languageyml
titleAuthorizationPolicy
# Istio AuthorizationPolicy applied to workloads labelled
# app.kubernetes.io/name=kafka-bridge in the kafka namespace (action ALLOW).
apiVersion: "security.istio.io/v1beta1"
kind: "AuthorizationPolicy"
metadata:
  name: "kafka-bridge-policy"
  namespace: kafka
spec:
  selector:
    matchLabels:
      app.kubernetes.io/name: kafka-bridge
  action: ALLOW
  rules:
  # NOTE(review): "- from:" and "- to:" are two separate entries in the rules
  # list, i.e. two independent rules. If the intent is "these principals AND
  # these operations AND this claim", they presumably belong in a single rule
  # (from/to/when under one list item) - confirm against the Istio reference.
  - from:
    - source:
        # NOTE(review): Istio documents request principals as
        # "<issuer>/<subject>"; these values contain only the issuer URL, so
        # they may need a "/*" suffix - confirm.
        requestPrincipals: ["http://keycloak.default:8080/auth/realms/opa", "http://istio-ingressgateway.istio-system:80/auth/realms/opa"]
  - to:
    - operation:
        # HTTP methods and paths of the bridge covered by this rule.
        methods: ["POST", "GET"]
        paths: ["/topics", "/topics/*", "/consumers/*"]
    when:
    # Matches on the clientRole claim of the JWT.
    - key: request.auth.claims[clientRole]
      values: ["opa-client-role"]


By enabling tls on one of our bootstrap listeners, user certificates are automatically created when we create a new KafkaUser:

      - name: tlsauth
        type: internal
        port: 9096
        tls: true
        authentication:
          type: tls

Setting the authentication type to tls also ensures secure communication between Kafka and zookeeper.

We also set authorization to simple for the Kafka cluster, this means ACLs will be used.

     kafka:
      authorization:
        type: simple
        superUsers:
          - CN=kowl

Any users you want to by-pass ACL checks should be listed under superUsers.

We can then setup our bridge user and include the ACL permissions as part of the KafkaUser definition:

Code Block
languageyml
titleKafkaUser
# KafkaUser "bridge" for cluster my-cluster: authenticates with TLS and is
# granted the ACLs listed below (authorization type "simple").
# NOTE(review): this resource uses kafka.strimzi.io/v1beta1 while the
# KafkaBridge examples on this page use v1beta2 - confirm which API version the
# installed Strimzi CRDs serve.
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
  name: bridge
  namespace: kafka
  labels:
    # Associates the user with the Kafka cluster named my-cluster.
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls
  authorization:
    type: simple
    acls:
      # access to the topic
      - resource:
          type: topic
          name: my-topic
        operations:
          - Create
          - Describe
          - Read
          - Write
          - AlterConfigs
        host: "*"
      # access to the group
      - resource:
          type: group
          name: my-group
        operations:
          - Describe
          - Read
        host: "*"
      # access to the cluster
      - resource:
          type: cluster
        operations:
          - Alter
          - AlterConfigs
        host: "*"


Authorization between the bridge and Kafka is done using user certificates.


Code Block
languageyml
titleKafkaBridge
# KafkaBridge that exposes Kafka over HTTP on port 8080 and connects to the
# bootstrap address on port 9096 using TLS client authentication.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaBridge
metadata:
  name: strimzi-bridge
  namespace: kafka
spec:
  replicas: 1
  # Port 9096 is the port of the "tlsauth" listener shown earlier on this page.
  bootstrapServers: my-cluster-kafka-bootstrap:9096
  http:
    port: 8080
  logging:
    type: inline
    loggers:
      logger.bridge.level: "DEBUG"
      logger.send.name: "http.openapi.operation.send"
      logger.send.level: "DEBUG"
  tls:
    # CA certificate used to verify the brokers, read from a secret.
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
  authentication:
    type: tls
    # Client certificate and key read from the secret named "bridge"
    # (same name as the KafkaUser defined above).
    certificateAndKey:
      secretName: bridge
      certificate: user.crt
      key: user.key


Records can then be posted to a topic with a call like this:

Endpoint:

http://istio-ingressgateway.istio-system:80/topics/my-topic

Payload: {"records":[{"key":"key-366","value":"value-366"}]}

and read from a topic using the following syntax:

i) Create a consumer group

Endpoint: http://istio-ingressgateway.istio-system:80/consumers/my-group

Payload: {"name":"my-consumer","format":"json","auto.offset.reset":"earliest","enable.auto.commit":false}

ii) subscribe to a topic

Endpoint: http://istio-ingressgateway.istio-system:80/consumers/my-group/instances/my-consumer/subscription

Payload: {"topics":["my-topic"]}

iii) Read records from a topic

Endpoint: http://istio-ingressgateway.istio-system:80/consumers/my-group/instances/my-consumer/records


These calls will be redirected to the bridge using a gateway and a virtual service

Code Block
languageyml
titleGateway/VirtualService
# Gateway: accepts plain HTTP on port 80 for any host on the Istio ingress
# gateway.
apiVersion: networking.istio.io/v1beta1
kind: Gateway
metadata:
  name: strimzi-bridge-gateway
  namespace: kafka
spec:
  selector:
    istio: ingressgateway # use Istio gateway implementation
  servers:
  - port:
      number: 80
      name: http
      protocol: HTTP
    hosts:
    - "*"
---
# VirtualService: bound to the gateway above; forwards requests whose path
# starts with /topics or /consumers to the bridge service on port 8080.
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: strimzi-bridge-vs
  namespace: kafka
spec:
  hosts:
  - "*"
  gateways:
  - strimzi-bridge-gateway
  http:
  - name: "strimzi-bridge-routes"
    # Either prefix is sufficient for the route to apply.
    match:
    - uri:
        prefix: "/topics"
    - uri:
        prefix: "/consumers"
    route:
    - destination:
        port:
          number: 8080
        host: strimzi-bridge-bridge-service.kafka.svc.cluster.local


Note: To use Kowl with this setup you'll need to create a kowl user and configure kowl to use tls

Code Block
languageyml
titleKowl
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
  name: kowl
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls


Code Block
languageyml
titleKowl configmap
apiVersion: v1
kind: ConfigMap
metadata:
  name: kowl-config-cm
  namespace: kafka
data:
  config.yaml: |
    kafka:
      brokers:
        - my-cluster-kafka-0.my-cluster-kafka-brokers.kafka.svc:9093
      tls:
        enabled: true
        caFilepath: /etc/strimzi/ca/ca.crt
        certFilepath: /etc/strimzi/user-crt/crt/user.crt
        keyFilepath: /etc/strimzi/user-key/key/user.key


  • ca.crt is obtained from the my-cluster-cluster-ca-cert secret, user.crt and user.key are obtained from the kowl secret.


Admission Controllers

An admission controller is a piece of code that intercepts requests to the Kubernetes API server prior to persistence of the object, but after the request is authenticated and authorized.

Admission Controllers Reference

Kyverno

https://kyverno.io/

Opa Gatekeeper

Open-policy-agent Gatekeeper

OPA Gatekeeper: Policy and Governance for Kubernetes