Versions Compared


  • This line was added.
  • This line was removed.
  • Formatting was changed.


Code Block
titleApplication rules

import data.policy.common.request

allow = true {
request.method == "GET"
request.path == [ "rapp-opa-provider" ]
now := time.now_ns() / 1000000000
request.iat <= now
now < request.exp
request.clientRole == "[opa-client-role]"

Lastly create the parent rules file that will call the appropiates policy based on the http request path


Code Block
titleVerifyc Certificate
package main

import (

type Cfinfo struct {
        Success string `json:"success,omitempty"`
        Result  struct {
                Certificate string   `json:"certificate"`
                Usages      []string `json:"usages"`
                Expiry      string   `json:"expiry"`
        Errors   []string `json:"errors"`
        Messages []string `json:"messages"`

var cfinfo Cfinfo

type Crlinfo struct {
        Success  bool     `json:"success,omitempty"`
        Result   string   `json:"result,omitempty"`
        Errors   []string `json:"errors"`
        Messages []string `json:"messages"`

var crlinfo Crlinfo

type ServerParameters struct {
        certFile  string // path to the x509 cert
        issuerUrl string // webhook server port
        crlUrl    string // webhook server port

var parameters ServerParameters

func main() {
        flag.StringVar(¶meters.certFile, "certFile", "", "File containing the x509 certificate")
        flag.StringVar(¶meters.issuerUrl, "issuerUrl", "", "Url for retrieving the issuer certificate")
        flag.StringVar(¶meters.crlUrl, "crlUrl", "", "Url for retrieving the crl")
        if parameters.certFile == "" {
        // read x509 certificate from PEM encoded file
        cert_bytes := readFile(parameters.certFile)
        cert, err := decodeCert(cert_bytes)
        if err != nil {

        issuer_bytes, err := getIssuer(parameters.issuerUrl) //readCert(os.Args[2])
        if err != nil {

        issuer, err := decodeCert(issuer_bytes)
        if err != nil {

        // Perform OCSP Check
        fmt.Println("Checing OCSP")
        status, err := checkOCSPStatus(cert, issuer)
        if err != nil {
        } else {
                switch status {
                case ocsp.Good:
                        fmt.Printf("[+] Certificate status is Good\n")
                case ocsp.Revoked:
                        fmt.Printf("[-] Certificate status is Revoked\n")
                case ocsp.Unknown:
                        fmt.Printf("[-] Certificate status is Unknown\n")

        crl_bytes, err := getCrl(parameters.crlUrl) //readCert(os.Args[2])
        if err != nil {

        crl, err := decodeCrl(crl_bytes)
        if err != nil {

        fmt.Println("\nChecing CRL")
        _, err = checkCRLStatus(cert, issuer, crl)
        if err != nil {
                fmt.Println("[+] Certificate status is not Revoked\n")
func checkCRLStatus(cert *x509.Certificate, issuer *x509.Certificate, crl *pkix.CertificateList) (bool, error) {
        var revoked = false
        // Check CRL signature
        err := issuer.CheckCRLSignature(crl)
        if err != nil {
                return revoked, err

        // Check CRL validity
        if crl.TBSCertList.NextUpdate.Before(time.Now()) {
                return revoked, fmt.Errorf("CRL is outdated")
        // Searching for our certificate in CRL
        for _, revokedCertificate := range crl.TBSCertList.RevokedCertificates {
                if revokedCertificate.SerialNumber.Cmp(cert.SerialNumber) == 0 {
                        //Found validated certificate in list of revoked ones
                        revoked = true
                        return revoked, fmt.Errorf("[-] Certificate status is Revoked\n")

        return revoked, nil

// CheckOCSPStatus will make an OCSP request for the provided certificate.
// If the status of the certificate is not good, then an error is returned.
func checkOCSPStatus(cert *x509.Certificate, issuer *x509.Certificate) (int, error) {
        var (
                ctx     = context.Background()
                ocspURL = cert.OCSPServer[0]

        // Build OCSP request
        buffer, err := ocsp.CreateRequest(cert, issuer, &ocsp.RequestOptions{
                Hash: crypto.SHA256,
        if err != nil {
                return ocsp.Unknown, fmt.Errorf("creating ocsp request body: %w", err)

        req, err := http.NewRequest(http.MethodPost, ocspURL, bytes.NewBuffer(buffer))
        if err != nil {
                return ocsp.Unknown, fmt.Errorf("creating http request: %w", err)

        ocspUrl, err := url.Parse(ocspURL)
        if err != nil {
                return ocsp.Unknown, fmt.Errorf("parsing ocsp url: %w", err)

        req.Header.Add("Content-Type", "application/ocsp-request")
        req.Header.Add("Accept", "application/ocsp-response")
        req.Header.Add("host", ocspUrl.Host)
        req = req.WithContext(ctx)

        // Make OCSP request
        httpResponse, err := http.DefaultClient.Do(req)
        if err != nil {
                return ocsp.Unknown, fmt.Errorf("making ocsp request: %w", err)

        defer httpResponse.Body.Close()

        output, err := ioutil.ReadAll(httpResponse.Body)
        if err != nil {
                return ocsp.Unknown, fmt.Errorf("reading response body: %w", err)

        // Parse response
        ocspResponse, err := ocsp.ParseResponse(output, issuer)
        if err != nil {
                return ocsp.Unknown, fmt.Errorf("parsing ocsp response: %w", err)
        return ocspResponse.Status, nil

func readFile(file string) []byte {
        cert, err := ioutil.ReadFile(file)
        if err != nil {
        return cert

func decodeCert(cert_bytes []byte) (*x509.Certificate, error) {
        b, _ := pem.Decode(cert_bytes)
        var cert *x509.Certificate

        cert, err := x509.ParseCertificate(b.Bytes)
        if err != nil {
                fmt.Println("Parse Error")
                return nil, fmt.Errorf("parsing certificate: %w", err)

        return cert, err
func decodeCrl(cert_bytes []byte) (*pkix.CertificateList, error) {
        b, _ := pem.Decode(cert_bytes)

        crl, err := x509.ParseCRL(b.Bytes)
        if err != nil {
                return nil, fmt.Errorf("parsing crl: %w", err)
        return crl, err

func getIssuer(issuerUrl string) ([]byte, error) {
        var resp = &http.Response{}
        values := map[string]string{"label": "intermediate"}
        jsonValue, _ := json.Marshal(values)
        resp, err := http.Post(issuerUrl, "application/json", bytes.NewBuffer(jsonValue))
        if err != nil {
                panic("Something wrong with the post request")

        defer resp.Body.Close()
        body, err := ioutil.ReadAll(resp.Body)
        json.Unmarshal([]byte(body), &cfinfo)
        return []byte(cfinfo.Result.Certificate), nil

func getCrl(crlUrl string) ([]byte, error) {
        resp, err := http.Get(crlUrl)
        if err != nil {
                return nil, err
        defer resp.Body.Close()

        if resp.StatusCode >= 300 {
                return nil, fmt.Errorf("failed to retrieve CRL")

        body, err := ioutil.ReadAll(resp.Body)
        json.Unmarshal([]byte(body), &crlinfo)
        crlString := "-----BEGIN X509 CRL-----\n" + crlinfo.Result + "\n-----END X509 CRL-----"

        return []byte(crlString), err


You can install strimzi kafka using the quick start guide: Strimzi Quickstarts

You'll need to overwrite the kafka cluster with your own definition: my-cluster.yaml

This file includes an authorization section:

Code Block
        type: opa
        url: http://opa.default:8181/v1/data/policy/kafka/authz/allow
        allowOnError: false
        initialCacheCapacity: 1000
        maximumCacheSize: 10000
        expireAfterMs: 10 #60000
          - CN=henri
          - anwar
          - CN=wesley
          - CN=my-user

This file includes an listeners section:

Code Block
      - name: plain
        port: 9092
        tls: false
        type: internal
      - name: tls
        port: 9093
        tls: true
        type: internal
      - name: plain3
        port: 9097
        tls: false
        type: internal
          type: oauth
          checkIssuer: false
          checkAccessTokenType: true
          accessTokenIsJwt: true
          enableOauthBearer: true
          introspectionEndpointUri: http://keycloak.default:8080/auth/realms/opa/protocol/openid-connect/token/introspect
          clientId: opacli
            secretName: my-cluster-oauth
            key: clientSecret
          validIssuerUri: http://keycloak.default:8080/auth/realms/opa
          clientAudience: account
          customClaimCheck: "@.resource_access['opacli'].roles[0] == 'opa-client-role'"

Where opa and keycloak are configured.

If you are publishing or subscribing using this listener you will need to provide a JWT with your request.

The token will be verified using keycloak along with the audience field and the resource access['opaclii'].roles field.

Kafka will then pass the authorizaton on to opa where other checks will take place.

Below is a sample opa policy you might use to check the kafka input:

Code Block
titleopa rules
package policy.kafka.authz 

default allow = false

allow = true {
    allowedActions :=  ["DESCRIBE","CREATE", "READ"]
    input.action.operation == allowedActions[_]
    allowedResourceTypes := ["GROUP", "CLUSTER"]
    input.action.resourcePattern.resourceType == allowedResourceTypes[_] == "service-account-opacli"  

allow = true {
    allowedActions :=  ["DESCRIBE","READ"]
    input.action.operation == allowedActions[_] == "my-topic"
    input.action.resourcePattern.resourceType == "TOPIC" == "service-account-opacli" == "consumer-client"

allow = true {
    input.action.operation == "WRITE" == "my-topic"
    input.action.resourcePattern.resourceType == "TOPIC" == "service-account-opacli" == "producer-client"

The default value of the sub field (principal name) in keycloak is the user id, this can be changed to the user name using a property mapper:

Image Added