...
Code Block | ||||
---|---|---|---|---|
| ||||
# Pod spec fragment: the init container holds back the application container
# until Keycloak accepts TCP connections on keycloak.default:8080.
spec:
  initContainers:
    - name: init-keycloak
      image: busybox
      # Poll every 2 seconds until the port is reachable, then exit so the
      # regular containers can start.
      command: ['sh', '-c', 'until nc -vz keycloak.default 8080; do echo waiting for keycloak; sleep 2; done;']
  containers:
    - name: a1-policy
      image: hashicorp/http-echo
      ports:
        - containerPort: 5678
      args:
        - -text
        - "Hello a1-policy" |
See also: keycloak.yaml
Keycloak Operator
Keycloak Operator Installation
Istio mTLS
Test: Istio / Mutual TLS Migration
...
Dynamic Policy Composition for OPA
Open Policy Agent: Authorization in a Cloud Native World
Microservices Authorization using Open Policy Agent and Traefik (API Gateway)
A Study in Serverless Authorization with Open Policy Agent
Godaddy Opa Lambda Extension Plugin
GO
OPA provides a Go library (github.com/open-policy-agent/opa/rego) for evaluating policies from within a Go program.
...
Code Block | ||||
---|---|---|---|---|
| ||||
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/open-policy-agent/opa/rego"
"io/ioutil"
"net/http"
"net/url"
"os"
)
// Jwttoken mirrors the JSON document returned by the Keycloak token endpoint.
//
// The field names are kept unchanged because other code in this file reads
// them directly (for example token.Access_token). Explicit json tags bind each
// field to its wire name instead of relying on case-insensitive matching.
type Jwttoken struct {
	Access_token       string `json:"access_token"`
	Expires_in         int    `json:"expires_in"`
	Refresh_expires_in int    `json:"refresh_expires_in"`
	Refresh_token      string `json:"refresh_token"`
	Token_type         string `json:"token_type"`
	// Keycloak spells this key with hyphens ("not-before-policy"), which can
	// never match the Go field name on its own, so the tag is required for the
	// value to be populated.
	Not_before_policy int    `json:"not-before-policy"`
	Session_state     string `json:"session_state"`
	Scope             string `json:"scope"`
}

