Draft
Table of Contents |
---|
JIRA Ticket
...
Code Block | ||||
---|---|---|---|---|
| ||||
spec: initContainers: - name: init-keycloak image: busybox command: ['sh', '-c', 'until nc -vz keycloak.default 8080; do echo waiting for keycloak; sleep 2; done;'] containers: - name: a1-policy image: hashicorp/http-echo ports: - containerPort: 5678 args: - -text - "Hello a1-policy" |
See also: keycloak.yaml
Keycloak Operator
Keycloak Operator Installation
Istio mTLS
Test: Istio / Mutual TLS Migration
...
This does not really work for a single shard instance like the one we are using.
See also: Grafana playground
Prometheus
Start the prometheus dashboard by running: istioctl dashboard prometheus
...
Dynamic Policy Composition for OPA
Open Policy Agent: Authorization in a Cloud Native World
Microservices Authorization using Open Policy Agent and Traefik (API Gateway)
A Study in Serverless Authorization with Open Policy Agent
Godaddy Opa Lambda Extension Plugin
GO
Go provides a library for opa.
...
Code Block | ||||
---|---|---|---|---|
| ||||
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/open-policy-agent/opa/rego"
"io/ioutil"
"net/http"
"net/url"
"os"
)
type Jwttoken struct {
Access_token string
Expires_in int
Refresh_expires_in int
Refresh_token string
Token_type string
Not_before_policy int
Session_state string
Scope string
}
var token Jwttoken
var opaPolicy string = `
package authz
import future.keywords.in
default allow = false
jwks := jwks_request("http://keycloak:8080/auth/realms/opa/protocol/openid-connect/certs").body
filtered_jwks := [ key |
some key in jwks.keys
key.use == "sig"
]
token_cert := json.marshal({"keys": filtered_jwks})
token = { "isValid": isValid, "header": header, "payload": payload } {
[isValid, header, payload] := io.jwt.decode_verify(input, { "cert": token_cert, "aud": "account", "iss": "http://keycloak:808
0/auth/realms/opa"})
}
allow {
is_token_valid
}
is_token_valid {
token.isValid
now := time.now_ns() / 1000000000
token.payload.iat <= now
now < token.payload.exp
token.payload.clientRole == "[opa-client-role]"
}
jwks_request(url) = http.send({
"url": url,
"method": "GET",
"force_cache": true,
"force_json_decode": true,
"force_cache_duration_seconds": 3600 # Cache response for an hour
})
`
func getToken() string {
clientSecret := "63wkv0RUXkp01pbqtNTSwghhTxeMW55I"
clientId := "opacli"
realmName := "opa"
keycloakHost := "keycloak"
keycloakPort := "8080"
keycloakUrl := "http://" + keycloakHost + ":" + keycloakPort + "/auth/realms/" + realmName + "/protocol/openid-connect/token" resp, err := http.PostForm(keycloakUrl,
url.Values{"client_secret": {clientSecret}, "grant_type": {"client_credentials"}, "client_id": {clientId}})
if err != nil {
fmt.Println(err)
panic("Something wrong with the credentials or url ")
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
json.Unmarshal([]byte(body), &token)
return token.Access_token
}
func traceOpa(input string) {
ctx := context.TODO()
test := rego.New(
rego.Query("x = data.authz.allow"),
rego.Trace(true),
rego.Module("example.rego", opaPolicy),
rego.Input(input),
)
test.Eval(ctx)
rego.PrintTraceWithLocation(os.Stdout, test)
}
func evaluateOpa(input string) {
ctx := context.TODO()
query, err := rego.New(
rego.Query("x = data.authz.allow"),
rego.Module("example.rego", opaPolicy),
).PrepareForEval(ctx)
if err != nil {
// Handle error.
fmt.Println(err.Error())
}
results, err := query.Eval(ctx, rego.EvalInput(input))
// Inspect results.
if err != nil {
// Handle evaluation error.
fmt.Println("Error: " + err.Error())
} else if len(results) == 0 {
// Handle undefined result.
fmt.Println("Results are empty")
} else {
// Handle result/decision.
fmt.Printf("Results = %+v\n", results) //=> [{Expressions:[true] Bindings:map[x:true]}]
}
}
func main() {
tokenStr := getToken()
traceOpa(tokenStr)
evaluateOpa(tokenStr)
}
|
...
Code Block | ||||
---|---|---|---|---|
| ||||
package policies.rappopaprovider.policy
import input.attributes.request.http as http_request
import future.keywords.in
realm_name := "opa"
realm_url := sprintf("http://keycloak:8080/auth/realms/%v", [realm_name])
certs_url := sprintf("%v/protocol/openid-connect/certs", [realm_url])
jwks := jwks_request(certs_url).body
filtered_jwks := [ key |
some key in jwks.keys
key.use == "sig"
]
token_cert := json.marshal({"keys": filtered_jwks})
token = { "isValid": isValid, "header": header, "payload": payload } {
[_, encoded] := split(http_request.headers.authorization, " ")
[isValid, header, payload] := io.jwt.decode_verify(encoded, { "cert": token_cert, "aud": "account", "iss": realm_url})
}
deny[msg] {
not is_token_valid
msg = "denied by rappopaprovider.policy: not a valid token"
}
is_token_valid {
token.isValid
now := time.now_ns() / 1000000000
token.payload.iat <= now
now < token.payload.exp
token.payload.clientRole == "[opa-client-role]"
}
jwks_request(url) = http.send({
"url": url,
"method": "GET",
"force_cache": true,
"force_json_decode": true,
"force_cache_duration_seconds": 3600 # Cache response for an hour
}) |
...
Code Block | ||||
---|---|---|---|---|
| ||||
#!/bin/bash
INGRESS_HOST=$(minikube ip)
INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
TESTS=0
PASSED=0
FAILED=0
TEST_TS=$(date +%F-%T)
TOKEN=""
ACCESS_TOKEN=""
REFRESH_TOKEN=""
function get_token
{
local prefix="${1}"
url="http://${KEYCLOAK_HOST}:${KEYCLOAK_PORT}/auth/realms"
TOKEN=$(curl -s -X POST $url/opa/protocol/openid-connect/token -H \
"Content-Type: application/x-www-form-urlencoded" -d client_secret=63wkv0RUXkp01pbqtNTSwghhTxeMW55I \
-d 'grant_type=client_credentials' -d client_id=opacli)
ACCESS_TOKEN=$(echo $TOKEN | jq -r '.access_token')
}
function run_test
{
local prefix="${1}" type=${2} msg="${3}" data=${4}
TESTS=$((TESTS+1))
echo "Test ${TESTS}: Testing $type /${prefix}"
get_token $prefix
url=$INGRESS_HOST:$INGRESS_PORT"/"$prefix
result=$(curl -s -X ${type} -H "Content-type: application/json" -H "Authorization: Bearer $ACCESS_TOKEN" $url)
echo $result
if [ "$result" != "$msg" ]; then
echo "FAIL"
FAILED=$((FAILED+1))
else
echo "PASS"
PASSED=$((PASSED+1))
fi
echo ""
}
run_test "rapp-opa-provider" "GET" "Hello OPA World!" ""
echo
echo "-----------------------------------------------------------------------"
echo "Number of Tests: $TESTS, Tests Passed: $PASSED, Tests Failed: $FAILED"
echo "Date: $TEST_TS"
echo "-----------------------------------------------------------------------" |
...
Code Block | ||||
---|---|---|---|---|
| ||||
package policy.services.rappopaprovider.ingress import data.policy.common.request allow = true { request.token.isValid request.method == "GET" request.path == [ "rapp-opa-provider" ] now := time.now_ns() / 1000000000 request.iat <= now now < request.exp request.clientRole == "[opa-client-role]" } |
Lastly create the parent rules file that will call the appropiates policy based on the http request path
...
opa bench --data rbactest.rego 'data.rbactest.allow'
+-------------------------------------------------+------------+
| samples | 22605 |
| ns/op | 47760 |
| B/op | 6269 |
| allocs/op | 112 |
| histogram_timer_rego_external_resolve_ns_75% | 400 |
| histogram_timer_rego_external_resolve_ns_90% | 500 |
| histogram_timer_rego_external_resolve_ns_95% | 500 |
| histogram_timer_rego_external_resolve_ns_99% | 871 |
| histogram_timer_rego_external_resolve_ns_99.9% | 29394 |
| histogram_timer_rego_external_resolve_ns_99.99% | 29800 |
| histogram_timer_rego_external_resolve_ns_count | 22605 |
| histogram_timer_rego_external_resolve_ns_max | 29800 |
| histogram_timer_rego_external_resolve_ns_mean | 434 |
| histogram_timer_rego_external_resolve_ns_median | 400 |
| histogram_timer_rego_external_resolve_ns_min | 200 |
| histogram_timer_rego_external_resolve_ns_stddev | 1045 |
| histogram_timer_rego_query_eval_ns_75% | 31100 |
| histogram_timer_rego_query_eval_ns_90% | 37210 |
| histogram_timer_rego_query_eval_ns_95% | 47160 |
| histogram_timer_rego_query_eval_ns_99% | 91606 |
| histogram_timer_rego_query_eval_ns_99.9% | 630561 |
| histogram_timer_rego_query_eval_ns_99.99% | 631300 |
| histogram_timer_rego_query_eval_ns_count | 22605 |
| histogram_timer_rego_query_eval_ns_max | 631300 |
| histogram_timer_rego_query_eval_ns_mean | 29182 |
| histogram_timer_rego_query_eval_ns_median | 25300 |
| histogram_timer_rego_query_eval_ns_min | 15200 |
| histogram_timer_rego_query_eval_ns_stddev | 32411 |
+-------------------------------------------------+------------+
OPA & Minio
OPA & MinIO's Access Management Plugin
OPA Sidecar injection
First create a namespace for your apps and enable istio and opa
...
Your bundle is now protected with basic authentication.
JWT Injection
To enable automatic include a jwt token in our rapp request we need to enable some k8s objects:
1) MutatingWebhookConfiguration to inject jwt retrieval pod as a sidecar. The MutatingWebhookConfiguration uses a pod mutating serive to alter our pod and add the new sidecar.
Code Block | ||||
---|---|---|---|---|
| ||||
package main
import (
"encoding/json"
"errors"
"flag"
"fmt"
"io/ioutil"
"k8s.io/api/admission/v1beta1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"log"
"net/http"
"strconv"
)
type ServerParameters struct {
port int // webhook server port
certFile string // path to the x509 cert
keyFile string // path to the x509 private key
}
type patchOperation struct {
Op string `json:"op"`
Path string `json:"path"`
Value interface{} `json:"value,omitempty"`
}
var parameters ServerParameters
var (
universalDeserializer = serializer.NewCodecFactory(runtime.NewScheme()).UniversalDeserializer()
)
func main() {
flag.IntVar(¶meters.port, "port", 8443, "Webhook server port.")
flag.StringVar(¶meters.certFile, "tlsCertFile", "/certs/tls.crt", "File containing the x509 certificate")
flag.StringVar(¶meters.keyFile, "tlsKeyFile", "/certs/tls.key", "File containing the x509 private key")
flag.Parse()
http.HandleFunc("/inject-sidecar", HandleSideCarInjection)
log.Fatal(http.ListenAndServeTLS(":"+strconv.Itoa(parameters.port), parameters.certFile, parameters.keyFile, nil))
}
func HandleSideCarInjection(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
err = ioutil.WriteFile("/tmp/request", body, 0644)
if err != nil {
panic(err.Error())
}
var admissionReviewReq v1beta1.AdmissionReview
if _, _, err := universalDeserializer.Decode(body, nil, &admissionReviewReq); err != nil {
w.WriteHeader(http.StatusBadRequest)
fmt.Errorf("Could not deserialize request: %v", err)
} else if admissionReviewReq.Request == nil {
w.WriteHeader(http.StatusBadRequest)
errors.New("Malformed admission review - request is empty")
}
fmt.Printf("Received Admission Review Request - Type: %v \t Event: %v \t Name: %v \n",
admissionReviewReq.Request.Kind,
admissionReviewReq.Request.Operation,
admissionReviewReq.Request.Name,
)
var pod v1.Pod
err = json.Unmarshal(admissionReviewReq.Request.Object.Raw, &pod)
if err != nil {
fmt.Errorf("Could not unmarshal pod from admission request: %v", err)
}
var patches []patchOperation
labels := pod.ObjectMeta.Labels
labels["sidecar-injection-webhook"] = "jwt-proxy"
patches = append(patches, patchOperation{
Op: "add",
Path: "/metadata/labels",
Value: labels,
})
var containers []v1.Container
containers = append(containers, pod.Spec.Containers...)
container := v1.Container{
Name: "jwt-proxy",
Image: "ktimoney/rapps-jwt",
ImagePullPolicy: v1.PullIfNotPresent,
Ports: []v1.ContainerPort{
{
Name: "http",
Protocol: v1.ProtocolTCP,
ContainerPort: 8888,
},
},
VolumeMounts: []v1.VolumeMount{
{
Name: "certsdir",
MountPath: "/certs",
ReadOnly: true,
},
},
}
containers = append(containers, container)
fmt.Println(containers)
patches = append(patches, patchOperation{
Op: "add",
Path: "/spec/containers",
Value: containers,
})
pathType := v1.HostPathDirectoryOrCreate
pathTypePtr := &pathType
var volumes []v1.Volume
volumes = append(volumes, pod.Spec.Volumes...)
volume := v1.Volume{
Name: "certsdir",
VolumeSource: v1.VolumeSource{
HostPath: &v1.HostPathVolumeSource{
Path: "/var/rapps/certs",
Type: pathTypePtr,
},
},
}
volumes = append(volumes, volume)
fmt.Println(volumes)
patches = append(patches, patchOperation{
Op: "add",
Path: "/spec/volumes",
Value: volumes,
})
fmt.Println(patches)
patchBytes, err := json.Marshal(patches)
if err != nil {
fmt.Errorf("Error occurred when trying to marshal JSON patch: %v", err)
}
admissionReviewResponse := v1beta1.AdmissionReview{
Response: &v1beta1.AdmissionResponse{
UID: admissionReviewReq.Request.UID,
Allowed: true,
},
}
admissionReviewResponse.Response.Patch = patchBytes
bytes, err := json.Marshal(&admissionReviewResponse)
if err != nil {
fmt.Errorf("Error occurred when trying to marshal Aadmission Review response: %v", err)
}
w.Write(bytes)
} |
MutatingWebhookConfiguration.yaml
Note: You'll need to configure your deployment to include a tls.crt and tls.key secret. Your MutatingWebhookConfiguration will need to include the corresponding ca bubdle.
2) Envoyfilter to update request header with jwt token.
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: networking.istio.io/v1alpha3
kind: EnvoyFilter
metadata:
name: RAPP-NAME-outbound-filter
namespace: RAPP-NS
spec:
workloadSelector:
labels:
app.kubernetes.io/name: RAPP-NAME
configPatches:
# The first patch adds the lua filter to the listener/http connection manager
- applyTo: HTTP_FILTER
match:
context: SIDECAR_OUTBOUND
listener:
filterChain:
filter:
name: "envoy.filters.network.http_connection_manager"
subFilter:
name: "envoy.filters.http.router"
patch:
operation: INSERT_BEFORE
value: # lua filter specification
name: envoy.lua
typed_config:
"@type": "type.googleapis.com/envoy.extensions.filters.http.lua.v3.Lua"
inlineCode: |
function envoy_on_request(request_handle)
local uri = request_handle:headers():get(":path")
local method = request_handle:headers():get(":method")
if (method ~= "POST" and path ~= "/auth/realms/REALM-NAME/protocol/openid-connect/token")
then
-- Make an HTTP call to an upstream host with the following headers, body, and timeout.
local headers, body = request_handle:httpCall(
"jwt_cluster",
{
[":method"] = "GET",
[":path"] = "/token",
[":authority"] = "jwt-proxy",
["realm"] = "REALM-NAME",
["client"] = "CLIENT-NAME"
},
"jwt call",
5000)
if (headers["authorization"] ~= nil)
then
request_handle:headers():add("authorization", headers["authorization"])
end
end
end
- applyTo: CLUSTER
match:
context: SIDECAR_OUTBOUND
patch:
operation: ADD
value: # cluster specification
name: jwt_cluster
type: STRICT_DNS
connect_timeout: 60s
lb_policy: ROUND_ROBIN
load_assignment:
cluster_name: jwt_cluster
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: 0.0.0.0
port_value: 8888 |
CFSSL
CFSSL is CloudFlare's PKI/TLS tool. It is both a command line tool and an HTTP API server for signing, verifying, and bundling TLS certificates.
To run this you first need to create an image with cfssl installed:
Code Block | ||||
---|---|---|---|---|
| ||||
FROM debian:latest
RUN apt-get update && apt-get install -y curl && \
curl -L https://github.com/cloudflare/cfssl/releases/download/v1.5.0/cfssl_1.5.0_linux_amd64 -o /usr/local/bin/cfssl && \
curl -L https://github.com/cloudflare/cfssl/releases/download/v1.5.0/cfssljson_1.5.0_linux_amd64 -o /usr/local/bin/cfssljson && \
chmod +x /usr/local/bin/cfssl && \
chmod +x /usr/local/bin/cfssljson
RUN mkdir /config
RUN mkdir /config
RUN mkdir /certs
WORKDIR /certs
EXPOSE 8888
EXPOSE 8889
ENTRYPOINT ["cfssl version"] |
This will install cfssl on a debian image.
You can then use this image to create a cfssl service in your k8s cluster.
kubectl create -f rapps-cfssl.yaml
Note: If you want to use this with a postgres db you'll need to setup a new database and username/password and then create the tables.
Code Block | ||||
---|---|---|---|---|
| ||||
SELECT 'CREATE DATABASE cfssl'
WHERE NOT EXISTS (SELECT FROM pg_database WHERE datname = 'cfssl')\gexec
DO $$
BEGIN
IF NOT EXISTS (SELECT FROM pg_user WHERE usename = 'cfssl') THEN
CREATE USER cfssl WITH PASSWORD 'cfssl';
GRANT ALL PRIVILEGES ON DATABASE cfssl TO cfssl;
END IF;
END
$$; |
Login as the cfssl user then create the tables:
Code Block | ||||
---|---|---|---|---|
| ||||
CREATE TABLE IF NOT EXISTS certificates (
serial_number bytea NOT NULL,
authority_key_identifier bytea NOT NULL,
ca_label bytea,
status bytea NOT NULL,
reason int,
expiry timestamptz,
revoked_at timestamptz,
pem bytea NOT NULL,
issued_at timestamptz,
not_before timestamptz,
metadata jsonb,
sans jsonb,
common_name TEXT,
PRIMARY KEY(serial_number, authority_key_identifier)
);
CREATE TABLE IF NOT EXISTS ocsp_responses (
serial_number bytea NOT NULL,
authority_key_identifier bytea NOT NULL,
body bytea NOT NULL,
expiry timestamptz,
PRIMARY KEY(serial_number, authority_key_identifier),
FOREIGN KEY(serial_number, authority_key_identifier) REFERENCES certificates(serial_number, authority_key_identifier)
); |
Once the pod is up and running you can connect to it by using port forwarding:
kubectl port-forward service/rapps-cfssl 8888:8888
You can generate signed certificates using a post request like the following:
curl -s -X POST -H "Content-Type: application/json" -d @./rapp-helloworld-provider-server.json http://127.0.0.1:8888/api/v1/cfssl/newcert
The rapp-helloworld-provider-server.json looks like this:
Code Block | ||||
---|---|---|---|---|
| ||||
{
"request":{
"hosts":[
"rapp-helloworld-provider"
],
"names":[
{
"C":"IE",
"ST":"Ireland",
"L":"Dublin",
"O":"EST Rapp Provider",
"OU":"EST Rapp Provider hosts"
}
],
"CN":"rapp-helloworld-provider",
"key":{
"algo":"rsa",
"size":2048
}
},
"profile":"server"
} |
To parse the response contents you can use the following method:
NEWCERT=$(curl -s -X POST -H "Content-Type: application/json" -d @./rapp-helloworld-provider-server.json http://127.0.0.1:8888/api/v1/cfssl/newcert)
echo $NEWCERT | jq -r .result.certificate
echo $NEWCERT | jq -r .result.private_key
OCSP & CRL
OCSP and CRL are ways of checking a certificates validity.
The cfssl crl endpoint run on port 8888: 127.0.0.1:8888/api/v1/cfssl/crl
For ocsp to run you need to run the following commands in your container:
cfssl ocsprefresh -db-config /config/db-pg.json -ca /certs/ca-server.pem -responder /certs/server-ocsp.pem -responder-key /certs/server-ocsp-key.pem
cfssl ocspdump -db-config /config/db-pg.json > ocspdump.txt
cfssl ocspserve -port=8889 -responses=/config/ocspdump.txt -loglevel=0
These commands will need to be re-run every time a new revoke request is received.
To revoke a certificate use:
curl -d '{ "serial": "708853190752997406197199597002842275021840632879", "authority_key_id": "dd2cdb5b5b8abbe2fba81fd6c04393938f802b03", "reason": "superseded" }' http://localhost:8888/api/v1/cfssl/revoke
You can obtain the serial and authority_key_id from your database using the following SQL:
select encode(authority_key_identifier,'escape') a_key, encode(serial_number,'escape') serial, encode(status,'escape') status from certificates;
You can also obtain the serial and authority_key_id from the certinfo endpoint
- Convert your certificate into a variable: pem=$(cat my.crt | sed -z 's/\n/\\n/g')
- curl -d '{"certificate": "'"$pem"'"}' http://localhost:8888/api/v1/cfssl/certinfo
- The serial number will come back as is but the authority_key_id needs to be converted: echo "22:8B:FA:ED:8B:FF:66:E7:05:A3:08:3A:41:33:D8:01:20:CA:CC:F4"| tr '[:upper:]' '[:lower:]' | sed s'/://'g =228bfaed8bff66e705a3083a4133d80120caccf4
You can check your certificate against ocsp and crl using the following code:
Code Block | ||||
---|---|---|---|---|
| ||||
package main
import (
"bytes"
"context"
"crypto"
"crypto/x509"
"crypto/x509/pkix"
"encoding/json"
"encoding/pem"
"flag"
"fmt"
"golang.org/x/crypto/ocsp"
"io/ioutil"
"log"
"net/http"
"net/url"
"os"
"time"
)
type Cfinfo struct {
Success string `json:"success,omitempty"`
Result struct {
Certificate string `json:"certificate"`
Usages []string `json:"usages"`
Expiry string `json:"expiry"`
}
Errors []string `json:"errors"`
Messages []string `json:"messages"`
}
var cfinfo Cfinfo
type Crlinfo struct {
Success bool `json:"success,omitempty"`
Result string `json:"result,omitempty"`
Errors []string `json:"errors"`
Messages []string `json:"messages"`
}
var crlinfo Crlinfo
type ServerParameters struct {
certFile string // path to the x509 cert
issuerUrl string // webhook server port
crlUrl string // webhook server port
}
var parameters ServerParameters
func main() {
flag.StringVar(¶meters.certFile, "certFile", "", "File containing the x509 certificate")
flag.StringVar(¶meters.issuerUrl, "issuerUrl", "http://127.0.0.1:8888/api/v1/cfssl/info", "Url for retrieving the issuer certificate")
flag.StringVar(¶meters.crlUrl, "crlUrl", "http://127.0.0.1:8888/api/v1/cfssl/crl", "Url for retrieving the crl")
flag.Parse()
if parameters.certFile == "" {
flag.Usage()
os.Exit(1)
}
// read x509 certificate from PEM encoded file
cert_bytes := readFile(parameters.certFile)
cert, err := decodeCert(cert_bytes)
if err != nil {
log.Fatal(err)
}
issuer_bytes, err := getIssuer(parameters.issuerUrl) //readCert(os.Args[2])
if err != nil {
log.Fatal(err)
}
issuer, err := decodeCert(issuer_bytes)
if err != nil {
log.Fatal(err)
}
// Perform OCSP Check
fmt.Println("Checing OCSP")
status, err := checkOCSPStatus(cert, issuer)
if err != nil {
fmt.Println(err)
} else {
switch status {
case ocsp.Good:
fmt.Printf("[+] Certificate status is Good\n")
case ocsp.Revoked:
fmt.Printf("[-] Certificate status is Revoked\n")
case ocsp.Unknown:
fmt.Printf("[-] Certificate status is Unknown\n")
}
crl_bytes, err := getCrl(parameters.crlUrl) //readCert(os.Args[2])
if err != nil {
log.Fatal(err)
}
crl, err := decodeCrl(crl_bytes)
if err != nil {
log.Fatal(err)
}
fmt.Println("\nChecing CRL")
_, err = checkCRLStatus(cert, issuer, crl)
if err != nil {
log.Fatal(err)
}else{
fmt.Println("[+] Certificate status is not Revoked\n")
}
}
func checkCRLStatus(cert *x509.Certificate, issuer *x509.Certificate, crl *pkix.CertificateList) (bool, error) {
var revoked = false
// Check CRL signature
err := issuer.CheckCRLSignature(crl)
if err != nil {
return revoked, err
}
// Check CRL validity
if crl.TBSCertList.NextUpdate.Before(time.Now()) {
return revoked, fmt.Errorf("CRL is outdated")
}
// Searching for our certificate in CRL
for _, revokedCertificate := range crl.TBSCertList.RevokedCertificates {
if revokedCertificate.SerialNumber.Cmp(cert.SerialNumber) == 0 {
//Found validated certificate in list of revoked ones
revoked = true
return revoked, fmt.Errorf("[-] Certificate status is Revoked\n")
}
}
return revoked, nil
}
// CheckOCSPStatus will make an OCSP request for the provided certificate.
// If the status of the certificate is not good, then an error is returned.
func checkOCSPStatus(cert *x509.Certificate, issuer *x509.Certificate) (int, error) {
var (
ctx = context.Background()
ocspURL = cert.OCSPServer[0]
)
// Build OCSP request
buffer, err := ocsp.CreateRequest(cert, issuer, &ocsp.RequestOptions{
Hash: crypto.SHA256,
})
if err != nil {
return ocsp.Unknown, fmt.Errorf("creating ocsp request body: %w", err)
}
req, err := http.NewRequest(http.MethodPost, ocspURL, bytes.NewBuffer(buffer))
if err != nil {
return ocsp.Unknown, fmt.Errorf("creating http request: %w", err)
}
ocspUrl, err := url.Parse(ocspURL)
if err != nil {
return ocsp.Unknown, fmt.Errorf("parsing ocsp url: %w", err)
}
req.Header.Add("Content-Type", "application/ocsp-request")
req.Header.Add("Accept", "application/ocsp-response")
req.Header.Add("host", ocspUrl.Host)
req = req.WithContext(ctx)
// Make OCSP request
httpResponse, err := http.DefaultClient.Do(req)
if err != nil {
return ocsp.Unknown, fmt.Errorf("making ocsp request: %w", err)
}
defer httpResponse.Body.Close()
output, err := ioutil.ReadAll(httpResponse.Body)
if err != nil {
return ocsp.Unknown, fmt.Errorf("reading response body: %w", err)
}
// Parse response
ocspResponse, err := ocsp.ParseResponse(output, issuer)
if err != nil {
return ocsp.Unknown, fmt.Errorf("parsing ocsp response: %w", err)
}
return ocspResponse.Status, nil
}
func readFile(file string) []byte {
cert, err := ioutil.ReadFile(file)
if err != nil {
log.Fatalln(err)
}
return cert
}
func decodeCert(cert_bytes []byte) (*x509.Certificate, error) {
b, _ := pem.Decode(cert_bytes)
var cert *x509.Certificate
cert, err := x509.ParseCertificate(b.Bytes)
if err != nil {
fmt.Println("Parse Error")
return nil, fmt.Errorf("parsing certificate: %w", err)
}
return cert, err
}
func decodeCrl(cert_bytes []byte) (*pkix.CertificateList, error) {
b, _ := pem.Decode(cert_bytes)
crl, err := x509.ParseCRL(b.Bytes)
if err != nil {
return nil, fmt.Errorf("parsing crl: %w", err)
}
return crl, err
}
func getIssuer(issuerUrl string) ([]byte, error) {
var resp = &http.Response{}
values := map[string]string{"label": "intermediate"}
jsonValue, _ := json.Marshal(values)
resp, err := http.Post(issuerUrl, "application/json", bytes.NewBuffer(jsonValue))
if err != nil {
fmt.Println(err)
panic("Something wrong with the post request")
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
json.Unmarshal([]byte(body), &cfinfo)
return []byte(cfinfo.Result.Certificate), nil
}
func getCrl(crlUrl string) ([]byte, error) {
resp, err := http.Get(crlUrl)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode >= 300 {
return nil, fmt.Errorf("failed to retrieve CRL")
}
body, err := ioutil.ReadAll(resp.Body)
json.Unmarshal([]byte(body), &crlinfo)
crlString := "-----BEGIN X509 CRL-----\n" + crlinfo.Result + "\n-----END X509 CRL-----"
return []byte(crlString), err
}
|
Kafka
Strimzi
You can install strimzi kafka using the quick start guide: Strimzi Quickstarts
You'll need to overwrite the kafka cluster with your own definition: my-cluster.yaml
This file includes an authorization section:
Code Block | ||||
---|---|---|---|---|
| ||||
authorization:
type: opa
url: http://opa.default:8181/v1/data/policy/kafka/authz/allow
allowOnError: false
initialCacheCapacity: 1000
maximumCacheSize: 10000
expireAfterMs: 10 #60000
superUsers:
- CN=henri
- anwar
- CN=wesley
- CN=my-user |
This file includes an listeners section:
Code Block | ||||
---|---|---|---|---|
| ||||
listeners:
- name: plain
port: 9092
tls: false
type: internal
- name: tls
port: 9093
tls: true
type: internal
- name: plain3
port: 9097
tls: false
type: internal
authentication:
type: oauth
checkIssuer: false
checkAccessTokenType: true
accessTokenIsJwt: true
enableOauthBearer: true
introspectionEndpointUri: http://keycloak.default:8080/auth/realms/opa/protocol/openid-connect/token/introspect
clientId: opacli
clientSecret:
secretName: my-cluster-oauth
key: clientSecret
validIssuerUri: http://keycloak.default:8080/auth/realms/opa
clientAudience: account
customClaimCheck: "@.resource_access['opacli'].roles[0] == 'opa-client-role'" |
Where opa and keycloak are configured.
If you are publishing or subscribing using this listener you will need to provide a JWT with your request.
The token will be verified using keycloak along with the audience field and the resource access['opaclii'].roles field.
Kafka will then pass the authorizaton on to opa where other checks will take place.
Below is a sample opa policy you might use to check the kafka input:
Code Block | ||||
---|---|---|---|---|
| ||||
package policy.kafka.authz
default allow = false
allow = true {
allowedActions := ["DESCRIBE", "DESCRIBE_CONFIGS", "READ"]
input.action.operation == allowedActions[_]
input.requestContext.header.name.clientId == "kowl"
}
allow = true {
allowedActions := ["DESCRIBE", "DESCRIBE_CONFIGS"]
allowedTopics := ["__consumer_offsets","__strimzi_store_topic", "__strimzi-topic-operator-kstreams-topic-store-changelog",]
input.action.resourcePattern.name == allowedTopics[_]
}
allow = true {
allowedActions := ["DESCRIBE", "READ"]
input.action.operation == allowedActions[_]
allowedResourceTypes := ["GROUP"]
input.action.resourcePattern.resourceType == allowedResourceTypes[_]
input.requestContext.principal.name == "service-account-opacli"
}
allow = true {
input.action.operation == "CREATE"
input.action.resourcePattern.name == "kafka-cluster"
input.action.resourcePattern.resourceType == "CLUSTER"
input.requestContext.principal.name == "service-account-opacli"
input.requestContext.header.name.clientId == "consumer-client"
}
allow = true {
allowedActions := ["DESCRIBE","READ"]
input.action.operation == allowedActions[_]
input.action.resourcePattern.name == "my-topic"
input.action.resourcePattern.resourceType == "TOPIC"
input.requestContext.principal.name == "service-account-opacli"
input.requestContext.header.name.clientId == "consumer-client"
}
allow = true {
allowedActions := ["DESCRIBE","WRITE"]
input.action.operation == allowedActions[_]
input.action.resourcePattern.name == "my-topic"
input.action.resourcePattern.resourceType == "TOPIC"
input.requestContext.principal.name == "service-account-opacli"
input.requestContext.header.name.clientId == "producer-client"
} |
The default value of the sub field (principal name) in keycloak is the user id, this can be changed to the user name using a property mapper:
kube-mgmt
By default the entire policy and data cache are defined by the opa bundle.
If you need to add data from other sources you need to define the bundle root directories in the .manifest file
e.g.
Code Block | ||||
---|---|---|---|---|
| ||||
{
"revision" : "1",
"roots": ["policy", "roles"]
} |
You can then add the kube-mgmt sidecar to your opa deployment and this will pull data from configmaps in the namespace speciiied
e.g.
Code Block | ||
---|---|---|
| ||
- name: kube-mgmt
image: openpolicyagent/kube-mgmt:latest
args:
- "--namespaces=opa" |
Code Block | ||||
---|---|---|---|---|
| ||||
kind: ConfigMap
apiVersion: v1
metadata:
name: hello-data
namespace: opa
labels:
openpolicyagent.org/data: opa
data:
x.json: |
{"a": [1,2,3,4]} |
Code Block | ||||
---|---|---|---|---|
| ||||
{
"access_token": "eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJDamJ4a2FONjRVcUdYNThWU2R3WjBxQTdWRmN1TGdEQWhnUWJTVG55UE9JIn0.eyJleHAiOjE2Njg1MjgwMjEsImlhdCI6MTY2ODUyNzcyMSwianRpIjoiMjc1NzkzNjktOWExMi00NjA4LTk5ZjMtNTI2MmE4MzI5NGViIiwiaXNzIjoiaHR0cDovL2tleWNsb2FrLmRlZmF1bHQ6ODA4MC9hdXRoL3JlYWxtcy9vcGEiLCJhdWQiOiJhY2NvdW50Iiwic3ViIjoic2VydmljZS1hY2NvdW50LW9wYWNsaSIsInR5cCI6IkJlYXJlciIsImF6cCI6Im9wYWNsaSIsImFjciI6IjEiLCJyZWFsbV9hY2Nlc3MiOnsicm9sZXMiOlsib2ZmbGluZV9hY2Nlc3MiLCJ1bWFfYXV0aG9yaXphdGlvbiIsImRlZmF1bHQtcm9sZXMtb3BhIl19LCJyZXNvdXJjZV9hY2Nlc3MiOnsib3BhY2xpIjp7InJvbGVzIjpbIm9wYS1jbGllbnQtcm9sZSJdfSwiYWNjb3VudCI6eyJyb2xlcyI6WyJtYW5hZ2UtYWNjb3VudCIsIm1hbmFnZS1hY2NvdW50LWxpbmtzIiwidmlldy1wcm9maWxlIl19fSwic2NvcGUiOiJwcm9maWxlIGVtYWlsIiwiY2xpZW50SWQiOiJvcGFjbGkiLCJjbGllbnRIb3N0IjoiMTI3LjAuMC42IiwiZW1haWxfdmVyaWZpZWQiOmZhbHNlLCJjbGllbnRSb2xlIjoiW29wYS1jbGllbnQtcm9sZV0iLCJwcmVmZXJyZWRfdXNlcm5hbWUiOiJzZXJ2aWNlLWFjY291bnQtb3BhY2xpIiwiY2xpZW50QWRkcmVzcyI6IjEyNy4wLjAuNiJ9.B0X0zhAintHAXAtjiOngzTM0Wdv_aWM8qdyF4MXEZmE6AxoaMpaNQ6B18k1pkMz60qmTUUcYJOm-xHAdipnxptAf44_YmaybVE9_otjO59RBLonUlFtWDnNV4EqixLDaXN33Z54S1iWCPjBvI58sH_jQyMiXM7ur1FfjCCEeQwP2D9CgWY_3-Vg4Wy4cMAbsxOhZZog-QmKV8nkeyELqUS24xk_vpPkYOdG1ztc5SxHnLLsT4iwXgzYjAveiXyKyyYOC0oJP5zgCYEkQCvRORmuU0whN_5g4oTSIFcqN88KsrYb-R0I2RS7mQI8LXHrNIqMlNN2j5PoHZmgvMrpO3w"
} |
You can also add extra data using curl e.g. curl -X PUT $host:31182/v1/data/jwt -d @token.json
You can view all the data available to opa using curl: curl <opa host>:<port>/v1/data
You should see something similar to the following output:
Code Block | ||||
---|---|---|---|---|
| ||||
{
"decision_id":"067f7d2d-0e5d-4ca8-8cb0-6d38543c6b2f",
"result":{
"jwt":{ "access_token":"eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJDamJ4a2FONjRVcUdYNThWU2R3WjBxQTdWRmN1TGdEQWhnUWJTVG55UE9JIn0.eyJleHAiOjE2Njg1MjgwMjEsImlhdCI6MTY2ODUyNzcyMSwianRpIjoiMjc1NzkzNjktOWExMi00NjA4LTk5ZjMtNTI2MmE4MzI5NGViIiwiaXNzIjoiaHR0cDovL2tleWNsb2FrLmRlZmF1bHQ6ODA4MC9hdXRoL3JlYWxtcy9vcGEiLCJhdWQiOiJhY2NvdW50Iiwic3ViIjoic2VydmljZS1hY2NvdW50LW9wYWNsaSIsInR5cCI6IkJlYXJlciIsImF6cCI6Im9wYWNsaSIsImFjciI6IjEiLCJyZWFsbV9hY2Nlc3MiOnsicm9sZXMiOlsib2ZmbGluZV9hY2Nlc3MiLCJ1bWFfYXV0aG9yaXphdGlvbiIsImRlZmF1bHQtcm9sZXMtb3BhIl19LCJyZXNvdXJjZV9hY2Nlc3MiOnsib3BhY2xpIjp7InJvbGVzIjpbIm9wYS1jbGllbnQtcm9sZSJdfSwiYWNjb3VudCI6eyJyb2xlcyI6WyJtYW5hZ2UtYWNjb3VudCIsIm1hbmFnZS1hY2NvdW50LWxpbmtzIiwidmlldy1wcm9maWxlIl19fSwic2NvcGUiOiJwcm9maWxlIGVtYWlsIiwiY2xpZW50SWQiOiJvcGFjbGkiLCJjbGllbnRIb3N0IjoiMTI3LjAuMC42IiwiZW1haWxfdmVyaWZpZWQiOmZhbHNlLCJjbGllbnRSb2xlIjoiW29wYS1jbGllbnQtcm9sZV0iLCJwcmVmZXJyZWRfdXNlcm5hbWUiOiJzZXJ2aWNlLWFjY291bnQtb3BhY2xpIiwiY2xpZW50QWRkcmVzcyI6IjEyNy4wLjAuNiJ9.B0X0zhAintHAXAtjiOngzTM0Wdv_aWM8qdyF4MXEZmE6AxoaMpaNQ6B18k1pkMz60qmTUUcYJOm-xHAdipnxptAf44_YmaybVE9_otjO59RBLonUlFtWDnNV4EqixLDaXN33Z54S1iWCPjBvI58sH_jQyMiXM7ur1FfjCCEeQwP2D9CgWY_3-Vg4Wy4cMAbsxOhZZog-QmKV8nkeyELqUS24xk_vpPkYOdG1ztc5SxHnLLsT4iwXgzYjAveiXyKyyYOC0oJP5zgCYEkQCvRORmuU0whN_5g4oTSIFcqN88KsrYb-R0I2RS7mQI8LXHrNIqMlNN2j5PoHZmgvMrpO3w"
},
"opa":{
"hello-data":{
"x.json":{
"a":[
1,
2,
3,
4
]
}
}
},
"servers":{
"id":"s1",
"name":"app",
"ports":[
"p1",
"p2",
"p3"
],
"protocols":[
"https",
"ssh"
]
}
}
} |
You can get individual values like this: curl <opa host>:<port>/v1/data/jwt/access_token or refer to them in your code like this: data.opa["hello-data"]["x.json"].a[0]
Strimzi Bridge
You can communicate with kafka over http by adding strimzi bridge:
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaBridge
metadata:
name: strimzi-bridge
namespace: kafka
spec:
replicas: 1
bootstrapServers: my-cluster-kafka-bootstrap:9092
http:
port: 8080 |
See also:
Using Open Policy Agent with Strimzi and Apache Kafka
Secure Kafka with Keycloak: SASL OAuth Bearer
Enforce Custom Resource policies with Open Policy Agent Gatekeeper
How to integrate Kafka with Istio
Example with JWT payload passed from Envoy
Kafka authentication using OAuth 2.0
Using HTTP Bridge as a Kubernetes sidecar
Kafka SSL : Setup with self signed certificate
Kafka & Istio
Integration of Istio with Kafka is limited.
The Kafka Protocol is a TCP level protocol (Layer 4)
Kafka is a binary wrapper over TCP protocol.
Istio mainly supports HTTP/HTTPS on layer 7.
Higher level protocols such as HTTP have access to metadata so they can properly route requests, this is not the case with the Kafka protocol.
Service Mesh and Proxies: Examples for Kafka
Setting up Strimzi bridge with Istio Authorization and ACLs
The Kiali screenshot above represents both a producer and a consumer connecting to Kafka through the Strmzi bridge.
Access to the bridge is controlled using JWT which is checked using an Istio AuthorizationPolicy.
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: "security.istio.io/v1beta1"
kind: "AuthorizationPolicy"
metadata:
name: "kafka-bridge-policy"
namespace: kafka
spec:
selector:
matchLabels:
app.kubernetes.io/name: kafka-bridge
action: ALLOW
rules:
- from:
- source:
requestPrincipals: ["http://keycloak.default:8080/auth/realms/opa", "http://istio-ingressgateway.istio-system:80/auth/realms/opa"]
- to:
- operation:
methods: ["POST", "GET"]
paths: ["/topics", "/topics/*", "/consumers/*"]
when:
- key: request.auth.claims[clientRole]
values: ["opa-client-role"] |
By enabling tls on one of our bootstrap listeners user certificates are automatically created when we create a new KafkaUser
- name: tlsauth
type: internal
port: 9096
tls: true
authentication:
type: tls
Setting the authentication type to tls also ensure secure communication between Kafka and zookeeper.
We also set authorization to simple for the Kafka cluster, this means ACLs will be used.
kafka:
authorization:
type: simple
superUsers:
- CN=kowl
Any users you want to by-pass ACL checks should be listed under superUsers.
We can then setup our bridge user and include the ACL permissions as part of the KafkaUser definition:
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
name: bridge
namespace: kafka
labels:
strimzi.io/cluster: my-cluster
spec:
authentication:
type: tls
authorization:
type: simple
acls:
# access to the topic
- resource:
type: topic
name: my-topic
operations:
- Create
- Describe
- Read
- Write
- AlterConfigs
host: "*"
# access to the group
- resource:
type: group
name: my-group
operations:
- Describe
- Read
host: "*"
# access to the cluster
- resource:
type: cluster
operations:
- Alter
- AlterConfigs
host: "*" |
Authorization between the bridge and Kafka is done using user certificates.
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaBridge
metadata:
name: strimzi-bridge
namespace: kafka
spec:
replicas: 1
bootstrapServers: my-cluster-kafka-bootstrap:9096
http:
port: 8080
logging:
type: inline
loggers:
logger.bridge.level: "DEBUG"
logger.send.name: "http.openapi.operation.send"
logger.send.level: "DEBUG"
tls:
trustedCertificates:
- secretName: my-cluster-cluster-ca-cert
certificate: ca.crt
authentication:
type: tls
certificateAndKey:
secretName: bridge
certificate: user.crt
key: user.key |
Records can then to be posted to a topic with a call like this:
Endpoint:
http://istio-ingressgateway.istio-system:80/topics/my-topic
Payload: {"records":[{"key":"key-366","value":"value-366"}]}
and read from a topic using the following syntax:
i) Create a consumer group
Endpoint: http://istio-ingressgateway.istio-system:80/consumers/my-group
Payload: {"name":"my-consumer","format":"json","auto.offset.reset":"earliest","enable.auto.commit":false}
ii) subscribe to a topic
Endpoint: http://istio-ingressgateway.istio-system:80/consumers/my-group/instances/my-consumer/subscription
Payload: {"topics":["my-topic"]}
iii) Read records from a topic
Endpoint: http://istio-ingressgateway.istio-system:80/consumers/my-group/instances/my-consumer/records
These calls will be redirected to the bridge using a gateway and a virtual service
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: networking.istio.io/v1beta1
kind: Gateway
metadata:
name: strimzi-bridge-gateway
namespace: kafka
spec:
selector:
istio: ingressgateway # use Istio gateway implementation
servers:
- port:
number: 80
name: http
protocol: HTTP
hosts:
- "*"
---
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
name: strimzi-bridge-vs
namespace: kafka
spec:
hosts:
- "*"
gateways:
- strimzi-bridge-gateway
http:
- name: "strimzi-bridge-routes"
match:
- uri:
prefix: "/topics"
- uri:
prefix: "/consumers"
route:
- destination:
port:
number: 8080
host: strimzi-bridge-bridge-service.kafka.svc.cluster.local |
Note: To use Kowl with this setup you'll need to create a kowl user and configure kowl to use tls
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
name: kowl
namespace: kafka
labels:
strimzi.io/cluster: my-cluster
spec:
authentication:
type: tls |
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: v1
kind: ConfigMap
metadata:
name: kowl-config-cm
namespace: kafka
data:
config.yaml: |
kafka:
brokers:
- my-cluster-kafka-0.my-cluster-kafka-brokers.kafka.svc:9093
tls:
enabled: true
caFilepath: /etc/strimzi/ca/ca.crt
certFilepath: /etc/strimzi/user-crt/crt/user.crt
keyFilepath: /etc/strimzi/user-key/key/user.key |
- ca.crt is obtained from the my-cluster-cluster-ca-cert secret, user.crt and user.key are obtained from the kowl secret.
Admission Controllers
An admission controller is a piece of code that intercepts requests to the Kubernetes API server prior to persistence of the object, but after the request is authenticated and authorized.
Admission Controllers Reference
Kyverno
Opa Gatekeeper
OPA Gatekeeper: Policy and Governance for Kubernetes