Table of Contents |
---|
JIRA Ticket
Jira | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
ISTIO
Istio is a service mesh which provides a dedicated infrastructure layer that you can add to your applications. It adds capabilities like observability, traffic management, and security, without adding them to your own code.
...
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: "security.istio.io/v1beta1" kind: "PeerAuthentication" metadata: name: "default" namespace: "istio-system" spec: mtls: mode: STRICT |
JWT
JSON Web Token (JWT) is an open standard (RFC 7519) that defines a compact and self-contained way for securely transmitting information between parties as a JSON object.
...
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: security.istio.io/v1beta1 kind: AuthorizationPolicy metadata: name: httpbin namespace: foo spec: selector: matchLabels: app: httpbin rules: - from: - source: requestPrincipals: ["*"] - to: - operation: paths: ["/healthz"] |
Use Case
KUBERNETES
K8S RBAC (Role Based Access Control) - supported by default in kubernetes.
K8S Service Account
- Kube use service account (sa) to validate api access
- SAs can be lined to a role via a binding
- A default sa, with no permissions (no bindings), is created in each namespace – pods uses this namespace unless otherwise specified
- Pods can be specified to run with a specific sa
- The helm manger needs a SA with cluster wide permission (to be able to list installed charts etc)
However, during installation the pod running the helm install/upgrade/delete should run with a sa with only namespace permission to ensure not other modification is made to kube objects outside the namespace
K8S RBAC - controlling access to kubernetes resources
- Role and and rolebinding objects defines who (sa, user or group) is allowed to do what in the kubernetes api
- Role object
- Sets permissions on resources in a specific namespace
- ClusterRole object
- Sets permission on non-namespaced resources or across namespaces
- RoleBinding object
- Binds one or more Role or a ClusterRole object(s) to a user, group or service account
- ClusterRoleBinding object
- Binds one or more ClusterRole object(s) to a user, group or service account
- No installation requried – RBAC enabled by default
Network Policies
- K8S supports Network Policy objects but a provider need to be install for the polcies to take effect, e.g. Calico
- Network policies can control ingress and/or egress traffic by selecting applicable pods - bacially controlling traffic between pods and/or network endpoints
- Several providers: Calico, Cillium etc
...
- Allow traffic from ns: kong (the gateway), nonrtric and onap
- Deny traffic from any other ns
KONG
Kong is Orchestration Microservice API Gateway. Kong provides a flexible abstraction layer that securely manages communication between clients and microservices via API. Also known as an API Gateway, API middleware or in some cases Service Mesh.
...
With client-side discovery, the client or API gateway making the request is responsible for identifying the location of the service instance and routing the request to it. The client begins by querying the service registry to identify the location of the available instances of the service and then determines which instance to use. See https://konghq.com/learning-center/microservices/service-discovery-in-a-microservices-architecture/
Kong datastore
Kong uses an external datastore to store its configuration such as registered APIs, Consumers and Plugins. Plugins themselves can store every bit of information they need to be persisted, for example rate-limiting data or Consumer credentials. See https://konghq.com/faqs/#:~:text=PostgreSQL%20is%20an%20established%20SQL%20database%20for%20use,Cassandra%20or%20PostgreSQL%2C%20Kong%20maintains%20its%20own%20cache.
Kong Demo
Demo of Kong gateway access control already available.
...
See also https://konghq.com/blog/jwt-kong-gateway
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: networking.k8s.io/v1 kind: NetworkPolicy metadata: name: networkpolicy-nr namespace: nonrtric spec: podSelector: {} policyTypes: - Ingress ingress: - from: - namespaceSelector: matchLabels: kubernetes.io/metadata.name: kong - namespaceSelector: matchLabels: kubernetes.io/metadata.name: onap - namespaceSelector: matchLabels: kubernetes.io/metadata.name: nonrtric |
...
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: configuration.konghq.com/v1 kind: KongPlugin metadata: name: app-jwt-kp namespace: nonrtric plugin: jwt --- apiVersion: v1 kind: Secret metadata: name: pms-jwt-sec namespace: nonrtric type: Opaque stringData: kongCredType: jwt key: pms-issuer algorithm: RS256 rsa_public_key: | -----BEGIN PUBLIC KEY----- MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAwetu4+suoz6c7e1kQz7I Jmujci8zHpp4qh3nsmEL8e3QOKzMVsLuQPcF8lO1bBoChSA+KMNJ5rEixGWSxClp 9XroBSgrvjDsKtpPIlBQMnyOUYRSXWnIodmN+7wA72pTxo7JtAypPzRscSgi0OZt 9dtmv50RLr9Wph5cI+IE9OtgW58OKtdFRGigGHfdUEwrT/MPw2rOU85YRFaEgT/i wcuQCe+Zmf2S2gVgK62u51ZFFn2VycJT1LcOt9cdqrSXYZAPfVKnQ/EgYvDdzFL1 x73JkrrSEP3pfrN4bXOnc7cS/S9Y2qk/I+QCR6a6XKmqk5SnWJSyXvKdYQJrgxJp lQIDAQAB -----END PUBLIC KEY----- --- apiVersion: v1 kind: Secret metadata: name: ecs-jwt-sec namespace: nonrtric type: Opaque stringData: kongCredType: jwt key: ecs-issuer algorithm: RS256 rsa_public_key: | -----BEGIN PUBLIC KEY----- MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAtminzTtNs5oqPCbg4uC1 L7MfR3B+uyYvkSKr3NFieRCxp6VhrgodJJXYc3SqXbaTVBkTwU24wG4UvJCnoRQd 0VhSawtLkN8XNAdCiD831dKUYMJPs43ZY/gO5CHVqUMdSHlp8dn7jNren59dvRRS 3xC1D3etXuEU01XGuLi/5qJLAKqDbYs3bH1vslTjndg1WTsrkU8GEIT1NphSYg25 s6rSLTIBfk8FjKquYHw3wYVSQK9rg2mqddJpRWkfZnazMHTmSNjOJpiNb77VLGSx 9qDbbLjurCl2mAG5Z+w76uKfKGgOo68SU0TL1sPybsKhAoZZg1gF06mvMln5eq5C RQIDAQAB -----END PUBLIC KEY----- --- apiVersion: configuration.konghq.com/v1 kind: KongPlugin metadata: name: pms-group-acl-kp namespace: nonrtric plugin: acl config: whitelist: ['pms-group'] --- apiVersion: v1 kind: Secret metadata: name: pms-group-acl-sec namespace: nonrtric type: Opaque stringData: kongCredType: acl group: pms-group --- apiVersion: configuration.konghq.com/v1 kind: KongPlugin metadata: name: ecs-group-acl-kp namespace: nonrtric plugin: acl config: whitelist: ['ecs-group'] --- apiVersion: v1 kind: Secret metadata: name: ecs-group-acl-sec namespace: nonrtric type: Opaque stringData: kongCredType: acl group: ecs-group --- apiVersion: configuration.konghq.com/v1 kind: KongPlugin metadata: name: all-group-acl-kp namespace: nonrtric plugin: acl config: whitelist: ['ecs-group', 'pms-group'] --- apiVersion: configuration.konghq.com/v1 kind: KongConsumer metadata: name: pms-user-kc namespace: nonrtric annotations: kubernetes.io/ingress.class: kong username: pms-user credentials: - pms-jwt-sec - pms-group-acl-sec --- apiVersion: configuration.konghq.com/v1 kind: KongConsumer metadata: name: ecs-user-kc namespace: nonrtric annotations: kubernetes.io/ingress.class: kong username: ecs-user credentials: - ecs-jwt-sec - ecs-group-acl-sec --- apiVersion: networking.k8s.io/v1 kind: Ingress metadata: name: r1-pms-ing namespace: nonrtric annotations: konghq.com/plugins: app-jwt-kp,pms-group-acl-kp konghq.com/strip-path: "false" spec: ingressClassName: kong rules: - http: paths: - path: /a1-policy pathType: ImplementationSpecific backend: service: name: policymanagementservice port: number: 8081 --- apiVersion: networking.k8s.io/v1 kind: Ingress metadata: name: r1-ecs-ing namespace: nonrtric annotations: konghq.com/plugins: app-jwt-kp,ecs-group-acl-kp konghq.com/strip-path: "false" spec: ingressClassName: kong rules: - http: paths: - path: /data-consumer pathType: ImplementationSpecific backend: service: name: enrichmentservice port: number: 8083 - path: /data-producer pathType: ImplementationSpecific backend: service: name: enrichmentservice port: number: 8083 --- apiVersion: networking.k8s.io/v1 kind: Ingress metadata: name: r1-echo-ing namespace: nonrtric annotations: konghq.com/plugins: app-jwt-kp,all-group-acl-kp konghq.com/strip-path: "true" spec: ingressClassName: kong rules: - http: paths: - path: /echo pathType: ImplementationSpecific backend: service: name: httpecho port: number: 80 |
ISTIO Demo
- Install ISTIO on minikube using instruction here: Istio Installation - Simplified Learning (waytoeasylearn.com)
cd to the istio directory and install the demo application
kubectl create ns foo
kubectl apply -f <(istioctl kube-inject -f samples/httpbin/httpbin.yaml) -n foo
Create a python script to generate a JWT token using the code from here: https://medium.com/intelligentmachines/istio-jwt-step-by-step-guide-for-micro-services-authentication-690b170348fc . Install python_jwt using pip if it's not already installed.
- Create jwt-example.yaml using the public key generated by the python script:
kubectl create -f jwt-example.yaml
Code Block language yml title jwt-example.yaml apiVersion: "security.istio.io/v1beta1" kind: "RequestAuthentication" metadata: name: "jwt-example" namespace: istio-system spec: selector: matchLabels: istio: ingressgateway jwtRules: - issuer: "ISSUER" jwks: | { "keys":[{"e":"AQAB","kty":"RSA","n":"x_yYl4uW5c6NHOA-bDDh0MThFggBWl-vYJr77b9F1LmAtTlJVM0rL5klTfv2DmlAmD9eZPrWeUOoOGhSpe58XiSAvxyeaOrZhtyUjT3aglrSys0YBsB19ItNGMuoIuzPpWOrdtKwHa9rPbrdc6q7vb93qu2UVaIz-3FJmGFtSA5t8FK_5bZKF-oOzRLwqeVQ3n0Bu_dFDuGeZjQWMZF32QupyA-GF-tDGGriPLy9sutlB1NQyZ4qiSZx5UMxcfLwsWfQxHemdwLeZXWKWNBov8RmbZy2Jz-dwg6XjHBWAjTnCGG9p-bp63nUlnELI3LcEGhGOugZBqcpNT5dEAQ0fQ"}]}
Export the JWT token generated by the python script as an environment variable:
export TOKEN="eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJhdWQiOiJBVURJRU5DRSIsImV4cCI6MTYzNzI1NDkxNSwiaWF0IjoxNjM3MjUxOTE1LCJpc3MiOiJJU1NVRVIiLCJqdGkiOiJCcmhDdEstcC00ZTF0RlBrZmpuSmhRIiwibmJmIjoxNjM3MjUxOTE1LCJwZXJtaXNzaW9uIjoicmVhZCIsInJvbGUiOiJ1c2VyIiwic3ViIjoiU1VCSkVDVCJ9.HrQCLPZXf0VkFe7JUVGXq-sHJQhVibqhToG4r63py-iwHWlUL02_WfoWRoxapgqGwImDdSlt1uG8RR-6VMqzWwGlcqBIRhFTG0nmzmtQjnOUs6QAKSUpA3PyWBIYHV0BwZbpo8Zq1Bo-sELy400fU-MCQ_054fSsG7JMBMmrnj8NyJmD2lNN0VSFGO53SPl2tQSVlc9OwAr8Uu0jfLPfUmh6yq43qFuxnVRfBGLLPNOt29aOfAetKLc72qlphtnbDx2a9teP5AIbkIWyIlhTytEnQRCwU4x8gDrEdkrHui4qCtzpl_uoITSwPe3AFsi7gQHB6rJoDj-j2zPc4rUTAA"export INGRESS_HOST=$(minikube ip)
export INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
Test the service:
curl --header "Authorization: Bearer $TOKEN" $INGRESS_HOST:$INGRESS_PORT/headers -s -o /dev/null -w "%{http_code}\n"You should get a response code of 200
Update the token to something invalid
The response will be 401
Istio Service JWT Test
istio-test.yaml (uses the default namespace)
...
See the latest version here: istio-test-latest.yaml
Istio with Keycloak
If you are using minikube on Ubuntu WSL you need to run "minikube service keycloak" to see keycloak ui.
...
Retrieve public key using : http(s)://<hostname>/auth/realms/<realm name>
Anchor keycloak keycloak
Enable keycloak with Istio
Setup a new realm, user and client as shown here : https://www.keycloak.org/getting-started/getting-started-kube
...
Note: The iss of the token will differ depending on how you retrieve it. If it's retrieved from within the cluster for URL will start with http://keycloak.default:8080/ otherwise it will be something like : http://192.168.49.2:31560/ (http://(minikube ip): (keycloak service nodePort))
Keycloak database
Keycloak uses the H2 database by default.
...
Code Block | ||||
---|---|---|---|---|
| ||||
spec: initContainers: - name: init-keycloak image: busybox command: ['sh', '-c', 'until nc -vz keycloak.default 8080; do echo waiting for keycloak; sleep 2; done;'] containers: - name: a1-policy image: hashicorp/http-echo ports: - containerPort: 5678 args: - -text - "Hello a1-policy" |
See also: keycloak.yaml
Keycloak Operator
Keycloak Operator Installation
Istio mTLS
Test: Istio / Mutual TLS Migration
To see mTLS in kiali go to the display menu and check the security check box.
Running curl --header "Authorization: Bearer $TOKEN" ai-policy from the httpbin pod.
The padlock icon indicates mTLS is being used for communication between the pods.
Policy to enforce mTLS between PODs in the istio-nonrtric namespace in STRICT mode:
...
Change namespace to istio-system to apply mTLS for the entire cluster.
Istio cert manager
https://istio.io/latest/docs/ops/integrations/certmanager/
Go Http Request Handler for Testing
Anchor | ||||
---|---|---|---|---|
|
nonrtric-server-go
Code Block | ||||
---|---|---|---|---|
| ||||
package main import ( "fmt" "log" "github.com/gorilla/mux" "net/http" "encoding/json" "io/ioutil" "strings" ) func requestHandler(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") params := mux.Vars(r) var id = params["id"] var data = params["data"] var prefix = strings.Split(r.URL.Path, "/")[1] switch r.Method { case "GET": if id == "" { fmt.Println( "Received get request for "+ prefix +", params: nil\n") fmt.Fprintf(w, "Response to get request for "+ prefix +", params: nil\n") }else { fmt.Println("Received get request for "+ prefix +", params: id=" + id + "\n") fmt.Fprintf(w, "Response to get request for "+ prefix +", params: id=" + id + "\n") } case "POST": body, err := ioutil.ReadAll(r.Body) if err != nil { panic(err.Error()) } keyVal := make(map[string]string) json.Unmarshal(body, &keyVal) id := keyVal["id"] data := keyVal["data"] fmt.Println("Received post request for "+ prefix +", params: id=" + id +", data=" + data + "\n") fmt.Fprintf(w, "Response to post request for "+ prefix +", params: id=" + id +", data=" + data + "\n") case "PUT": fmt.Println("Received put request for "+ prefix +", params: id=" + id +", data=" + data + "\n") fmt.Fprintf(w, "Response to put request for "+ prefix +", params: id=" + id +", data=" + data + "\n") case "DELETE": fmt.Println("Received delete request for "+ prefix +", params: id=" + id + "\n") fmt.Fprintf(w, "Response to delete request for "+ prefix +", params: id=" + id + "\n") default: fmt.Println("Received request for unsupported method, only GET, POST, PUT and DELETE methods are supported.") fmt.Fprintf(w, "Error, only GET, POST, PUT and DELETE methods are supported.") } } func main() { router := mux.NewRouter() var prefixArray [3]string = [3]string{"/a1-policy", "/data-consumer", "/data-producer"} for _, prefix := range prefixArray { router.HandleFunc(prefix, requestHandler) router.HandleFunc(prefix+"/{id}", requestHandler) router.HandleFunc(prefix+"/{id}/{data}", requestHandler) } log.Fatal(http.ListenAndServe(":8080", router)) } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
FROM golang:1.15.2-alpine3.12 as build RUN apk add git RUN mkdir /build ADD . /build WORKDIR /build RUN go get github.com/gorilla/mux RUN go build -o nonrtric-server-go . FROM alpine:latest RUN mkdir /app WORKDIR /app/ # Copy the Pre-built binary file from the previous stage COPY --from=build /build . # Expose port 8080 EXPOSE 8080 # Run Executable CMD ["/app/nonrtric-server-go"] |
Testing
Update AuthorizationPolicy to only allow certain operations:
...
-----------------------------------------------------------------------
Number of Tests: 10, Tests Passed: 10, Tests Failed: 0
Date: 2021-12-06-14:48:33
-----------------------------------------------------------------------
Go Http Client for running inside cluster
Anchor | ||||
---|---|---|---|---|
|
nonrtric-client-go
Code Block | ||||
---|---|---|---|---|
| ||||
package main import ( "fmt" "net/http" "net/url" "encoding/json" "time" "io/ioutil" "math/rand" "strings" "bytes" "strconv" "flag" ) type Jwttoken struct { Access_token string Expires_in int Refresh_expires_in int Refresh_token string Token_type string Not_before_policy int Session_state string Scope string } var gatewayHost string var gatewayPort string var keycloakHost string var keycloakPort string var useGateway string var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") func randSeq(n int) string { b := make([]rune, n) for i := range b { b[i] = letters[rand.Intn(len(letters))] } return string(b) } func getToken(user string, password string, clientId string, realmName string) string { keycloakUrl := "http://"+keycloakHost+":"+keycloakPort+"/auth/realms/"+realmName+"/protocol/openid-connect/token" resp, err := http.PostForm(keycloakUrl, url.Values{"username": {user}, "password": {password}, "grant_type": {"password"}, "client_id": {clientId}}) if err != nil { fmt.Println(err) panic("Something wrong with the credentials or url ") } defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) var jwt Jwttoken json.Unmarshal([]byte(body), &jwt) return jwt.Access_token; } func MakeRequest(client *http.Client, prefix string, method string, ch chan<-string) { var id = rand.Intn(1000) var data = randSeq(10) var service = strings.Split(prefix, "/")[1] var gatewayUrl = "http://"+gatewayHost+":"+gatewayPort var token = "" var jsonValue []byte = []byte{} var restUrl string = "" if strings.ToUpper(useGateway) != "Y" { gatewayUrl = "http://"+service+".istio-nonrtric:80" //fmt.Println(gatewayUrl) } if service == "a1-policy" { token = getToken("pmsuser", "secret","pmsclient", "pmsrealm") }else{ token = getToken("icsuser", "secret","icsclient", "icsrealm") } if method == "POST" { values := map[string]string{"id": strconv.Itoa(id), "data": data} jsonValue, _ = json.Marshal(values) restUrl = gatewayUrl+prefix } else if method == "PUT" { restUrl = gatewayUrl+prefix+"/"+strconv.Itoa(id)+"/"+data } else { restUrl = gatewayUrl+prefix+"/"+strconv.Itoa(id) } req, err := http.NewRequest(method, restUrl, bytes.NewBuffer(jsonValue)) if err != nil { fmt.Printf("Got error %s", err.Error()) } req.Header.Set("Content-type", "application/json") req.Header.Set("Authorization", "Bearer "+token) resp, err := client.Do(req) if err != nil { fmt.Printf("Got error %s", err.Error()) } defer resp.Body.Close() body, _ := ioutil.ReadAll(resp.Body) respString := string(body[:]) if respString == "RBAC: access denied"{ respString += " for "+service+" "+strings.ToLower(method)+" request\n" } ch <- fmt.Sprintf("%s", respString) } func main() { flag.StringVar(&gatewayHost, "gatewayHost", "192.168.49.2", "Gateway Host") flag.StringVar(&gatewayPort, "gatewayPort" , "32162", "Gateway Port") flag.StringVar(&keycloakHost, "keycloakHost", "192.168.49.2", "Keycloak Host") flag.StringVar(&keycloakPort, "keycloakPort" , "31560", "Keycloak Port") flag.StringVar(&useGateway, "useGateway" , "Y", "Connect to services hrough API gateway") flag.Parse() client := &http.Client{ Timeout: time.Second * 10, } ch := make(chan string) var prefixArray [3]string = [3]string{"/a1-policy", "/data-consumer", "/data-producer"} var methodArray [4]string = [4]string{"GET", "POST", "PUT", "DELETE"} for true { for _,prefix := range prefixArray{ for _,method := range methodArray{ go MakeRequest(client, prefix, method, ch) } } for i := 0; i < len(prefixArray); i++ { for j := 0; j < len(methodArray); j++ { fmt.Println(<-ch) } } time.Sleep(30 * time.Second) } } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: "security.istio.io/v1beta1" kind: "AuthorizationPolicy" metadata: name: "ics-policy" namespace: istio-nonrtric spec: selector: matchLabels: apptype: nonrtric-ics action: ALLOW rules: - from: - source: namespaces: ["default"] to: - operation: methods: ["GET", "POST", "PUT", "DELETE"] paths: ["/data-*"] hosts: ["data-consumer*", "data-producer*"] ports: ["8080"] --- apiVersion: "security.istio.io/v1beta1" kind: "AuthorizationPolicy" metadata: name: "pms-policy" namespace: istio-nonrtric spec: selector: matchLabels: apptype: nonrtric-pms action: ALLOW rules: - from: - source: principals: ["cluster.local/ns/default/sa/goclient"] to: - operation: methods: ["GET", "POST", "PUT", "DELETE"] paths: ["/a1-policy*"] hosts: ["a1-policy*"] ports: ["8080"] when: - key: request.auth.claims[preferred_username] values: ["pmsuser"] |
Request are sent from the nonrtric-client-go pod to the services directly from within the cluster.
...
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: "security.istio.io/v1beta1" kind: "AuthorizationPolicy" metadata: name: "pms-policy" namespace: istio-nonrtric spec: selector: matchLabels: apptype: nonrtric-pms action: ALLOW rules: - from: - source: requestPrincipals: ["http://192.168.49.2:31560/auth/realms/pmsrealm/fab53fd0-3315-4e2f-bd17-6984fb7745f2"] - source: requestPrincipals: ["http://keycloak.default:8080/auth/realms/pmsrealm/fab53fd0-3315-4e2f-bd17-6984fb7745f2"] to: - operation: methods: ["GET", "POST", "PUT", "DELETE"] paths: ["/a1-policy*"] when: - key: request.auth.claims[role] values: ["pms_admin"] - from: - source: requestPrincipals: ["http://192.168.49.2:31560/auth/realms/pmsrealm/f96255ec-d553-4c2e-b106-0ed586ccab70"] - source: requestPrincipals: ["http://keycloak.default:8080/auth/realms/pmsrealm/f96255ec-d553-4c2e-b106-0ed586ccab70"] to: - operation: methods: ["GET"] paths: ["/a1-policy*"] when: - key: request.auth.claims[role] values: ["pms_viewer"] |
pms_admin role:
Test 1: Testing GET /a1-policy
Received get request for a1-policy, params: nil
...
Test 5: Testing POST /a1-policy
Received post request for a1-policy, params: id=1003, data=abc
pms_viewer role:
Test 1: Testing GET /a1-policy
Received get request for a1-policy, params: nil
...
Further details on authorization policies are avaiable here
Anchor | ||||
---|---|---|---|---|
|
Istio network policy is enforced at the pod level (in the Envoy proxy), in user-space, (layer 7), as opposed to Kubernetes network policy, which is in kernel-space (layer 4), and is enforced on the host. By operating at application layer, Istio has a richer set of attributes to express and enforce policy in the protocols it understands (e.g. HTTP headers).
Anchor grafana grafana
Grafana
Istio also comes with grafana, to start it run : istioctl dashboard grafana
...
The istio dashboards are installed by default
Select the Istio Service dashboard → service workloads to see the incoming requests
You can elasticsearch as a datasource to grafana.
...
This does not really work for a single shard instance like the one we are using.
See also: Grafana playground
Prometheus
Start the prometheus dashboard by running: istioctl dashboard prometheus
...
You can then create your own dashboard in grafana using these metrics: rapps-requests.json
OAuth2 Proxy
Welcome to OAuth2 Proxy | OAuth2 Proxy (oauth2-proxy.github.io)
Calico network policy
https://docs.projectcalico.org/security/calico-network-policy
...
Following the example in the link above I installed the test application in a separate namespace (calico-test). Using curl I was able to access the database prior to applying the GlobalNetworkPolicy. After applying the policy the request timed out rather than return a 403 forbidden message.
Logging
Anchor | ||||
---|---|---|---|---|
|
Elasticsearch
We can use elasticsearch, kibana and fluentd to aggregate and visualize the kubernetes logs.
...
This will produce a report like the following:
This shows the number of requests made to the nonrtric-server-go.
...
Here you can create different charts to display your data and then add them to a dashboard.
Here you can see 3 graphs, the first one shows the number of requests received by each NONRTRIC componet in he last 60 minutes.
...
Run GET /_cat/indices?v to see the list of indices currently in use
You can delete indices that are no longer required by running the following command:
...
Code Block | ||||
---|---|---|---|---|
| ||||
PUT /_ilm/policy/cleanup_policy { "policy": { "phases": { "hot": { "actions": {} }, "delete": { "min_age": "1d", "actions": { "delete": {} } } } } } PUT /logstash-*/_settings { "lifecycle.name": "cleanup_policy" } PUT /_template/logging_policy_template { "index_patterns": ["logstash-*"], "settings": { "index.lifecycle.name": "cleanup_policy" } } |
Elasticsearch SDK
Code Block | ||||
---|---|---|---|---|
| ||||
package main import ( "bytes" "context" "encoding/json" "fmt" "github.com/elastic/go-elasticsearch/esapi" "github.com/elastic/go-elasticsearch/v8" "io/ioutil" "log" "os/exec" "strings" ) func main() { cmd := exec.Command("minikube", "ip") stdout, err := cmd.Output() ingressHost := strings.TrimSpace(string(stdout)) cmd = exec.Command("minikube", "ssh-key") stdout, err = cmd.Output() ingressKey := strings.TrimSpace(string(stdout)) // copy ca cert cmd = exec.Command("scp", "-i", ingressKey, "docker@"+ingressHost+":/var/elasticsearch/config/certs/ca/ca.crt", "/mnt/c/Users/ktimoney/go/elastic/") stdout, err = cmd.Output() // get the elasticsearch service nodePort cmd = exec.Command("kubectl", "get", "service", "elasticsearch", "-n", "logging", "-o", "jsonpath={.spec.ports[?(@.port==9200)].nodePort}") stdout, err = cmd.Output() secureIngressPort := strings.TrimSpace(string(stdout)) clusterURLs := []string{"https://" + ingressHost + ":" + secureIngressPort} username := "elastic" password := "secret" cert, _ := ioutil.ReadFile("./ca.crt") // client configuration cfg := elasticsearch.Config{ Addresses: clusterURLs, Username: username, Password: password, CACert: cert, } ctx := context.Background() es, err := elasticsearch.NewClient(cfg) if err != nil { log.Fatalf("Error creating the client: %s", err) } log.Println(elasticsearch.Version) resp, err := es.Info() if err != nil { log.Fatalf("Error getting response: %s", err) } defer resp.Body.Close() log.Println(resp) // Index Query indexResp, err := esapi.CatIndicesRequest{Format: "json", Pretty: true}.Do(ctx, es) if err != nil { return } indexBody := &indexResp.Body defer indexResp.Body.Close() fmt.Println(indexResp.String()) body, err := ioutil.ReadAll(*indexBody) var results []map[string]interface{} json.Unmarshal(body, &results) fmt.Printf("Index: %+v\n", results) indexName := fmt.Sprintf("%v", results[len(results)-1]["index"]) query := `{"query": {"match" : {"log": "token"}},"size": 3}` runQuery(es, ctx, indexName, query) query = ` { "query": { "bool": { "must": [ { "match": { "kubernetes.container_name": "istio-proxy" }}, { "match": { "log": "token" }}, { "match": { "kubernetes.labels.app_kubernetes_io/name": "rapp-jwt-invoker" }}, { "range": { "@timestamp": { "gte": "now-60m" }}} ] } },"size": 1 } ` runQuery(es, ctx, indexName, query) query = ` { "query": { "bool": { "must": [ { "match": { "kubernetes.container_name": "istio-proxy" }}, { "match": { "log": "GET /rapp-jwt-provider" }}, { "match": { "kubernetes.labels.app_kubernetes_io/name": "rapp-jwt-provider" }}, { "match_phrase": { "tag": "jwt-provider" }}, { "range": { "@timestamp": { "gte": "now-60m" }}} ] } },"size": 1 } ` runQuery(es, ctx, indexName, query) } func runQuery(es *elasticsearch.Client, ctx context.Context, indexName string, query string) { // Query indexName var mapResp map[string]interface{} var buf bytes.Buffer var b strings.Builder b.WriteString(query) read := strings.NewReader(b.String()) // Attempt to encode the JSON query and look for errors if err := json.NewEncoder(&buf).Encode(read); err != nil { log.Fatalf("Error encoding query: %s", err) // Query is a valid JSON object } else { fmt.Println("\njson.NewEncoder encoded query:", read, "\n") } // Pass the JSON query to client's Search() method searchResp, err := es.Search( es.Search.WithContext(ctx), es.Search.WithIndex(indexName), es.Search.WithBody(read), es.Search.WithTrackTotalHits(true), es.Search.WithPretty(), ) if err != nil { log.Fatalf("Elasticsearch Search() API ERROR:", err) } defer searchResp.Body.Close() // Decode the JSON response and using a pointer if err := json.NewDecoder(searchResp.Body).Decode(&mapResp); err != nil { log.Fatalf("Error parsing the response body: %s", err) } // Iterate the document "hits" returned by API call for _, hit := range mapResp["hits"].(map[string]interface{})["hits"].([]interface{}) { // Parse the attributes/fields of the document doc := hit.(map[string]interface{}) // The "_source" data is another map interface nested inside of doc source := doc["_source"] // Get the document's _id and print it out along with _source data docID := doc["_id"] fmt.Println("docID:", docID) fmt.Println("_source:", source, "\n") // extract the @timestamp field timeStamp := fmt.Sprintf("%v", source.(map[string]interface{})["@timestamp"]) fmt.Println("timeStamp:", timeStamp) // extract the tag field tag := fmt.Sprintf("%v", source.(map[string]interface{})["tag"]) fmt.Println("tag:", tag) // extract the log field k8slog := fmt.Sprintf("%v", source.(map[string]interface{})["log"]) fmt.Println("log:", k8slog) } hits := int(mapResp["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"].(float64)) fmt.Println("Matches:", hits) } |
...
You can run the same query in elasticsearch dev-tools:
Quick Installation Guide
- Download and install istio: istioctl install --set profile=demo
- cd to the samples/addons/ directory and install the dashboards e.g. kubectl create -f kiali.yaml
- Install postgres: istioctl kube-inject -f postgres.yaml | kubectl apply -f - (change the hostPath path value to a path on your host)
- Install keycloak: istioctl kube-inject -f keycloak.yaml | kubectl apply -f -
- Open the keycloak admin console and setup the required realms, users and clients
- Setup the "pms_admin" and "pms_viewer" roles for pmsuser and pmsuser2 respectively.
- Install Release G Release F: Coordinated Service Exposurenonrtric-server-go: docker build -t nonrtric-server-go:latest .
- Create the istio-nonrtric namespace: kubectl create namespace istio-nonrtric
- Enable istio for the istio-nonrtric namespace: kubectl label namespace istio-nonrtric istio-injection=enabled
- Edit the istio-test.yaml so the host ip specified matches yours.
- Also change the userid in the requestPrincipals field to match yours
- Install istio-test.yaml : kubectl create -f istio-test.yaml
- Install Release G Release F: Coordinated Service Exposure nonrtric-client-go: docker build -t nonrtric-client-go:latest .
- Install the test client: istioctl kube-inject -f client.yaml | kubectl apply -f -
- Open the kiali dashboard to check your services are up and running
- Open the grafana to view the istio dashboard
- Optionally install Release G Release F: Coordinated Service Exposure elasticsearch
ONAP
ONAP Next Generation Security & Logging Architecture
GO Client
Create kubernetes jobs in golang
...
There is also a helm sdk you can use:
HELM SDK
Code Block | ||||
---|---|---|---|---|
| ||||
package main import ( "fmt" "os" "log" "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/client-go/rest" "helm.sh/helm/v3/pkg/action" "helm.sh/helm/v3/pkg/chart/loader" ) func main() { chartPath := "/tmp/wordpress-12.3.3.tgz" chart, err := loader.Load(chartPath) releaseName := "wordpress" releaseNamespace := "default" actionConfig, err := getActionConfig(releaseNamespace) if err != nil { panic(err) } listAction := action.NewList(actionConfig) releases, err := listAction.Run() if err != nil { log.Println(err) } for _, release := range releases { log.Println("Release: " + release.Name + " Status: " + release.Info.Status.String()) } iCli := action.NewInstall(actionConfig) iCli.Namespace = releaseNamespace iCli.ReleaseName = releaseName rel, err := iCli.Run(chart, nil) if err != nil { fmt.Println(err) } fmt.Println("Successfully installed release: ", rel.Name) } func getActionConfig(namespace string) (*action.Configuration, error) { actionConfig := new(action.Configuration) // Create the rest config instance with ServiceAccount values loaded in them config, err := rest.InClusterConfig() if err != nil { // fallback to kubeconfig home, exists := os.LookupEnv("HOME") if !exists { home = "/root" } kubeconfigPath := filepath.Join(home, ".kube", "config") if envvar := os.Getenv("KUBECONFIG"); len(envvar) >0 { kubeconfigPath = envvar } if err := actionConfig.Init(kube.GetConfig(kubeconfigPath, "", namespace), namespace, os.Getenv("HELM_DRIVER"), func(format string, v ...interface{}) { fmt.Sprintf(format, v) }); err != nil { panic(err) } } else { // Create the ConfigFlags struct instance with initialized values from ServiceAccount var kubeConfig *genericclioptions.ConfigFlags kubeConfig = genericclioptions.NewConfigFlags(false) kubeConfig.APIServer = &config.Host kubeConfig.BearerToken = &config.BearerToken kubeConfig.CAFile = &config.CAFile kubeConfig.Namespace = &namespace if err := actionConfig.Init(kubeConfig, namespace, os.Getenv("HELM_DRIVER"), func(format string, v ...interface{}) { fmt.Sprintf(format, v) }); err != nil { panic(err) } } return actionConfig, nil } |
...
The method getActionConfig works for both in-cluster deployments and from the localhost. It determines which one to use by calling the rest.InClusterConfig() function.
GO CLIENT SDK
Here are another couple of programs that demonstrate how to use the go client:
...
There is also support for istio in client go Istio client-go
ISTIO SDK
Code Block | ||||
---|---|---|---|---|
| ||||
package main import ( "context" "bytes" "fmt" "os" "log" "path/filepath" k8Yaml "k8s.io/apimachinery/pkg/util/yaml" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" clientcmd "k8s.io/client-go/tools/clientcmd" versioned "istio.io/client-go/pkg/clientset/versioned" v1beta1 "istio.io/client-go/pkg/apis/security/v1beta1" securityv1beta1 "istio.io/api/security/v1beta1" typev1beta1 "istio.io/api/type/v1beta1" ) const ( NAMESPACE = "default" ) const authorizationPolicyManifest = ` apiVersion: "security.istio.io/v1beta1" kind: "AuthorizationPolicy" metadata: name: "pms-policy" namespace: default spec: selector: matchLabels: apptype: nonrtric-pms action: ALLOW rules: - from: - source: principals: ["cluster.local/ns/default/sa/goclient"] to: - operation: methods: ["GET", "POST", "PUT", "DELETE"] paths: ["/a1-policy*"] hosts: ["a1-policy*"] ports: ["8080"] when: - key: request.auth.claims[role] values: ["pms_admin"] ` func connectToK8s() *versioned.Clientset { home, exists := os.LookupEnv("HOME") if !exists { home = "/root" } configPath := filepath.Join(home, ".kube", "config") config, err := clientcmd.BuildConfigFromFlags("", configPath) if err != nil { log.Fatalln("failed to create K8s config") } ic, err := versioned.NewForConfig(config) if err != nil { log.Fatalf("Failed to create istio client: %s", err) return ic } func createAuthorizationPolicy(clientset *versioned.Clientset) { authClient := clientset.SecurityV1beta1().AuthorizationPolicies(NAMESPACE) auth := &v1beta1.AuthorizationPolicy{} dec := k8Yaml.NewYAMLOrJSONDecoder(bytes.NewReader([]byte(authorizationPolicyManifest)), 1000) if err := dec.Decode(&auth); err != nil { fmt.Println(err) } result, err := authClient.Create(context.TODO(), auth, metav1.CreateOptions{}) if err!=nil { panic(err.Error()) } fmt.Printf("Create Authorization Policy %s \n", result.GetName()) } func createAuthorizationPolicy2(clientset *versioned.Clientset) { authClient := clientset.SecurityV1beta1().AuthorizationPolicies(NAMESPACE) auth := &v1beta1.AuthorizationPolicy{ ObjectMeta: metav1.ObjectMeta{ Name: "ics-policy", }, Spec: securityv1beta1.AuthorizationPolicy { Selector: &typev1beta1.WorkloadSelector{ MatchLabels: map[string]string{ "apptype" : "nonrtric-ics", }, }, Action: securityv1beta1.AuthorizationPolicy_ALLOW, Rules: []*securityv1beta1.Rule{{ From: []*securityv1beta1.Rule_From{{ Source: &securityv1beta1.Source{ Namespaces : []string{ "default", }, }, },}, To: []*securityv1beta1.Rule_To{{ Operation: &securityv1beta1.Operation{ Methods : []string{ "GET", "POST", "PUT", "DELETE", }, Paths : []string{ "/data-*", }, Hosts : []string{ "data-consumer*", "data-producer*", }, Ports : []string{ "8080", }, }, },}, },}, }, } result, err := authClient.Create(context.TODO(), auth, metav1.CreateOptions{}) if err!=nil { panic(err.Error()) } fmt.Printf("Create Authorization Policy %s \n", result.GetName()) } func main() { clientset := connectToK8s() createAuthorizationPolicy(clientset) createAuthorizationPolicy2(clientset) } |
...
keycloak aslo has a client called gocloak
GOCLOAK SDK
Code Block | ||||
---|---|---|---|---|
| ||||
package main import ( "github.com/Nerzal/gocloak/v10" "context" "fmt" ) func main(){ client := gocloak.NewClient("http://192.168.49.2:31560") ctx := context.Background() token, err := client.LoginAdmin(ctx, "admin", "admin", "master") if err != nil { fmt.Println(err) panic("Something wrong with the credentials or url") } realmRepresentation := gocloak.RealmRepresentation{ ID: gocloak.StringP("testRealm"), Realm: gocloak.StringP("testRealm"), DisplayName: gocloak.StringP("testRealm"), Enabled: gocloak.BoolP(true), } realm, err := client.CreateRealm(ctx, token.AccessToken, realmRepresentation) if err != nil { fmt.Println(err) panic("Oh no!, failed to create realm :(") } else { fmt.Println("Created new realm", realm) } newClient := gocloak.Client{ ClientID: gocloak.StringP("testClient"), Enabled: gocloak.BoolP(true), DirectAccessGrantsEnabled: gocloak.BoolP(true), BearerOnly: gocloak.BoolP(false), PublicClient: gocloak.BoolP(true), } clientId, err := client.CreateClient(ctx, token.AccessToken, realm, newClient) if err != nil { fmt.Println(err) panic("Oh no!, failed to create client :(") } else { fmt.Println("Created new client", clientId) } newUser := gocloak.User{ FirstName: gocloak.StringP("Bob"), LastName: gocloak.StringP("Uncle"), Email: gocloak.StringP("something@really.wrong"), Enabled: gocloak.BoolP(true), Username: gocloak.StringP("testUser"), } userId, err := client.CreateUser(ctx, token.AccessToken, realm, newUser) if err != nil { fmt.Println(err) panic("Oh no!, failed to create user :(") } else { fmt.Println("Created new user", userId) } err = client.SetPassword(ctx, token.AccessToken, userId, realm, "secret", false) if err != nil { fmt.Println(err) panic("Oh no!, failed to set password :(") } else { fmt.Println("Set password for user") } removeRoles := []gocloak.Role{} origRoles, err := client.GetRealmRoles(ctx, token.AccessToken, realm, gocloak.GetRoleParams{}) if err != nil { fmt.Println(err) panic("Oh no!, failed to retreive roles :(") } else { fmt.Println("Retrieved roles") } for _, r := range origRoles { removeRoles = append(removeRoles, *r) } newRole := gocloak.Role{ Name: gocloak.StringP("testRole"), } roleName, err := client.CreateRealmRole(ctx, token.AccessToken, realm, newRole) if err != nil { fmt.Println(err) panic("Oh no!, failed to create role :(") } else { fmt.Println("Created new role", roleName) } role, err := client.GetRealmRole(ctx, token.AccessToken, realm, roleName) if err != nil { fmt.Println(err) panic("Oh no!, failed to retrieve role :(") } else { fmt.Println("Retrieved role") } roles := []gocloak.Role{} roles = append(roles, *role) err = client.AddRealmRoleToUser(ctx, token.AccessToken, realm, userId, roles) if err != nil { fmt.Println(err) panic("Oh no!, failed to add role to user :(") } else { fmt.Println("Role added to user") } err = client.DeleteRealmRoleFromUser(ctx, token.AccessToken, realm, userId, removeRoles) if err != nil { fmt.Println(err) panic("Oh no!, failed to remove roles from user :(") } else { fmt.Println("Roles removed from user") } newMapper := gocloak.ProtocolMapperRepresentation{ ID: gocloak.StringP("testMapper"), Name: gocloak.StringP("testMapper"), Protocol: gocloak.StringP("openid-connect"), ProtocolMapper: gocloak.StringP("oidc-usermodel-realm-role-mapper"), Config: &map[string]string{ "access.token.claim": "true", "aggregate.attrs": "", "claim.name": "role", "id.token.claim": "true", "jsonType.label": "String", "multivalued": "", "userinfo.token.claim": "true", }, } _, err = client.CreateClientProtocolMapper(ctx, token.AccessToken, realm, clientId, newMapper) if err != nil { fmt.Println(err) panic("Oh no!, failed to add roleampper to client :(") } else { fmt.Println("Rolemapper added to client") } } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
when: - key: request.auth.claims[clientRole] values: ["hwclientrole"] |
Keycloak Client Authenticator
Using X509 certificates
Create the server side certificates using the following script
...
Java example available here: X.509 Authentication in Spring Security
Istio CA Certs
To allow istio to work with keycloak you must add your certificate to the istio certs when you're installing.
...
Further instruction are available here: Custom CA Integration using Kubernetes CSR
Using istio-gateway to obtain JWT tokens.
You may want to avoid connecting directly to the keycloak server for security reasons.
...
Note: The file above also allows for http calls to keycloak through the gateway, the ISS in this case is: "http://istio-ingressgateway.istio-system:80/auth/realms/<realm>". In this case the jwksUri ahould be set to the default URI for in-cluster keycloak calls i.e. "http://keycloak.default:8080/auth/realms/<realm>/protocol/openid-connect/certs"
Client authentication with signed JWT
Another option for retrieving JWT tokens for confidentail clients is using client authentication with signed JWT.
...
Note: we can also call this using https with some small modifications.
Client authentication with signed JWT with client secret
This is similar to the option above except we sign with the client secret instead of the private key.
...
Code Block | ||||
---|---|---|---|---|
| ||||
secret := "NKTh1bfR9HNwllMhdWrDMKhVJHTvwreC" token, err := jwt.NewWithClaims(jwt.SigningMethodHS256, claims).SignedString([]byte(secret)) if err != nil { return "", fmt.Errorf("create: sign token: %w", err) } |
Client keys tab
The client offer many different ways of configuring the authenticatio keys.
...
Code Block | ||||
---|---|---|---|---|
| ||||
claims["iss"] = "jwtclient3" claims["aud"] = "https://192.168.49.2:31561/auth/realms/x509" token := jwt.NewWithClaims(jwt.SigningMethodRS256, claims) token.Header["kid"] = "AKAwbsKtqu9OmIwIsPOUf5zTJkIC73hzY9Myv4srjTs" tokenString, err := token.SignedString(key) if err != nil { return "", fmt.Errorf("create: sign token: %w", err) } return tokenString, nil } |
Keycloak Authorization code grant
The OAuth Authorization Code Grant flow is recommended if your application support redirects.
e.g. your application is a Web application or a mobile application.
...
Login will trigger a call to the /callback endpoint whcih in turn gets the JWT token and returns it to the user.
PKCE
PKCE stands for Proof Key for Code Exchange and the PKCE-enhanced Authorization Code Flow builds upon the standard Authorization Code Flow and it provides an additional level of security for OAuth clients.
...
curl -H "Authorization: Bearer $TOKEN" localhost:9000
Hello World OAuth2!
See also: golang oauth2
Keycloak Rest API
Documentation for the keycloak Rest API is available here: Keycloak Admin REST API
...
Code Block | ||||
---|---|---|---|---|
| ||||
export ADMIN_TKN=$(curl -s -X POST --insecure https://$HOST:$KEYCLOAK_PORT/auth/realms/master/protocol/openid-connect/token \ -H "Content-Type: application/x-www-form-urlencoded" \ -d "username=admin" \ -d 'password=admin' \ -d 'grant_type=password' \ -d 'client_id=admin-cli' | jq -r '.access_token') echo "ADMIN CLIENT TOKEN = $ADMIN_TKN" curl -X POST --insecure https://$HOST:$KEYCLOAK_PORT/auth/admin/realms/x509provider/clients \ -H "authorization: Bearer $ADMIN_TKN" \ -H "Content-Type: application/json" \ --data \ ' { "id": "x509Client", "name": "x509Client", "enabled": "true", "defaultClientScopes": ["email"], "redirectUris": ["*"], "attributes": {"use.refresh.tokens": "true", "client_credentials.use_refresh_token": "true"} } ' |
Keycloak SSO & User management
Identity Providers
You can use keycloak for single-sign so users don't have to login to each application individually.
...
See the "Identity Providers" menu in the keycloak UI.
...
User Federation
You can also use existing LDAP, active directory servers or relational databases for user management if this is required.
...
See the "User Federation" menu in the keycloak UI.
Keycloak Authorization services
...
Sample External Authorization Server with Istio
Working example
- To use keycloak authoriztion services start by creating a confidential client.
- Set "Authoriztion Enabled" to on.
- Create 2 roles rapp_admin and rapp_user and assigned them to the service account.
- Click on the authoriztion tab for the client to setup your authoriztion policies.
- Start by creating 4 scopes (create,edit, delete and view) in the "Authoriztion scopes" section
- Next create a resource "Rapp resource", set the URI to /api/resources/* and set the scopes to create,edit, delete and view.
- Next create a policy "View Policy", select the "rapp_user" role and set required to on.
- Create an "Admin policy", select the "rapp_admin" role and set required to on.
- In the permission section, create a "Scope Based" permission - for resouce choose the "Rapp resource" created earlier, scope should be set to view and select the "View Policy" for policy.
- Create another "Scope Based" permission - for resouce choose the "Rapp resource" created earlier, scope should be set to create, edit and delete and select the "Admin Policy" for policy.
...
PUT request using service account requesting party token
PUT resources 1
OPA
The Open Policy Agent (OPA) is an open source, general-purpose policy engine that unifies policy enforcement across the stack. OPA provides a high-level declarative language that lets you specify policy as code and simple APIs to offload policy decision-making from your software.
...
Dynamic Policy Composition for OPA
Open Policy Agent: Authorization in a Cloud Native World
Microservices Authorization using Open Policy Agent and Traefik (API Gateway)
A Study in Serverless Authorization with Open Policy Agent
Godaddy Opa Lambda Extension Plugin
GO
Go provides a library for opa.
...
Code Block | ||||
---|---|---|---|---|
| ||||
package main
import (
"context"
"encoding/json"
"fmt"
"github.com/open-policy-agent/opa/rego"
"io/ioutil"
"net/http"
"net/url"
"os"
)
type Jwttoken struct {
Access_token string
Expires_in int
Refresh_expires_in int
Refresh_token string
Token_type string
Not_before_policy int
Session_state string
Scope string
}
var token Jwttoken
var opaPolicy string = `
package authz
import future.keywords.in
default allow = false
jwks := jwks_request("http://keycloak:8080/auth/realms/opa/protocol/openid-connect/certs").body
filtered_jwks := [ key |
some key in jwks.keys
key.use == "sig"
]
token_cert := json.marshal({"keys": filtered_jwks})
token = { "isValid": isValid, "header": header, "payload": payload } {
[isValid, header, payload] := io.jwt.decode_verify(input, { "cert": token_cert, "aud": "account", "iss": "http://keycloak:808
0/auth/realms/opa"})
}
allow {
is_token_valid
}
is_token_valid {
token.isValid
now := time.now_ns() / 1000000000
token.payload.iat <= now
now < token.payload.exp
token.payload.clientRole == "[opa-client-role]"
}
jwks_request(url) = http.send({
"url": url,
"method": "GET",
"force_cache": true,
"force_json_decode": true,
"force_cache_duration_seconds": 3600 # Cache response for an hour
})
`
func getToken() string {
clientSecret := "63wkv0RUXkp01pbqtNTSwghhTxeMW55I"
clientId := "opacli"
realmName := "opa"
keycloakHost := "keycloak"
keycloakPort := "8080"
keycloakUrl := "http://" + keycloakHost + ":" + keycloakPort + "/auth/realms/" + realmName + "/protocol/openid-connect/token" resp, err := http.PostForm(keycloakUrl,
url.Values{"client_secret": {clientSecret}, "grant_type": {"client_credentials"}, "client_id": {clientId}})
if err != nil {
fmt.Println(err)
panic("Something wrong with the credentials or url ")
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
json.Unmarshal([]byte(body), &token)
return token.Access_token
}
func traceOpa(input string) {
ctx := context.TODO()
test := rego.New(
rego.Query("x = data.authz.allow"),
rego.Trace(true),
rego.Module("example.rego", opaPolicy),
rego.Input(input),
)
test.Eval(ctx)
rego.PrintTraceWithLocation(os.Stdout, test)
}
func evaluateOpa(input string) {
ctx := context.TODO()
query, err := rego.New(
rego.Query("x = data.authz.allow"),
rego.Module("example.rego", opaPolicy),
).PrepareForEval(ctx)
if err != nil {
// Handle error.
fmt.Println(err.Error())
}
results, err := query.Eval(ctx, rego.EvalInput(input))
// Inspect results.
if err != nil {
// Handle evaluation error.
fmt.Println("Error: " + err.Error())
} else if len(results) == 0 {
// Handle undefined result.
fmt.Println("Results are empty")
} else {
// Handle result/decision.
fmt.Printf("Results = %+v\n", results) //=> [{Expressions:[true] Bindings:map[x:true]}]
}
}
func main() {
tokenStr := getToken()
traceOpa(tokenStr)
evaluateOpa(tokenStr)
}
|
OPA bundles and dynamic composition
Method 1
We can combined OPA bundles with dynamic composition to provide different policies for differect services.
...
Code Block | ||||
---|---|---|---|---|
| ||||
package policies.rappopaprovider.policy
import input.attributes.request.http as http_request
import future.keywords.in
realm_name := "opa"
realm_url := sprintf("http://keycloak:8080/auth/realms/%v", [realm_name])
certs_url := sprintf("%v/protocol/openid-connect/certs", [realm_url])
jwks := jwks_request(certs_url).body
filtered_jwks := [ key |
some key in jwks.keys
key.use == "sig"
]
token_cert := json.marshal({"keys": filtered_jwks})
token = { "isValid": isValid, "header": header, "payload": payload } {
[_, encoded] := split(http_request.headers.authorization, " ")
[isValid, header, payload] := io.jwt.decode_verify(encoded, { "cert": token_cert, "aud": "account", "iss": realm_url})
}
deny[msg] {
not is_token_valid
msg = "denied by rappopaprovider.policy: not a valid token"
}
is_token_valid {
token.isValid
now := time.now_ns() / 1000000000
token.payload.iat <= now
now < token.payload.exp
token.payload.clientRole == "[opa-client-role]"
}
jwks_request(url) = http.send({
"url": url,
"method": "GET",
"force_cache": true,
"force_json_decode": true,
"force_cache_duration_seconds": 3600 # Cache response for an hour
}) |
...
Code Block | ||||
---|---|---|---|---|
| ||||
#!/bin/bash
INGRESS_HOST=$(minikube ip)
INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].nodePort}')
TESTS=0
PASSED=0
FAILED=0
TEST_TS=$(date +%F-%T)
TOKEN=""
ACCESS_TOKEN=""
REFRESH_TOKEN=""
function get_token
{
local prefix="${1}"
url="http://${KEYCLOAK_HOST}:${KEYCLOAK_PORT}/auth/realms"
TOKEN=$(curl -s -X POST $url/opa/protocol/openid-connect/token -H \
"Content-Type: application/x-www-form-urlencoded" -d client_secret=63wkv0RUXkp01pbqtNTSwghhTxeMW55I \
-d 'grant_type=client_credentials' -d client_id=opacli)
ACCESS_TOKEN=$(echo $TOKEN | jq -r '.access_token')
}
function run_test
{
local prefix="${1}" type=${2} msg="${3}" data=${4}
TESTS=$((TESTS+1))
echo "Test ${TESTS}: Testing $type /${prefix}"
get_token $prefix
url=$INGRESS_HOST:$INGRESS_PORT"/"$prefix
result=$(curl -s -X ${type} -H "Content-type: application/json" -H "Authorization: Bearer $ACCESS_TOKEN" $url)
echo $result
if [ "$result" != "$msg" ]; then
echo "FAIL"
FAILED=$((FAILED+1))
else
echo "PASS"
PASSED=$((PASSED+1))
fi
echo ""
}
run_test "rapp-opa-provider" "GET" "Hello OPA World!" ""
echo
echo "-----------------------------------------------------------------------"
echo "Number of Tests: $TESTS, Tests Passed: $PASSED, Tests Failed: $FAILED"
echo "Date: $TEST_TS"
echo "-----------------------------------------------------------------------" |
Method 2
We can also organize the policies in the following way.
...
Code Block | ||||
---|---|---|---|---|
| ||||
package policy.services.rappopaprovider.ingress import data.policy.common.request allow = true { request.token.isValid request.method == "GET" request.path == [ "rapp-opa-provider" ] now := time.now_ns() / 1000000000 request.iat <= now now < request.exp request.clientRole == "[opa-client-role]" } |
Lastly create the parent rules file that will call the appropiates policy based on the http request path
...
token = { "isValid": isValid, "payload": payload } {
authorization_header := input.attributes.request.http.headers.authorization
encoded_token := trim_prefix(authorization_header, "Bearer ")
payload := io.jwt.decode(encoded_token)[1]
isValid := true
}
OPA with prometheus and grafana
Add the following job to your prometheus.yaml file in the scrape_configs section:
...
You should see a dashboard similar to the following:
OPA Profiling and Bench Marking
Below is a sample file we can use for profiling/benchmarking
...
opa bench --data rbactest.rego 'data.rbactest.allow'
+-------------------------------------------------+------------+
| samples | 22605 |
| ns/op | 47760 |
| B/op | 6269 |
| allocs/op | 112 |
| histogram_timer_rego_external_resolve_ns_75% | 400 |
| histogram_timer_rego_external_resolve_ns_90% | 500 |
| histogram_timer_rego_external_resolve_ns_95% | 500 |
| histogram_timer_rego_external_resolve_ns_99% | 871 |
| histogram_timer_rego_external_resolve_ns_99.9% | 29394 |
| histogram_timer_rego_external_resolve_ns_99.99% | 29800 |
| histogram_timer_rego_external_resolve_ns_count | 22605 |
| histogram_timer_rego_external_resolve_ns_max | 29800 |
| histogram_timer_rego_external_resolve_ns_mean | 434 |
| histogram_timer_rego_external_resolve_ns_median | 400 |
| histogram_timer_rego_external_resolve_ns_min | 200 |
| histogram_timer_rego_external_resolve_ns_stddev | 1045 |
| histogram_timer_rego_query_eval_ns_75% | 31100 |
| histogram_timer_rego_query_eval_ns_90% | 37210 |
| histogram_timer_rego_query_eval_ns_95% | 47160 |
| histogram_timer_rego_query_eval_ns_99% | 91606 |
| histogram_timer_rego_query_eval_ns_99.9% | 630561 |
| histogram_timer_rego_query_eval_ns_99.99% | 631300 |
| histogram_timer_rego_query_eval_ns_count | 22605 |
| histogram_timer_rego_query_eval_ns_max | 631300 |
| histogram_timer_rego_query_eval_ns_mean | 29182 |
| histogram_timer_rego_query_eval_ns_median | 25300 |
| histogram_timer_rego_query_eval_ns_min | 15200 |
| histogram_timer_rego_query_eval_ns_stddev | 32411 |
+-------------------------------------------------+------------+
OPA & Minio
OPA & MinIO's Access Management Plugin
OPA Sidecar injection
First create a namespace for your apps and enable istio and opa
...
Note: References to keycloak need to be updated to include the keycloak schema i.e keycloak.default
Basic Authentication
We can add basic authentication to our NGINX bubdle server by following these steps:
...
Your bundle is now protected with basic authentication.
JWT Injection
To enable automatic include a jwt token in our rapp request we need to enable some k8s objects:
1) MutatingWebhookConfiguration to inject jwt retrieval pod as a sidecar. The MutatingWebhookConfiguration uses a pod mutating serive to alter our pod and add the new sidecar.
Code Block | ||||
---|---|---|---|---|
| ||||
package main
import (
"encoding/json"
"errors"
"flag"
"fmt"
"io/ioutil"
"k8s.io/api/admission/v1beta1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"log"
"net/http"
"strconv"
)
type ServerParameters struct {
port int // webhook server port
certFile string // path to the x509 cert
keyFile string // path to the x509 private key
}
type patchOperation struct {
Op string `json:"op"`
Path string `json:"path"`
Value interface{} `json:"value,omitempty"`
}
var parameters ServerParameters
var (
universalDeserializer = serializer.NewCodecFactory(runtime.NewScheme()).UniversalDeserializer()
)
func main() {
flag.IntVar(¶meters.port, "port", 8443, "Webhook server port.")
flag.StringVar(¶meters.certFile, "tlsCertFile", "/certs/tls.crt", "File containing the x509 certificate")
flag.StringVar(¶meters.keyFile, "tlsKeyFile", "/certs/tls.key", "File containing the x509 private key")
flag.Parse()
http.HandleFunc("/inject-sidecar", HandleSideCarInjection)
log.Fatal(http.ListenAndServeTLS(":"+strconv.Itoa(parameters.port), parameters.certFile, parameters.keyFile, nil))
}
func HandleSideCarInjection(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
err = ioutil.WriteFile("/tmp/request", body, 0644)
if err != nil {
panic(err.Error())
}
var admissionReviewReq v1beta1.AdmissionReview
if _, _, err := universalDeserializer.Decode(body, nil, &admissionReviewReq); err != nil {
w.WriteHeader(http.StatusBadRequest)
fmt.Errorf("Could not deserialize request: %v", err)
} else if admissionReviewReq.Request == nil {
w.WriteHeader(http.StatusBadRequest)
errors.New("Malformed admission review - request is empty")
}
fmt.Printf("Received Admission Review Request - Type: %v \t Event: %v \t Name: %v \n",
admissionReviewReq.Request.Kind,
admissionReviewReq.Request.Operation,
admissionReviewReq.Request.Name,
)
var pod v1.Pod
err = json.Unmarshal(admissionReviewReq.Request.Object.Raw, &pod)
if err != nil {
fmt.Errorf("Could not unmarshal pod from admission request: %v", err)
}
var patches []patchOperation
labels := pod.ObjectMeta.Labels
labels["sidecar-injection-webhook"] = "jwt-proxy"
patches = append(patches, patchOperation{
Op: "add",
Path: "/metadata/labels",
Value: labels,
})
var containers []v1.Container
containers = append(containers, pod.Spec.Containers...)
container := v1.Container{
Name: "jwt-proxy",
Image: "ktimoney/rapps-jwt",
ImagePullPolicy: v1.PullIfNotPresent,
Ports: []v1.ContainerPort{
{
Name: "http",
Protocol: v1.ProtocolTCP,
ContainerPort: 8888,
},
},
VolumeMounts: []v1.VolumeMount{
{
Name: "certsdir",
MountPath: "/certs",
ReadOnly: true,
},
},
}
containers = append(containers, container)
fmt.Println(containers)
patches = append(patches, patchOperation{
Op: "add",
Path: "/spec/containers",
Value: containers,
})
pathType := v1.HostPathDirectoryOrCreate
pathTypePtr := &pathType
var volumes []v1.Volume
volumes = append(volumes, pod.Spec.Volumes...)
volume := v1.Volume{
Name: "certsdir",
VolumeSource: v1.VolumeSource{
HostPath: &v1.HostPathVolumeSource{
Path: "/var/rapps/certs",
Type: pathTypePtr,
},
},
}
volumes = append(volumes, volume)
fmt.Println(volumes)
patches = append(patches, patchOperation{
Op: "add",
Path: "/spec/volumes",
Value: volumes,
})
fmt.Println(patches)
patchBytes, err := json.Marshal(patches)
if err != nil {
fmt.Errorf("Error occurred when trying to marshal JSON patch: %v", err)
}
admissionReviewResponse := v1beta1.AdmissionReview{
Response: &v1beta1.AdmissionResponse{
UID: admissionReviewReq.Request.UID,
Allowed: true,
},
}
admissionReviewResponse.Response.Patch = patchBytes
bytes, err := json.Marshal(&admissionReviewResponse)
if err != nil {
fmt.Errorf("Error occurred when trying to marshal Aadmission Review response: %v", err)
}
w.Write(bytes)
} |
MutatingWebhookConfiguration.yaml
Note: You'll need to configure your deployment to include a tls.crt and tls.key secret. Your MutatingWebhookConfiguration will need to include the corresponding ca bubdle.
2) Envoyfilter to update request header with jwt token.
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: networking.istio.io/v1alpha3
kind: EnvoyFilter
metadata:
name: RAPP-NAME-outbound-filter
namespace: RAPP-NS
spec:
workloadSelector:
labels:
app.kubernetes.io/name: RAPP-NAME
configPatches:
# The first patch adds the lua filter to the listener/http connection manager
- applyTo: HTTP_FILTER
match:
context: SIDECAR_OUTBOUND
listener:
filterChain:
filter:
name: "envoy.filters.network.http_connection_manager"
subFilter:
name: "envoy.filters.http.router"
patch:
operation: INSERT_BEFORE
value: # lua filter specification
name: envoy.lua
typed_config:
"@type": "type.googleapis.com/envoy.extensions.filters.http.lua.v3.Lua"
inlineCode: |
function envoy_on_request(request_handle)
local uri = request_handle:headers():get(":path")
local method = request_handle:headers():get(":method")
if (method ~= "POST" and path ~= "/auth/realms/REALM-NAME/protocol/openid-connect/token")
then
-- Make an HTTP call to an upstream host with the following headers, body, and timeout.
local headers, body = request_handle:httpCall(
"jwt_cluster",
{
[":method"] = "GET",
[":path"] = "/token",
[":authority"] = "jwt-proxy",
["realm"] = "REALM-NAME",
["client"] = "CLIENT-NAME"
},
"jwt call",
5000)
if (headers["authorization"] ~= nil)
then
request_handle:headers():add("authorization", headers["authorization"])
end
end
end
- applyTo: CLUSTER
match:
context: SIDECAR_OUTBOUND
patch:
operation: ADD
value: # cluster specification
name: jwt_cluster
type: STRICT_DNS
connect_timeout: 60s
lb_policy: ROUND_ROBIN
load_assignment:
cluster_name: jwt_cluster
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: 0.0.0.0
port_value: 8888 |
CFSSL
CFSSL is CloudFlare's PKI/TLS tool. It is both a command line tool and an HTTP API server for signing, verifying, and bundling TLS certificates.
To run this you first need to create an image with cfssl installed:
Code Block | ||||
---|---|---|---|---|
| ||||
FROM debian:latest
RUN apt-get update && apt-get install -y curl && \
curl -L https://github.com/cloudflare/cfssl/releases/download/v1.5.0/cfssl_1.5.0_linux_amd64 -o /usr/local/bin/cfssl && \
curl -L https://github.com/cloudflare/cfssl/releases/download/v1.5.0/cfssljson_1.5.0_linux_amd64 -o /usr/local/bin/cfssljson && \
chmod +x /usr/local/bin/cfssl && \
chmod +x /usr/local/bin/cfssljson
RUN mkdir /config
RUN mkdir /config
RUN mkdir /certs
WORKDIR /certs
EXPOSE 8888
EXPOSE 8889
ENTRYPOINT ["cfssl version"] |
This will install cfssl on a debian image.
You can then use this image to create a cfssl service in your k8s cluster.
kubectl create -f rapps-cfssl.yaml
Note: If you want to use this with a postgres db you'll need to setup a new database and username/password and then create the tables.
Code Block | ||||
---|---|---|---|---|
| ||||
SELECT 'CREATE DATABASE cfssl'
WHERE NOT EXISTS (SELECT FROM pg_database WHERE datname = 'cfssl')\gexec
DO $$
BEGIN
IF NOT EXISTS (SELECT FROM pg_user WHERE usename = 'cfssl') THEN
CREATE USER cfssl WITH PASSWORD 'cfssl';
GRANT ALL PRIVILEGES ON DATABASE cfssl TO cfssl;
END IF;
END
$$; |
Login as the cfssl user then create the tables:
Code Block | ||||
---|---|---|---|---|
| ||||
CREATE TABLE IF NOT EXISTS certificates (
serial_number bytea NOT NULL,
authority_key_identifier bytea NOT NULL,
ca_label bytea,
status bytea NOT NULL,
reason int,
expiry timestamptz,
revoked_at timestamptz,
pem bytea NOT NULL,
issued_at timestamptz,
not_before timestamptz,
metadata jsonb,
sans jsonb,
common_name TEXT,
PRIMARY KEY(serial_number, authority_key_identifier)
);
CREATE TABLE IF NOT EXISTS ocsp_responses (
serial_number bytea NOT NULL,
authority_key_identifier bytea NOT NULL,
body bytea NOT NULL,
expiry timestamptz,
PRIMARY KEY(serial_number, authority_key_identifier),
FOREIGN KEY(serial_number, authority_key_identifier) REFERENCES certificates(serial_number, authority_key_identifier)
); |
Once the pod is up and running you can connect to it by using port forwarding:
kubectl port-forward service/rapps-cfssl 8888:8888
You can generate signed certificates using a post request like the following:
curl -s -X POST -H "Content-Type: application/json" -d @./rapp-helloworld-provider-server.json http://127.0.0.1:8888/api/v1/cfssl/newcert
The rapp-helloworld-provider-server.json looks like this:
Code Block | ||||
---|---|---|---|---|
| ||||
{
"request":{
"hosts":[
"rapp-helloworld-provider"
],
"names":[
{
"C":"IE",
"ST":"Ireland",
"L":"Dublin",
"O":"EST Rapp Provider",
"OU":"EST Rapp Provider hosts"
}
],
"CN":"rapp-helloworld-provider",
"key":{
"algo":"rsa",
"size":2048
}
},
"profile":"server"
} |
To parse the response contents you can use the following method:
NEWCERT=$(curl -s -X POST -H "Content-Type: application/json" -d @./rapp-helloworld-provider-server.json http://127.0.0.1:8888/api/v1/cfssl/newcert)
echo $NEWCERT | jq -r .result.certificate
echo $NEWCERT | jq -r .result.private_key
OCSP & CRL
OCSP and CRL are ways of checking a certificates validity.
The cfssl crl endpoint run on port 8888: 127.0.0.1:8888/api/v1/cfssl/crl
For ocsp to run you need to run the following commands in your container:
cfssl ocsprefresh -db-config /config/db-pg.json -ca /certs/ca-server.pem -responder /certs/server-ocsp.pem -responder-key /certs/server-ocsp-key.pem
cfssl ocspdump -db-config /config/db-pg.json > ocspdump.txt
cfssl ocspserve -port=8889 -responses=/config/ocspdump.txt -loglevel=0
These commands will need to be re-run every time a new revoke request is received.
To revoke a certificate use:
curl -d '{ "serial": "708853190752997406197199597002842275021840632879", "authority_key_id": "dd2cdb5b5b8abbe2fba81fd6c04393938f802b03", "reason": "superseded" }' http://localhost:8888/api/v1/cfssl/revoke
You can obtain the serial and authority_key_id from your database using the following SQL:
select encode(authority_key_identifier,'escape') a_key, encode(serial_number,'escape') serial, encode(status,'escape') status from certificates;
You can also obtain the serial and authority_key_id from the certinfo endpoint
- Convert your certificate into a variable: pem=$(cat my.crt | sed -z 's/\n/\\n/g')
- curl -d '{"certificate": "'"$pem"'"}' http://localhost:8888/api/v1/cfssl/certinfo
- The serial number will come back as is but the authority_key_id needs to be converted: echo "22:8B:FA:ED:8B:FF:66:E7:05:A3:08:3A:41:33:D8:01:20:CA:CC:F4"| tr '[:upper:]' '[:lower:]' | sed s'/://'g =228bfaed8bff66e705a3083a4133d80120caccf4
You can check your certificate against ocsp and crl using the following code:
Code Block | ||||
---|---|---|---|---|
| ||||
package main
import (
"bytes"
"context"
"crypto"
"crypto/x509"
"crypto/x509/pkix"
"encoding/json"
"encoding/pem"
"flag"
"fmt"
"golang.org/x/crypto/ocsp"
"io/ioutil"
"log"
"net/http"
"net/url"
"os"
"time"
)
type Cfinfo struct {
Success string `json:"success,omitempty"`
Result struct {
Certificate string `json:"certificate"`
Usages []string `json:"usages"`
Expiry string `json:"expiry"`
}
Errors []string `json:"errors"`
Messages []string `json:"messages"`
}
var cfinfo Cfinfo
type Crlinfo struct {
Success bool `json:"success,omitempty"`
Result string `json:"result,omitempty"`
Errors []string `json:"errors"`
Messages []string `json:"messages"`
}
var crlinfo Crlinfo
type ServerParameters struct {
certFile string // path to the x509 cert
issuerUrl string // webhook server port
crlUrl string // webhook server port
}
var parameters ServerParameters
func main() {
flag.StringVar(¶meters.certFile, "certFile", "", "File containing the x509 certificate")
flag.StringVar(¶meters.issuerUrl, "issuerUrl", "http://127.0.0.1:8888/api/v1/cfssl/info", "Url for retrieving the issuer certificate")
flag.StringVar(¶meters.crlUrl, "crlUrl", "http://127.0.0.1:8888/api/v1/cfssl/crl", "Url for retrieving the crl")
flag.Parse()
if parameters.certFile == "" {
flag.Usage()
os.Exit(1)
}
// read x509 certificate from PEM encoded file
cert_bytes := readFile(parameters.certFile)
cert, err := decodeCert(cert_bytes)
if err != nil {
log.Fatal(err)
}
issuer_bytes, err := getIssuer(parameters.issuerUrl) //readCert(os.Args[2])
if err != nil {
log.Fatal(err)
}
issuer, err := decodeCert(issuer_bytes)
if err != nil {
log.Fatal(err)
}
// Perform OCSP Check
fmt.Println("Checing OCSP")
status, err := checkOCSPStatus(cert, issuer)
if err != nil {
fmt.Println(err)
} else {
switch status {
case ocsp.Good:
fmt.Printf("[+] Certificate status is Good\n")
case ocsp.Revoked:
fmt.Printf("[-] Certificate status is Revoked\n")
case ocsp.Unknown:
fmt.Printf("[-] Certificate status is Unknown\n")
}
crl_bytes, err := getCrl(parameters.crlUrl) //readCert(os.Args[2])
if err != nil {
log.Fatal(err)
}
crl, err := decodeCrl(crl_bytes)
if err != nil {
log.Fatal(err)
}
fmt.Println("\nChecing CRL")
_, err = checkCRLStatus(cert, issuer, crl)
if err != nil {
log.Fatal(err)
}else{
fmt.Println("[+] Certificate status is not Revoked\n")
}
}
func checkCRLStatus(cert *x509.Certificate, issuer *x509.Certificate, crl *pkix.CertificateList) (bool, error) {
var revoked = false
// Check CRL signature
err := issuer.CheckCRLSignature(crl)
if err != nil {
return revoked, err
}
// Check CRL validity
if crl.TBSCertList.NextUpdate.Before(time.Now()) {
return revoked, fmt.Errorf("CRL is outdated")
}
// Searching for our certificate in CRL
for _, revokedCertificate := range crl.TBSCertList.RevokedCertificates {
if revokedCertificate.SerialNumber.Cmp(cert.SerialNumber) == 0 {
//Found validated certificate in list of revoked ones
revoked = true
return revoked, fmt.Errorf("[-] Certificate status is Revoked\n")
}
}
return revoked, nil
}
// CheckOCSPStatus will make an OCSP request for the provided certificate.
// If the status of the certificate is not good, then an error is returned.
func checkOCSPStatus(cert *x509.Certificate, issuer *x509.Certificate) (int, error) {
var (
ctx = context.Background()
ocspURL = cert.OCSPServer[0]
)
// Build OCSP request
buffer, err := ocsp.CreateRequest(cert, issuer, &ocsp.RequestOptions{
Hash: crypto.SHA256,
})
if err != nil {
return ocsp.Unknown, fmt.Errorf("creating ocsp request body: %w", err)
}
req, err := http.NewRequest(http.MethodPost, ocspURL, bytes.NewBuffer(buffer))
if err != nil {
return ocsp.Unknown, fmt.Errorf("creating http request: %w", err)
}
ocspUrl, err := url.Parse(ocspURL)
if err != nil {
return ocsp.Unknown, fmt.Errorf("parsing ocsp url: %w", err)
}
req.Header.Add("Content-Type", "application/ocsp-request")
req.Header.Add("Accept", "application/ocsp-response")
req.Header.Add("host", ocspUrl.Host)
req = req.WithContext(ctx)
// Make OCSP request
httpResponse, err := http.DefaultClient.Do(req)
if err != nil {
return ocsp.Unknown, fmt.Errorf("making ocsp request: %w", err)
}
defer httpResponse.Body.Close()
output, err := ioutil.ReadAll(httpResponse.Body)
if err != nil {
return ocsp.Unknown, fmt.Errorf("reading response body: %w", err)
}
// Parse response
ocspResponse, err := ocsp.ParseResponse(output, issuer)
if err != nil {
return ocsp.Unknown, fmt.Errorf("parsing ocsp response: %w", err)
}
return ocspResponse.Status, nil
}
func readFile(file string) []byte {
cert, err := ioutil.ReadFile(file)
if err != nil {
log.Fatalln(err)
}
return cert
}
func decodeCert(cert_bytes []byte) (*x509.Certificate, error) {
b, _ := pem.Decode(cert_bytes)
var cert *x509.Certificate
cert, err := x509.ParseCertificate(b.Bytes)
if err != nil {
fmt.Println("Parse Error")
return nil, fmt.Errorf("parsing certificate: %w", err)
}
return cert, err
}
func decodeCrl(cert_bytes []byte) (*pkix.CertificateList, error) {
b, _ := pem.Decode(cert_bytes)
crl, err := x509.ParseCRL(b.Bytes)
if err != nil {
return nil, fmt.Errorf("parsing crl: %w", err)
}
return crl, err
}
func getIssuer(issuerUrl string) ([]byte, error) {
var resp = &http.Response{}
values := map[string]string{"label": "intermediate"}
jsonValue, _ := json.Marshal(values)
resp, err := http.Post(issuerUrl, "application/json", bytes.NewBuffer(jsonValue))
if err != nil {
fmt.Println(err)
panic("Something wrong with the post request")
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
json.Unmarshal([]byte(body), &cfinfo)
return []byte(cfinfo.Result.Certificate), nil
}
func getCrl(crlUrl string) ([]byte, error) {
resp, err := http.Get(crlUrl)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode >= 300 {
return nil, fmt.Errorf("failed to retrieve CRL")
}
body, err := ioutil.ReadAll(resp.Body)
json.Unmarshal([]byte(body), &crlinfo)
crlString := "-----BEGIN X509 CRL-----\n" + crlinfo.Result + "\n-----END X509 CRL-----"
return []byte(crlString), err
}
|
Kafka
Strimzi
You can install strimzi kafka using the quick start guide: Strimzi Quickstarts
You'll need to overwrite the kafka cluster with your own definition: my-cluster.yaml
This file includes an authorization section:
Code Block | ||||
---|---|---|---|---|
| ||||
authorization:
type: opa
url: http://opa.default:8181/v1/data/policy/kafka/authz/allow
allowOnError: false
initialCacheCapacity: 1000
maximumCacheSize: 10000
expireAfterMs: 10 #60000
superUsers:
- CN=henri
- anwar
- CN=wesley
- CN=my-user |
This file includes an listeners section:
Code Block | ||||
---|---|---|---|---|
| ||||
listeners:
- name: plain
port: 9092
tls: false
type: internal
- name: tls
port: 9093
tls: true
type: internal
- name: plain3
port: 9097
tls: false
type: internal
authentication:
type: oauth
checkIssuer: false
checkAccessTokenType: true
accessTokenIsJwt: true
enableOauthBearer: true
introspectionEndpointUri: http://keycloak.default:8080/auth/realms/opa/protocol/openid-connect/token/introspect
clientId: opacli
clientSecret:
secretName: my-cluster-oauth
key: clientSecret
validIssuerUri: http://keycloak.default:8080/auth/realms/opa
clientAudience: account
customClaimCheck: "@.resource_access['opacli'].roles[0] == 'opa-client-role'" |
Where opa and keycloak are configured.
If you are publishing or subscribing using this listener you will need to provide a JWT with your request.
The token will be verified using keycloak along with the audience field and the resource access['opaclii'].roles field.
Kafka will then pass the authorizaton on to opa where other checks will take place.
Below is a sample opa policy you might use to check the kafka input:
Code Block | ||||
---|---|---|---|---|
| ||||
package policy.kafka.authz
default allow = false
allow = true {
allowedActions := ["DESCRIBE", "DESCRIBE_CONFIGS", "READ"]
input.action.operation == allowedActions[_]
input.requestContext.header.name.clientId == "kowl"
}
allow = true {
allowedActions := ["DESCRIBE", "DESCRIBE_CONFIGS"]
allowedTopics := ["__consumer_offsets","__strimzi_store_topic", "__strimzi-topic-operator-kstreams-topic-store-changelog",]
input.action.resourcePattern.name == allowedTopics[_]
}
allow = true {
allowedActions := ["DESCRIBE", "READ"]
input.action.operation == allowedActions[_]
allowedResourceTypes := ["GROUP"]
input.action.resourcePattern.resourceType == allowedResourceTypes[_]
input.requestContext.principal.name == "service-account-opacli"
}
allow = true {
input.action.operation == "CREATE"
input.action.resourcePattern.name == "kafka-cluster"
input.action.resourcePattern.resourceType == "CLUSTER"
input.requestContext.principal.name == "service-account-opacli"
input.requestContext.header.name.clientId == "consumer-client"
}
allow = true {
allowedActions := ["DESCRIBE","READ"]
input.action.operation == allowedActions[_]
input.action.resourcePattern.name == "my-topic"
input.action.resourcePattern.resourceType == "TOPIC"
input.requestContext.principal.name == "service-account-opacli"
input.requestContext.header.name.clientId == "consumer-client"
}
allow = true {
allowedActions := ["DESCRIBE","WRITE"]
input.action.operation == allowedActions[_]
input.action.resourcePattern.name == "my-topic"
input.action.resourcePattern.resourceType == "TOPIC"
input.requestContext.principal.name == "service-account-opacli"
input.requestContext.header.name.clientId == "producer-client"
} |
The default value of the sub field (principal name) in keycloak is the user id, this can be changed to the user name using a property mapper:
kube-mgmt
By default the entire policy and data cache are defined by the opa bundle.
If you need to add data from other sources you need to define the bundle root directories in the .manifest file
e.g.
Code Block | ||||
---|---|---|---|---|
| ||||
{
"revision" : "1",
"roots": ["policy", "roles"]
} |
You can then add the kube-mgmt sidecar to your opa deployment and this will pull data from configmaps in the namespace speciiied
e.g.
Code Block | ||
---|---|---|
| ||
- name: kube-mgmt
image: openpolicyagent/kube-mgmt:latest
args:
- "--namespaces=opa" |
Code Block | ||||
---|---|---|---|---|
| ||||
kind: ConfigMap
apiVersion: v1
metadata:
name: hello-data
namespace: opa
labels:
openpolicyagent.org/data: opa
data:
x.json: |
{"a": [1,2,3,4]} |
Code Block | ||||
---|---|---|---|---|
| ||||
{
"access_token": "eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJDamJ4a2FONjRVcUdYNThWU2R3WjBxQTdWRmN1TGdEQWhnUWJTVG55UE9JIn0.eyJleHAiOjE2Njg1MjgwMjEsImlhdCI6MTY2ODUyNzcyMSwianRpIjoiMjc1NzkzNjktOWExMi00NjA4LTk5ZjMtNTI2MmE4MzI5NGViIiwiaXNzIjoiaHR0cDovL2tleWNsb2FrLmRlZmF1bHQ6ODA4MC9hdXRoL3JlYWxtcy9vcGEiLCJhdWQiOiJhY2NvdW50Iiwic3ViIjoic2VydmljZS1hY2NvdW50LW9wYWNsaSIsInR5cCI6IkJlYXJlciIsImF6cCI6Im9wYWNsaSIsImFjciI6IjEiLCJyZWFsbV9hY2Nlc3MiOnsicm9sZXMiOlsib2ZmbGluZV9hY2Nlc3MiLCJ1bWFfYXV0aG9yaXphdGlvbiIsImRlZmF1bHQtcm9sZXMtb3BhIl19LCJyZXNvdXJjZV9hY2Nlc3MiOnsib3BhY2xpIjp7InJvbGVzIjpbIm9wYS1jbGllbnQtcm9sZSJdfSwiYWNjb3VudCI6eyJyb2xlcyI6WyJtYW5hZ2UtYWNjb3VudCIsIm1hbmFnZS1hY2NvdW50LWxpbmtzIiwidmlldy1wcm9maWxlIl19fSwic2NvcGUiOiJwcm9maWxlIGVtYWlsIiwiY2xpZW50SWQiOiJvcGFjbGkiLCJjbGllbnRIb3N0IjoiMTI3LjAuMC42IiwiZW1haWxfdmVyaWZpZWQiOmZhbHNlLCJjbGllbnRSb2xlIjoiW29wYS1jbGllbnQtcm9sZV0iLCJwcmVmZXJyZWRfdXNlcm5hbWUiOiJzZXJ2aWNlLWFjY291bnQtb3BhY2xpIiwiY2xpZW50QWRkcmVzcyI6IjEyNy4wLjAuNiJ9.B0X0zhAintHAXAtjiOngzTM0Wdv_aWM8qdyF4MXEZmE6AxoaMpaNQ6B18k1pkMz60qmTUUcYJOm-xHAdipnxptAf44_YmaybVE9_otjO59RBLonUlFtWDnNV4EqixLDaXN33Z54S1iWCPjBvI58sH_jQyMiXM7ur1FfjCCEeQwP2D9CgWY_3-Vg4Wy4cMAbsxOhZZog-QmKV8nkeyELqUS24xk_vpPkYOdG1ztc5SxHnLLsT4iwXgzYjAveiXyKyyYOC0oJP5zgCYEkQCvRORmuU0whN_5g4oTSIFcqN88KsrYb-R0I2RS7mQI8LXHrNIqMlNN2j5PoHZmgvMrpO3w"
} |
You can also add extra data using curl e.g. curl -X PUT $host:31182/v1/data/jwt -d @token.json
You can view all the data available to opa using curl: curl <opa host>:<port>/v1/data
You should see something similar to the following output:
Code Block | ||||
---|---|---|---|---|
| ||||
{
"decision_id":"067f7d2d-0e5d-4ca8-8cb0-6d38543c6b2f",
"result":{
"jwt":{ "access_token":"eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJDamJ4a2FONjRVcUdYNThWU2R3WjBxQTdWRmN1TGdEQWhnUWJTVG55UE9JIn0.eyJleHAiOjE2Njg1MjgwMjEsImlhdCI6MTY2ODUyNzcyMSwianRpIjoiMjc1NzkzNjktOWExMi00NjA4LTk5ZjMtNTI2MmE4MzI5NGViIiwiaXNzIjoiaHR0cDovL2tleWNsb2FrLmRlZmF1bHQ6ODA4MC9hdXRoL3JlYWxtcy9vcGEiLCJhdWQiOiJhY2NvdW50Iiwic3ViIjoic2VydmljZS1hY2NvdW50LW9wYWNsaSIsInR5cCI6IkJlYXJlciIsImF6cCI6Im9wYWNsaSIsImFjciI6IjEiLCJyZWFsbV9hY2Nlc3MiOnsicm9sZXMiOlsib2ZmbGluZV9hY2Nlc3MiLCJ1bWFfYXV0aG9yaXphdGlvbiIsImRlZmF1bHQtcm9sZXMtb3BhIl19LCJyZXNvdXJjZV9hY2Nlc3MiOnsib3BhY2xpIjp7InJvbGVzIjpbIm9wYS1jbGllbnQtcm9sZSJdfSwiYWNjb3VudCI6eyJyb2xlcyI6WyJtYW5hZ2UtYWNjb3VudCIsIm1hbmFnZS1hY2NvdW50LWxpbmtzIiwidmlldy1wcm9maWxlIl19fSwic2NvcGUiOiJwcm9maWxlIGVtYWlsIiwiY2xpZW50SWQiOiJvcGFjbGkiLCJjbGllbnRIb3N0IjoiMTI3LjAuMC42IiwiZW1haWxfdmVyaWZpZWQiOmZhbHNlLCJjbGllbnRSb2xlIjoiW29wYS1jbGllbnQtcm9sZV0iLCJwcmVmZXJyZWRfdXNlcm5hbWUiOiJzZXJ2aWNlLWFjY291bnQtb3BhY2xpIiwiY2xpZW50QWRkcmVzcyI6IjEyNy4wLjAuNiJ9.B0X0zhAintHAXAtjiOngzTM0Wdv_aWM8qdyF4MXEZmE6AxoaMpaNQ6B18k1pkMz60qmTUUcYJOm-xHAdipnxptAf44_YmaybVE9_otjO59RBLonUlFtWDnNV4EqixLDaXN33Z54S1iWCPjBvI58sH_jQyMiXM7ur1FfjCCEeQwP2D9CgWY_3-Vg4Wy4cMAbsxOhZZog-QmKV8nkeyELqUS24xk_vpPkYOdG1ztc5SxHnLLsT4iwXgzYjAveiXyKyyYOC0oJP5zgCYEkQCvRORmuU0whN_5g4oTSIFcqN88KsrYb-R0I2RS7mQI8LXHrNIqMlNN2j5PoHZmgvMrpO3w"
},
"opa":{
"hello-data":{
"x.json":{
"a":[
1,
2,
3,
4
]
}
}
},
"servers":{
"id":"s1",
"name":"app",
"ports":[
"p1",
"p2",
"p3"
],
"protocols":[
"https",
"ssh"
]
}
}
} |
You can get individual values like this: curl <opa host>:<port>/v1/data/jwt/access_token or refer to them in your code like this: data.opa["hello-data"]["x.json"].a[0]
Strimzi Bridge
You can communicate with kafka over http by adding strimzi bridge:
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaBridge
metadata:
name: strimzi-bridge
namespace: kafka
spec:
replicas: 1
bootstrapServers: my-cluster-kafka-bootstrap:9092
http:
port: 8080 |
See also:
Using Open Policy Agent with Strimzi and Apache Kafka
Secure Kafka with Keycloak: SASL OAuth Bearer
Enforce Custom Resource policies with Open Policy Agent Gatekeeper
How to integrate Kafka with Istio
Example with JWT payload passed from Envoy
Kafka authentication using OAuth 2.0
Using HTTP Bridge as a Kubernetes sidecar
Kafka SSL : Setup with self signed certificate
Kafka & Istio
Integration of Istio with Kafka is limited.
The Kafka Protocol is a TCP level protocol (Layer 4)
Kafka is a binary wrapper over TCP protocol.
Istio mainly supports HTTP/HTTPS on layer 7.
Higher level protocols such as HTTP have access to metadata so they can properly route requests, this is not the case with the Kafka protocol.
Service Mesh and Proxies: Examples for Kafka
Setting up Strimzi bridge with Istio Authorization and ACLs
The Kiali screenshot above represents both a producer and a consumer connecting to Kafka through the Strmzi bridge.
Access to the bridge is controlled using JWT which is checked using an Istio AuthorizationPolicy.
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: "security.istio.io/v1beta1"
kind: "AuthorizationPolicy"
metadata:
name: "kafka-bridge-policy"
namespace: kafka
spec:
selector:
matchLabels:
app.kubernetes.io/name: kafka-bridge
action: ALLOW
rules:
- from:
- source:
requestPrincipals: ["http://keycloak.default:8080/auth/realms/opa", "http://istio-ingressgateway.istio-system:80/auth/realms/opa"]
- to:
- operation:
methods: ["POST", "GET"]
paths: ["/topics", "/topics/*", "/consumers/*"]
when:
- key: request.auth.claims[clientRole]
values: ["opa-client-role"] |
By enabling tls on one of our bootstrap listeners user certificates are automatically created when we create a new KafkaUser
- name: tlsauth
type: internal
port: 9096
tls: true
authentication:
type: tls
Setting the authentication type to tls also ensure secure communication between Kafka and zookeeper.
We also set authorization to simple for the Kafka cluster, this means ACLs will be used.
kafka:
authorization:
type: simple
superUsers:
- CN=kowl
Any users you want to by-pass ACL checks should be listed under superUsers.
We can then setup our bridge user and include the ACL permissions as part of the KafkaUser definition:
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
name: bridge
namespace: kafka
labels:
strimzi.io/cluster: my-cluster
spec:
authentication:
type: tls
authorization:
type: simple
acls:
# access to the topic
- resource:
type: topic
name: my-topic
operations:
- Create
- Describe
- Read
- Write
- AlterConfigs
host: "*"
# access to the group
- resource:
type: group
name: my-group
operations:
- Describe
- Read
host: "*"
# access to the cluster
- resource:
type: cluster
operations:
- Alter
- AlterConfigs
host: "*" |
Authorization between the bridge and Kafka is done using user certificates.
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaBridge
metadata:
name: strimzi-bridge
namespace: kafka
spec:
replicas: 1
bootstrapServers: my-cluster-kafka-bootstrap:9096
http:
port: 8080
logging:
type: inline
loggers:
logger.bridge.level: "DEBUG"
logger.send.name: "http.openapi.operation.send"
logger.send.level: "DEBUG"
tls:
trustedCertificates:
- secretName: my-cluster-cluster-ca-cert
certificate: ca.crt
authentication:
type: tls
certificateAndKey:
secretName: bridge
certificate: user.crt
key: user.key |
Records can then to be posted to a topic with a call like this:
Endpoint:
http://istio-ingressgateway.istio-system:80/topics/my-topic
Payload: {"records":[{"key":"key-366","value":"value-366"}]}
and read from a topic using the following syntax:
i) Create a consumer group
Endpoint: http://istio-ingressgateway.istio-system:80/consumers/my-group
Payload: {"name":"my-consumer","format":"json","auto.offset.reset":"earliest","enable.auto.commit":false}
ii) subscribe to a topic
Endpoint: http://istio-ingressgateway.istio-system:80/consumers/my-group/instances/my-consumer/subscription
Payload: {"topics":["my-topic"]}
iii) Read records from a topic
Endpoint: http://istio-ingressgateway.istio-system:80/consumers/my-group/instances/my-consumer/records
These calls will be redirected to the bridge using a gateway and a virtual service
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: networking.istio.io/v1beta1
kind: Gateway
metadata:
name: strimzi-bridge-gateway
namespace: kafka
spec:
selector:
istio: ingressgateway # use Istio gateway implementation
servers:
- port:
number: 80
name: http
protocol: HTTP
hosts:
- "*"
---
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
name: strimzi-bridge-vs
namespace: kafka
spec:
hosts:
- "*"
gateways:
- strimzi-bridge-gateway
http:
- name: "strimzi-bridge-routes"
match:
- uri:
prefix: "/topics"
- uri:
prefix: "/consumers"
route:
- destination:
port:
number: 8080
host: strimzi-bridge-bridge-service.kafka.svc.cluster.local |
Note: To use Kowl with this setup you'll need to create a kowl user and configure kowl to use tls
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
name: kowl
namespace: kafka
labels:
strimzi.io/cluster: my-cluster
spec:
authentication:
type: tls |
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: v1
kind: ConfigMap
metadata:
name: kowl-config-cm
namespace: kafka
data:
config.yaml: |
kafka:
brokers:
- my-cluster-kafka-0.my-cluster-kafka-brokers.kafka.svc:9093
tls:
enabled: true
caFilepath: /etc/strimzi/ca/ca.crt
certFilepath: /etc/strimzi/user-crt/crt/user.crt
keyFilepath: /etc/strimzi/user-key/key/user.key |
- ca.crt is obtained from the my-cluster-cluster-ca-cert secret, user.crt and user.key are obtained from the kowl secret.
Admission Controllers
An admission controller is a piece of code that intercepts requests to the Kubernetes API server prior to persistence of the object, but after the request is authenticated and authorized.
Admission Controllers Reference
Kyverno
Opa Gatekeeper
OPA Gatekeeper: Policy and Governance for Kubernetes