// token holds the most recently decoded token response; it is filled in by
// getToken.
var token Jwttoken
// opaPolicy is the Rego module (package authz) evaluated by this program.
//
// The policy downloads the JWKS of the "opa" realm from Keycloak through
// http.send (cached for one hour), keeps only the signing keys, and verifies
// the JWT supplied as input against them with audience "account" and the realm
// URL as issuer. allow is true only when the token verifies, the current time
// lies in [iat, exp), and the clientRole claim equals "[opa-client-role]".
//
// The issuer URL must stay on a single line: a Rego double-quoted string
// cannot contain a line break.
var opaPolicy string = `
package authz
import future.keywords.in
default allow = false
jwks := jwks_request("http://keycloak:8080/auth/realms/opa/protocol/openid-connect/certs").body
filtered_jwks := [ key |
some key in jwks.keys
key.use == "sig"
]
token_cert := json.marshal({"keys": filtered_jwks})
token = { "isValid": isValid, "header": header, "payload": payload } {
[isValid, header, payload] := io.jwt.decode_verify(input, { "cert": token_cert, "aud": "account", "iss": "http://keycloak:8080/auth/realms/opa"})
}
allow {
is_token_valid
}
is_token_valid {
token.isValid
now := time.now_ns() / 1000000000
token.payload.iat <= now
now < token.payload.exp
token.payload.clientRole == "[opa-client-role]"
}
jwks_request(url) = http.send({
"url": url,
"method": "GET",
"force_cache": true,
"force_json_decode": true,
"force_cache_duration_seconds": 3600 # Cache response for an hour
})
`
func getToken() string {
clientSecret := "63wkv0RUXkp01pbqtNTSwghhTxeMW55I"
clientId := "opacli"
realmName := "opa"
keycloakHost := "keycloak"
keycloakPort := "8080"
keycloakUrl := "http://" + keycloakHost + ":" + keycloakPort + "/auth/realms/" + realmName + "/protocol/openid-connect/token" resp, err := http.PostForm(keycloakUrl,
url.Values{"client_secret": {clientSecret}, "grant_type": {"client_credentials"}, "client_id": {clientId}})
if err != nil {
fmt.Println(err)
panic("Something wrong with the credentials or url ")
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
json.Unmarshal([]byte(body), &token)
return token.Access_token
}
// traceOpa evaluates the query "x = data.authz.allow" against input with
// tracing enabled and prints the evaluation trace to standard output.
//
// input is passed to the policy as the Rego input document. Evaluation errors
// are reported on standard output; the trace collected so far is still
// printed afterwards.
func traceOpa(input string) {
	ctx := context.TODO()
	test := rego.New(
		rego.Query("x = data.authz.allow"),
		rego.Trace(true),
		rego.Module("example.rego", opaPolicy),
		rego.Input(input),
	)
	// Only the trace is of interest here, so the result set is dropped, but a
	// failed evaluation must not go unnoticed.
	if _, err := test.Eval(ctx); err != nil {
		fmt.Println("Error: " + err.Error())
	}
	rego.PrintTraceWithLocation(os.Stdout, test)
}
// evaluateOpa prepares the query "x = data.authz.allow" from opaPolicy,
// evaluates it with input as the Rego input document, and prints the outcome.
//
// Three outcomes are reported on standard output: an error, an empty
// (undefined) result set, or the result set itself.
func evaluateOpa(input string) {
	ctx := context.TODO()
	query, err := rego.New(
		rego.Query("x = data.authz.allow"),
		rego.Module("example.rego", opaPolicy),
	).PrepareForEval(ctx)
	if err != nil {
		// A query that failed to prepare cannot be evaluated, so stop here
		// rather than calling Eval on it.
		fmt.Println(err.Error())
		return
	}

	results, err := query.Eval(ctx, rego.EvalInput(input))
	switch {
	case err != nil:
		// Evaluation error.
		fmt.Println("Error: " + err.Error())
	case len(results) == 0:
		// Undefined result.
		fmt.Println("Results are empty")
	default:
		// Decision available.
		fmt.Printf("Results = %+v\n", results) //=> [{Expressions:[true] Bindings:map[x:true]}]
	}
}
// main obtains an access token and feeds it to the policy twice: first with
// tracing to show how the decision is reached, then as a plain evaluation.
func main() {
	jwt := getToken()

	traceOpa(jwt)
	evaluateOpa(jwt)
}
|
...
Code Block | ||||
---|---|---|---|---|
| ||||
# Token validation policy for the rapp-opa-provider service.
# The HTTP request is read from input.attributes.request.http; the bearer
# token in its Authorization header is verified against the Keycloak realm keys.
package policies.rappopaprovider.policy
import input.attributes.request.http as http_request
import future.keywords.in
# Keycloak realm and the URLs derived from it.
realm_name := "opa"
realm_url := sprintf("http://keycloak:8080/auth/realms/%v", [realm_name])
certs_url := sprintf("%v/protocol/openid-connect/certs", [realm_url])
# JWKS document fetched from the realm's certs endpoint (see jwks_request below).
jwks := jwks_request(certs_url).body
# Keep only the keys whose "use" is "sig".
filtered_jwks := [ key |
some key in jwks.keys
key.use == "sig"
]
# JWKS containing only the filtered keys, serialised for io.jwt.decode_verify.
token_cert := json.marshal({"keys": filtered_jwks})
# Decode and verify the token taken from the Authorization header.
# The header value is split on a single space and the second part is used as
# the encoded JWT; audience must be "account" and issuer must be realm_url.
token = { "isValid": isValid, "header": header, "payload": payload } {
[_, encoded] := split(http_request.headers.authorization, " ")
[isValid, header, payload] := io.jwt.decode_verify(encoded, { "cert": token_cert, "aud": "account", "iss": realm_url})
}
# Adds a denial message whenever the token does not pass is_token_valid.
deny[msg] {
not is_token_valid
msg = "denied by rappopaprovider.policy: not a valid token"
}
# True when the token verified, the current time (seconds) is within
# [iat, exp), and the clientRole claim equals "[opa-client-role]".
is_token_valid {
token.isValid
now := time.now_ns() / 1000000000
token.payload.iat <= now
now < token.payload.exp
token.payload.clientRole == "[opa-client-role]"
}
# GET the given URL via http.send, decoding the body as JSON and forcing the
# response to be cached.
jwks_request(url) = http.send({
"url": url,
"method": "GET",
"force_cache": true,
"force_json_decode": true,
"force_cache_duration_seconds": 3600 # Cache response for an hour
}) |
...
Code Block | ||||
---|---|---|---|---|
| ||||
#!/bin/bash
# Address of the Istio ingress gateway: the minikube IP plus the nodePort of
# the gateway service's "http2" port.
INGRESS_HOST=$(minikube ip)
INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
# Counters updated by run_test and printed in the final summary.
TESTS=0
PASSED=0
FAILED=0
# Timestamp taken once at start-up; printed in the final summary.
TEST_TS=$(date +%F-%T)
# Token state written by get_token: TOKEN is the raw token response and
# ACCESS_TOKEN the extracted access token. REFRESH_TOKEN is only initialised
# here; nothing in the visible script assigns it afterwards.
TOKEN=""
ACCESS_TOKEN=""
REFRESH_TOKEN=""
# get_token: request a client-credentials token for client "opacli" in the
# "opa" realm and store the result in the globals TOKEN (raw JSON response)
# and ACCESS_TOKEN (the .access_token field).
#
# Arguments:
#   $1 - test prefix; accepted for call compatibility, not used here.
# Environment:
#   KEYCLOAK_HOST, KEYCLOAK_PORT - address of the Keycloak server (required).
# Requires: curl, jq.
function get_token
{
    local prefix="${1}"
    # Neither variable is assigned anywhere in this script, so stop with a clear
    # message instead of silently requesting "http://:/auth/realms/...".
    : "${KEYCLOAK_HOST:?KEYCLOAK_HOST must be set}"
    : "${KEYCLOAK_PORT:?KEYCLOAK_PORT must be set}"
    local url="http://${KEYCLOAK_HOST}:${KEYCLOAK_PORT}/auth/realms"
    TOKEN=$(curl -s -X POST "$url/opa/protocol/openid-connect/token" -H \
        "Content-Type: application/x-www-form-urlencoded" -d client_secret=63wkv0RUXkp01pbqtNTSwghhTxeMW55I \
        -d 'grant_type=client_credentials' -d client_id=opacli)
    # Quote the response so the JSON reaches jq without word splitting or
    # pathname expansion.
    ACCESS_TOKEN=$(echo "$TOKEN" | jq -r '.access_token')
}
# run_test: call one endpoint through the ingress gateway with a fresh bearer
# token and compare the response body with the expected text.
#
# Arguments:
#   $1 - path prefix appended to the gateway address (no leading slash)
#   $2 - HTTP method passed to curl -X
#   $3 - expected response body; the test passes only on an exact match
#   $4 - request data; accepted but not sent by this function
# Updates the globals TESTS, PASSED and FAILED.
function run_test
{
    local prefix="${1}" type="${2}" msg="${3}" data="${4}"
    local url result
    TESTS=$((TESTS+1))
    echo "Test ${TESTS}: Testing $type /${prefix}"
    get_token "$prefix"
    url="${INGRESS_HOST}:${INGRESS_PORT}/${prefix}"
    result=$(curl -s -X "${type}" -H "Content-type: application/json" -H "Authorization: Bearer $ACCESS_TOKEN" "$url")
    # Quoted so the body is printed verbatim (no whitespace collapsing or
    # globbing).
    echo "$result"
    if [ "$result" != "$msg" ]; then
        echo "FAIL"
        FAILED=$((FAILED+1))
    else
        echo "PASS"
        PASSED=$((PASSED+1))
    fi
    echo ""
}
# Single test case: GET /rapp-opa-provider must return exactly "Hello OPA World!".
run_test "rapp-opa-provider" "GET" "Hello OPA World!" ""
# Summary of the counters maintained by run_test and the start timestamp.
echo
echo "-----------------------------------------------------------------------"
echo "Number of Tests: $TESTS, Tests Passed: $PASSED, Tests Failed: $FAILED"
echo "Date: $TEST_TS"
echo "-----------------------------------------------------------------------" |
...
Code Block | ||||
---|---|---|---|---|
| ||||
# Ingress policy for the rapp-opa-provider service.
package policy.services.rappopaprovider.ingress

# request is defined in data.policy.common (not shown here); this rule reads
# its token, method, path, iat, exp and clientRole fields.
import data.policy.common.request

# Allow only GET requests to /rapp-opa-provider that carry a valid,
# currently active token with the expected client role.
allow = true {
    request.token.isValid
    request.method == "GET"
    request.path == [ "rapp-opa-provider" ]
    # time.now_ns() is in nanoseconds; iat and exp are compared in seconds.
    now := time.now_ns() / 1000000000
    request.iat <= now
    now < request.exp
    request.clientRole == "[opa-client-role]"
} |
Lastly, create the parent rules file that will call the appropriate policy based on the HTTP request path.
...
opa bench --data rbactest.rego 'data.rbactest.allow'
+-------------------------------------------------+------------+
| samples | 22605 |
| ns/op | 47760 |
| B/op | 6269 |
| allocs/op | 112 |
| histogram_timer_rego_external_resolve_ns_75% | 400 |
| histogram_timer_rego_external_resolve_ns_90% | 500 |
| histogram_timer_rego_external_resolve_ns_95% | 500 |
| histogram_timer_rego_external_resolve_ns_99% | 871 |
| histogram_timer_rego_external_resolve_ns_99.9% | 29394 |
| histogram_timer_rego_external_resolve_ns_99.99% | 29800 |
| histogram_timer_rego_external_resolve_ns_count | 22605 |
| histogram_timer_rego_external_resolve_ns_max | 29800 |
| histogram_timer_rego_external_resolve_ns_mean | 434 |
| histogram_timer_rego_external_resolve_ns_median | 400 |
| histogram_timer_rego_external_resolve_ns_min | 200 |
| histogram_timer_rego_external_resolve_ns_stddev | 1045 |
| histogram_timer_rego_query_eval_ns_75% | 31100 |
| histogram_timer_rego_query_eval_ns_90% | 37210 |
| histogram_timer_rego_query_eval_ns_95% | 47160 |
| histogram_timer_rego_query_eval_ns_99% | 91606 |
| histogram_timer_rego_query_eval_ns_99.9% | 630561 |
| histogram_timer_rego_query_eval_ns_99.99% | 631300 |
| histogram_timer_rego_query_eval_ns_count | 22605 |
| histogram_timer_rego_query_eval_ns_max | 631300 |
| histogram_timer_rego_query_eval_ns_mean | 29182 |
| histogram_timer_rego_query_eval_ns_median | 25300 |
| histogram_timer_rego_query_eval_ns_min | 15200 |
| histogram_timer_rego_query_eval_ns_stddev | 32411 |
+-------------------------------------------------+------------+
OPA & Minio
OPA & MinIO's Access Management Plugin
OPA Sidecar injection
First create a namespace for your apps and enable istio and opa
...
Code Block | ||||
---|---|---|---|---|
| ||||
package main import ( "bytes" "context" "crypto" "crypto/x509" "crypto/x509/pkix" "encoding/json" "encoding/pem" "flag" "fmt" "golang.org/x/crypto/ocsp" "io/ioutil" "log" "net/http" "net/url" "os" "time" ) type Cfinfo struct { Success string `json:"success,omitempty"` Result struct { Certificate string `json:"certificate"` Usages []string `json:"usages"` Expiry string `json:"expiry"` } Errors []string `json:"errors"` Messages []string `json:"messages"` } var cfinfo Cfinfo type Crlinfo struct { Success bool `json:"success,omitempty"` Result string `json:"result,omitempty"` Errors []string `json:"errors"` Messages []string `json:"messages"` } var crlinfo Crlinfo type ServerParameters struct { certFile string // path to the x509 cert issuerUrl string // webhook server port crlUrl string // webhook server port } var parameters ServerParameters func main() { flag.StringVar(¶meters.certFile, "certFile", "", "File containing the x509 certificate") flag.StringVar(¶meters.issuerUrl, "issuerUrl", "http://127.0.0.1:8888/api/v1/cfssl/info", "Url for retrieving the issuer certificate") flag.StringVar(¶meters.crlUrl, "crlUrl", "http://127.0.0.1:8888/api/v1/cfssl/crl", "Url for retrieving the crl") flag.Parse() if parameters.certFile == "" { flag.Usage() os.Exit(1) } // read x509 certificate from PEM encoded file cert_bytes := readFile(parameters.certFile) cert, err := decodeCert(cert_bytes) if err != nil { log.Fatal(err) } issuer_bytes, err := getIssuer(parameters.issuerUrl) //readCert(os.Args[2]) if err != nil { log.Fatal(err) } issuer, err := decodeCert(issuer_bytes) if err != nil { log.Fatal(err) } // Perform OCSP Check fmt.Println("Checing OCSP") status, err := checkOCSPStatus(cert, issuer) if err != nil { fmt.Println(err) } else { switch status { case ocsp.Good: fmt.Printf("[+] Certificate status is Good\n") case ocsp.Revoked: fmt.Printf("[-] Certificate status is Revoked\n") case ocsp.Unknown: fmt.Printf("[-] Certificate status is Unknown\n") } crl_bytes, err := 
getCrl(parameters.crlUrl) //readCert(os.Args[2]) if err != nil { log.Fatal(err) } crl, err := decodeCrl(crl_bytes) if err != nil { log.Fatal(err) } fmt.Println("\nChecing CRL") _, err = checkCRLStatus(cert, issuer, crl) if err != nil { log.Fatal(err) }else{ fmt.Println("[+] Certificate status is not Revoked\n") } } func checkCRLStatus(cert *x509.Certificate, issuer *x509.Certificate, crl *pkix.CertificateList) (bool, error) { var revoked = false // Check CRL signature err := issuer.CheckCRLSignature(crl) if err != nil { return revoked, err } // Check CRL validity if crl.TBSCertList.NextUpdate.Before(time.Now()) { return revoked, fmt.Errorf("CRL is outdated") } // Searching for our certificate in CRL for _, revokedCertificate := range crl.TBSCertList.RevokedCertificates { if revokedCertificate.SerialNumber.Cmp(cert.SerialNumber) == 0 { //Found validated certificate in list of revoked ones revoked = true return revoked, fmt.Errorf("[-] Certificate status is Revoked\n") } } return revoked, nil } // CheckOCSPStatus will make an OCSP request for the provided certificate. // If the status of the certificate is not good, then an error is returned. 
func checkOCSPStatus(cert *x509.Certificate, issuer *x509.Certificate) (int, error) { var ( ctx = context.Background() ocspURL = cert.OCSPServer[0] ) // Build OCSP request buffer, err := ocsp.CreateRequest(cert, issuer, &ocsp.RequestOptions{ Hash: crypto.SHA256, }) if err != nil { return ocsp.Unknown, fmt.Errorf("creating ocsp request body: %w", err) } req, err := http.NewRequest(http.MethodPost, ocspURL, bytes.NewBuffer(buffer)) if err != nil { return ocsp.Unknown, fmt.Errorf("creating http request: %w", err) } ocspUrl, err := url.Parse(ocspURL) if err != nil { return ocsp.Unknown, fmt.Errorf("parsing ocsp url: %w", err) } req.Header.Add("Content-Type", "application/ocsp-request") req.Header.Add("Accept", "application/ocsp-response") req.Header.Add("host", ocspUrl.Host) req = req.WithContext(ctx) // Make OCSP request httpResponse, err := http.DefaultClient.Do(req) if err != nil { return ocsp.Unknown, fmt.Errorf("making ocsp request: %w", err) } defer httpResponse.Body.Close() output, err := ioutil.ReadAll(httpResponse.Body) if err != nil { return ocsp.Unknown, fmt.Errorf("reading response body: %w", err) } // Parse response ocspResponse, err := ocsp.ParseResponse(output, issuer) if err != nil { return ocsp.Unknown, fmt.Errorf("parsing ocsp response: %w", err) } return ocspResponse.Status, nil } func readFile(file string) []byte { cert, err := ioutil.ReadFile(file) if err != nil { log.Fatalln(err) } return cert } func decodeCert(cert_bytes []byte) (*x509.Certificate, error) { b, _ := pem.Decode(cert_bytes) var cert *x509.Certificate cert, err := x509.ParseCertificate(b.Bytes) if err != nil { fmt.Println("Parse Error") return nil, fmt.Errorf("parsing certificate: %w", err) } return cert, err } func decodeCrl(cert_bytes []byte) (*pkix.CertificateList, error) { b, _ := pem.Decode(cert_bytes) crl, err := x509.ParseCRL(b.Bytes) if err != nil { return nil, fmt.Errorf("parsing crl: %w", err) } return crl, err } func getIssuer(issuerUrl string) ([]byte, error) { var resp = 
&http.Response{} values := map[string]string{"label": "intermediate"} jsonValue, _ := json.Marshal(values) resp, err := http.Post(issuerUrl, "application/json", bytes.NewBuffer(jsonValue)) if err != nil { fmt.Println(err) panic("Something wrong with the post request") } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) json.Unmarshal([]byte(body), &cfinfo) return []byte(cfinfo.Result.Certificate), nil } func getCrl(crlUrl string) ([]byte, error) { resp, err := http.Get(crlUrl) if err != nil { return nil, err } defer resp.Body.Close() if resp.StatusCode >= 300 { return nil, fmt.Errorf("failed to retrieve CRL") } body, err := ioutil.ReadAll(resp.Body) json.Unmarshal([]byte(body), &crlinfo) crlString := "-----BEGIN X509 CRL-----\n" + crlinfo.Result + "\n-----END X509 CRL-----" return []byte(crlString), err } |
Kafka
Strimzi
You can install strimzi kafka using the quick start guide: Strimzi Quickstarts
You'll need to overwrite the kafka cluster with your own definition: my-cluster.yaml
This file includes an authorization section:
Code Block | ||||
---|---|---|---|---|
| ||||
authorization:
type: opa
url: http://opa.default:8181/v1/data/policy/kafka/authz/allow
allowOnError: false
initialCacheCapacity: 1000
maximumCacheSize: 10000
expireAfterMs: 10 #60000
superUsers:
- CN=henri
- anwar
- CN=wesley
- CN=my-user |
This file also includes a listeners section:
Code Block | ||||
---|---|---|---|---|
| ||||
listeners:
- name: plain
port: 9092
tls: false
type: internal
- name: tls
port: 9093
tls: true
type: internal
- name: plain3
port: 9097
tls: false
type: internal
authentication:
type: oauth
checkIssuer: false
checkAccessTokenType: true
accessTokenIsJwt: true
enableOauthBearer: true
introspectionEndpointUri: http://keycloak.default:8080/auth/realms/opa/protocol/openid-connect/token/introspect
clientId: opacli
clientSecret:
secretName: my-cluster-oauth
key: clientSecret
validIssuerUri: http://keycloak.default:8080/auth/realms/opa
clientAudience: account
customClaimCheck: "@.resource_access['opacli'].roles[0] == 'opa-client-role'" |
Where opa and keycloak are configured.
If you are publishing or subscribing using this listener you will need to provide a JWT with your request.
The token will be verified using Keycloak, along with the audience field and the resource_access['opacli'].roles field.
Kafka will then pass the authorization request on to OPA, where other checks will take place.
Below is a sample opa policy you might use to check the kafka input:
Code Block | ||||
---|---|---|---|---|
| ||||
# Authorization policy queried by the Kafka OPA authorizer.
# A request is allowed when any one of the allow rules below matches.
package policy.kafka.authz

default allow = false

# The "kowl" client may describe and read anything.
allow = true {
    allowedActions := ["DESCRIBE", "DESCRIBE_CONFIGS", "READ"]
    input.action.operation == allowedActions[_]
    input.requestContext.header.name.clientId == "kowl"
}

# Any caller may describe the listed internal topics.
# The operation check makes allowedActions effective; without it every
# operation on these topics would be allowed.
allow = true {
    allowedActions := ["DESCRIBE", "DESCRIBE_CONFIGS"]
    input.action.operation == allowedActions[_]
    allowedTopics := ["__consumer_offsets", "__strimzi_store_topic", "__strimzi-topic-operator-kstreams-topic-store-changelog"]
    input.action.resourcePattern.name == allowedTopics[_]
}

# The opacli service account may describe and read consumer groups.
allow = true {
    allowedActions := ["DESCRIBE", "READ"]
    input.action.operation == allowedActions[_]
    allowedResourceTypes := ["GROUP"]
    input.action.resourcePattern.resourceType == allowedResourceTypes[_]
    input.requestContext.principal.name == "service-account-opacli"
}

# The consumer client may perform CREATE on the cluster resource.
allow = true {
    input.action.operation == "CREATE"
    input.action.resourcePattern.name == "kafka-cluster"
    input.action.resourcePattern.resourceType == "CLUSTER"
    input.requestContext.principal.name == "service-account-opacli"
    input.requestContext.header.name.clientId == "consumer-client"
}

# The consumer client may describe and read my-topic.
allow = true {
    allowedActions := ["DESCRIBE","READ"]
    input.action.operation == allowedActions[_]
    input.action.resourcePattern.name == "my-topic"
    input.action.resourcePattern.resourceType == "TOPIC"
    input.requestContext.principal.name == "service-account-opacli"
    input.requestContext.header.name.clientId == "consumer-client"
}

# The producer client may describe and write my-topic.
allow = true {
    allowedActions := ["DESCRIBE","WRITE"]
    input.action.operation == allowedActions[_]
    input.action.resourcePattern.name == "my-topic"
    input.action.resourcePattern.resourceType == "TOPIC"
    input.requestContext.principal.name == "service-account-opacli"
    input.requestContext.header.name.clientId == "producer-client"
} |
The default value of the sub field (principal name) in keycloak is the user id, this can be changed to the user name using a property mapper:
kube-mgmt
By default the entire policy and data cache are defined by the opa bundle.
If you need to add data from other sources you need to define the bundle root directories in the .manifest file
e.g.
Code Block | ||||
---|---|---|---|---|
| ||||
{
"revision" : "1",
"roots": ["policy", "roles"]
} |
You can then add the kube-mgmt sidecar to your OPA deployment; it will pull data from ConfigMaps in the specified namespace.
e.g.
Code Block | ||
---|---|---|
| ||
- name: kube-mgmt
image: openpolicyagent/kube-mgmt:latest
args:
- "--namespaces=opa" |
Code Block | ||||
---|---|---|---|---|
| ||||
kind: ConfigMap
apiVersion: v1
metadata:
name: hello-data
namespace: opa
labels:
openpolicyagent.org/data: opa
data:
x.json: |
{"a": [1,2,3,4]} |
Code Block | ||||
---|---|---|---|---|
| ||||
{
"access_token": "eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJDamJ4a2FONjRVcUdYNThWU2R3WjBxQTdWRmN1TGdEQWhnUWJTVG55UE9JIn0.eyJleHAiOjE2Njg1MjgwMjEsImlhdCI6MTY2ODUyNzcyMSwianRpIjoiMjc1NzkzNjktOWExMi00NjA4LTk5ZjMtNTI2MmE4MzI5NGViIiwiaXNzIjoiaHR0cDovL2tleWNsb2FrLmRlZmF1bHQ6ODA4MC9hdXRoL3JlYWxtcy9vcGEiLCJhdWQiOiJhY2NvdW50Iiwic3ViIjoic2VydmljZS1hY2NvdW50LW9wYWNsaSIsInR5cCI6IkJlYXJlciIsImF6cCI6Im9wYWNsaSIsImFjciI6IjEiLCJyZWFsbV9hY2Nlc3MiOnsicm9sZXMiOlsib2ZmbGluZV9hY2Nlc3MiLCJ1bWFfYXV0aG9yaXphdGlvbiIsImRlZmF1bHQtcm9sZXMtb3BhIl19LCJyZXNvdXJjZV9hY2Nlc3MiOnsib3BhY2xpIjp7InJvbGVzIjpbIm9wYS1jbGllbnQtcm9sZSJdfSwiYWNjb3VudCI6eyJyb2xlcyI6WyJtYW5hZ2UtYWNjb3VudCIsIm1hbmFnZS1hY2NvdW50LWxpbmtzIiwidmlldy1wcm9maWxlIl19fSwic2NvcGUiOiJwcm9maWxlIGVtYWlsIiwiY2xpZW50SWQiOiJvcGFjbGkiLCJjbGllbnRIb3N0IjoiMTI3LjAuMC42IiwiZW1haWxfdmVyaWZpZWQiOmZhbHNlLCJjbGllbnRSb2xlIjoiW29wYS1jbGllbnQtcm9sZV0iLCJwcmVmZXJyZWRfdXNlcm5hbWUiOiJzZXJ2aWNlLWFjY291bnQtb3BhY2xpIiwiY2xpZW50QWRkcmVzcyI6IjEyNy4wLjAuNiJ9.B0X0zhAintHAXAtjiOngzTM0Wdv_aWM8qdyF4MXEZmE6AxoaMpaNQ6B18k1pkMz60qmTUUcYJOm-xHAdipnxptAf44_YmaybVE9_otjO59RBLonUlFtWDnNV4EqixLDaXN33Z54S1iWCPjBvI58sH_jQyMiXM7ur1FfjCCEeQwP2D9CgWY_3-Vg4Wy4cMAbsxOhZZog-QmKV8nkeyELqUS24xk_vpPkYOdG1ztc5SxHnLLsT4iwXgzYjAveiXyKyyYOC0oJP5zgCYEkQCvRORmuU0whN_5g4oTSIFcqN88KsrYb-R0I2RS7mQI8LXHrNIqMlNN2j5PoHZmgvMrpO3w"
} |
You can also add extra data using curl e.g. curl -X PUT $host:31182/v1/data/jwt -d @token.json
You can view all the data available to opa using curl: curl <opa host>:<port>/v1/data
You should see something similar to the following output:
Code Block | ||||
---|---|---|---|---|
| ||||
{
"decision_id":"067f7d2d-0e5d-4ca8-8cb0-6d38543c6b2f",
"result":{
"jwt":{ "access_token":"eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJDamJ4a2FONjRVcUdYNThWU2R3WjBxQTdWRmN1TGdEQWhnUWJTVG55UE9JIn0.eyJleHAiOjE2Njg1MjgwMjEsImlhdCI6MTY2ODUyNzcyMSwianRpIjoiMjc1NzkzNjktOWExMi00NjA4LTk5ZjMtNTI2MmE4MzI5NGViIiwiaXNzIjoiaHR0cDovL2tleWNsb2FrLmRlZmF1bHQ6ODA4MC9hdXRoL3JlYWxtcy9vcGEiLCJhdWQiOiJhY2NvdW50Iiwic3ViIjoic2VydmljZS1hY2NvdW50LW9wYWNsaSIsInR5cCI6IkJlYXJlciIsImF6cCI6Im9wYWNsaSIsImFjciI6IjEiLCJyZWFsbV9hY2Nlc3MiOnsicm9sZXMiOlsib2ZmbGluZV9hY2Nlc3MiLCJ1bWFfYXV0aG9yaXphdGlvbiIsImRlZmF1bHQtcm9sZXMtb3BhIl19LCJyZXNvdXJjZV9hY2Nlc3MiOnsib3BhY2xpIjp7InJvbGVzIjpbIm9wYS1jbGllbnQtcm9sZSJdfSwiYWNjb3VudCI6eyJyb2xlcyI6WyJtYW5hZ2UtYWNjb3VudCIsIm1hbmFnZS1hY2NvdW50LWxpbmtzIiwidmlldy1wcm9maWxlIl19fSwic2NvcGUiOiJwcm9maWxlIGVtYWlsIiwiY2xpZW50SWQiOiJvcGFjbGkiLCJjbGllbnRIb3N0IjoiMTI3LjAuMC42IiwiZW1haWxfdmVyaWZpZWQiOmZhbHNlLCJjbGllbnRSb2xlIjoiW29wYS1jbGllbnQtcm9sZV0iLCJwcmVmZXJyZWRfdXNlcm5hbWUiOiJzZXJ2aWNlLWFjY291bnQtb3BhY2xpIiwiY2xpZW50QWRkcmVzcyI6IjEyNy4wLjAuNiJ9.B0X0zhAintHAXAtjiOngzTM0Wdv_aWM8qdyF4MXEZmE6AxoaMpaNQ6B18k1pkMz60qmTUUcYJOm-xHAdipnxptAf44_YmaybVE9_otjO59RBLonUlFtWDnNV4EqixLDaXN33Z54S1iWCPjBvI58sH_jQyMiXM7ur1FfjCCEeQwP2D9CgWY_3-Vg4Wy4cMAbsxOhZZog-QmKV8nkeyELqUS24xk_vpPkYOdG1ztc5SxHnLLsT4iwXgzYjAveiXyKyyYOC0oJP5zgCYEkQCvRORmuU0whN_5g4oTSIFcqN88KsrYb-R0I2RS7mQI8LXHrNIqMlNN2j5PoHZmgvMrpO3w"
},
"opa":{
"hello-data":{
"x.json":{
"a":[
1,
2,
3,
4
]
}
}
},
"servers":{
"id":"s1",
"name":"app",
"ports":[
"p1",
"p2",
"p3"
],
"protocols":[
"https",
"ssh"
]
}
}
} |
You can get individual values like this: curl <opa host>:<port>/v1/data/jwt/access_token or refer to them in your code like this: data.opa["hello-data"]["x.json"].a[0]
Strimzi Bridge
You can communicate with kafka over http by adding strimzi bridge:
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaBridge
metadata:
name: strimzi-bridge
namespace: kafka
spec:
replicas: 1
bootstrapServers: my-cluster-kafka-bootstrap:9092
http:
port: 8080 |
See also:
Using Open Policy Agent with Strimzi and Apache Kafka
Secure Kafka with Keycloak: SASL OAuth Bearer
Enforce Custom Resource policies with Open Policy Agent Gatekeeper
How to integrate Kafka with Istio
Example with JWT payload passed from Envoy
Kafka authentication using OAuth 2.0
Using HTTP Bridge as a Kubernetes sidecar
Kafka SSL : Setup with self signed certificate
Kafka & Istio
Integration of Istio with Kafka is limited.
The Kafka Protocol is a TCP level protocol (Layer 4)
Kafka is a binary wrapper over TCP protocol.
Istio mainly supports HTTP/HTTPS on layer 7.
Higher level protocols such as HTTP have access to metadata so they can properly route requests, this is not the case with the Kafka protocol.
Service Mesh and Proxies: Examples for Kafka
Setting up Strimzi bridge with Istio Authorization and ACLs
The Kiali screenshot above represents both a producer and a consumer connecting to Kafka through the Strimzi bridge.
Access to the bridge is controlled using JWT which is checked using an Istio AuthorizationPolicy.
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: "security.istio.io/v1beta1"
kind: "AuthorizationPolicy"
metadata:
name: "kafka-bridge-policy"
namespace: kafka
spec:
selector:
matchLabels:
app.kubernetes.io/name: kafka-bridge
action: ALLOW
rules:
- from:
- source:
requestPrincipals: ["http://keycloak.default:8080/auth/realms/opa", "http://istio-ingressgateway.istio-system:80/auth/realms/opa"]
- to:
- operation:
methods: ["POST", "GET"]
paths: ["/topics", "/topics/*", "/consumers/*"]
when:
- key: request.auth.claims[clientRole]
values: ["opa-client-role"] |
By enabling TLS authentication on one of our bootstrap listeners, user certificates are automatically created when we create a new KafkaUser:
- name: tlsauth
type: internal
port: 9096
tls: true
authentication:
type: tls
Setting the authentication type to tls also ensures secure communication between Kafka and ZooKeeper.
We also set authorization to simple for the Kafka cluster, this means ACLs will be used.
kafka:
authorization:
type: simple
superUsers:
- CN=kowl
Any users you want to by-pass ACL checks should be listed under superUsers.
We can then setup our bridge user and include the ACL permissions as part of the KafkaUser definition:
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
name: bridge
namespace: kafka
labels:
strimzi.io/cluster: my-cluster
spec:
authentication:
type: tls
authorization:
type: simple
acls:
# access to the topic
- resource:
type: topic
name: my-topic
operations:
- Create
- Describe
- Read
- Write
- AlterConfigs
host: "*"
# access to the group
- resource:
type: group
name: my-group
operations:
- Describe
- Read
host: "*"
# access to the cluster
- resource:
type: cluster
operations:
- Alter
- AlterConfigs
host: "*" |
Authorization between the bridge and Kafka is done using user certificates.
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaBridge
metadata:
name: strimzi-bridge
namespace: kafka
spec:
replicas: 1
bootstrapServers: my-cluster-kafka-bootstrap:9096
http:
port: 8080
logging:
type: inline
loggers:
logger.bridge.level: "DEBUG"
logger.send.name: "http.openapi.operation.send"
logger.send.level: "DEBUG"
tls:
trustedCertificates:
- secretName: my-cluster-cluster-ca-cert
certificate: ca.crt
authentication:
type: tls
certificateAndKey:
secretName: bridge
certificate: user.crt
key: user.key |
Records can then be posted to a topic with a call like this:
Endpoint:
http://istio-ingressgateway.istio-system:80/topics/my-topic
Payload: {"records":[{"key":"key-366","value":"value-366"}]}
and read from a topic using the following syntax:
i) Create a consumer group
Endpoint: http://istio-ingressgateway.istio-system:80/consumers/my-group
Payload: {"name":"my-consumer","format":"json","auto.offset.reset":"earliest","enable.auto.commit":false}
ii) subscribe to a topic
Endpoint: http://istio-ingressgateway.istio-system:80/consumers/my-group/instances/my-consumer/subscription
Payload: {"topics":["my-topic"]}
iii) Read records from a topic
Endpoint: http://istio-ingressgateway.istio-system:80/consumers/my-group/instances/my-consumer/records
These calls will be redirected to the bridge using a gateway and a virtual service
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: networking.istio.io/v1beta1
kind: Gateway
metadata:
name: strimzi-bridge-gateway
namespace: kafka
spec:
selector:
istio: ingressgateway # use Istio gateway implementation
servers:
- port:
number: 80
name: http
protocol: HTTP
hosts:
- "*"
---
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
name: strimzi-bridge-vs
namespace: kafka
spec:
hosts:
- "*"
gateways:
- strimzi-bridge-gateway
http:
- name: "strimzi-bridge-routes"
match:
- uri:
prefix: "/topics"
- uri:
prefix: "/consumers"
route:
- destination:
port:
number: 8080
host: strimzi-bridge-bridge-service.kafka.svc.cluster.local |
Note: To use Kowl with this setup you'll need to create a kowl user and configure kowl to use tls
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
name: kowl
namespace: kafka
labels:
strimzi.io/cluster: my-cluster
spec:
authentication:
type: tls |
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: v1
kind: ConfigMap
metadata:
name: kowl-config-cm
namespace: kafka
data:
config.yaml: |
kafka:
brokers:
- my-cluster-kafka-0.my-cluster-kafka-brokers.kafka.svc:9093
tls:
enabled: true
caFilepath: /etc/strimzi/ca/ca.crt
certFilepath: /etc/strimzi/user-crt/crt/user.crt
keyFilepath: /etc/strimzi/user-key/key/user.key |
- ca.crt is obtained from the my-cluster-cluster-ca-cert secret, user.crt and user.key are obtained from the kowl secret.
Admission Controllers
An admission controller is a piece of code that intercepts requests to the Kubernetes API server prior to persistence of the object, but after the request is authenticated and authorized.
Admission Controllers Reference
Kyverno
Opa Gatekeeper
OPA Gatekeeper: Policy and Governance for Kubernetes