...
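# Authorization policy for requests that carry a Keycloak-issued JWT.
#
# Decision shape: `deny` is non-empty unless `is_token_valid` holds, so every
# failure mode (missing/odd Authorization header, unreachable JWKS endpoint,
# bad signature, wrong aud/iss, token outside its time window, missing role)
# leaves `is_token_valid` undefined or false and therefore fails closed.
package policies.rappopaprovider.policy

import input.attributes.request.http as http_request
import future.keywords.in

# Both URLs default to "" so that, if nothing binds them, http.send fails,
# `jwks` is undefined, `token` is undefined and the request is denied.
default certs_url = ""
default realm_url = ""

# Raw JWKS document fetched from the realm's certs endpoint.
jwks := jwks_request(certs_url).body

# Keep only signing keys. Encryption keys ("use": "enc") published in the same
# JWKS must never be accepted as token verifiers.
filtered_jwks := [key |
	some key in jwks.keys
	key.use == "sig"
]

# io.jwt.decode_verify takes the JWKS as a JSON string, not as an object.
token_cert := json.marshal({"keys": filtered_jwks})

# Decoded token. isValid is true only if the signature verifies against one of
# the realm's signing keys AND aud == "account" AND iss == realm_url; the
# built-in also rejects tokens outside their exp/nbf window by default.
# NOTE(review): consider additionally pinning "alg" (e.g. "RS256") in the
# constraints so the token's header cannot choose the algorithm — OPA already
# rejects "none" and key-type mismatches, so this is defence in depth.
token = {"isValid": isValid, "header": header, "payload": payload} {
	# Require the exact "Bearer <token>" form. The previous `[_, encoded]`
	# accepted any scheme word (e.g. "Basic <creds>") and handed the second
	# word to the verifier; pinning the scheme rejects those up front.
	["Bearer", encoded] = split(http_request.headers.authorization, " ")
	[isValid, header, payload] := io.jwt.decode_verify(encoded, {
		"cert": token_cert,
		"aud": "account",
		"iss": realm_url
	})
}

# deny is non-empty whenever the token cannot be validated for the realm.
# NOTE(review): `realm` is never bound in this body, and `realm_url` /
# `certs_url` are rules (with defaults), so `realm_url = sprintf(...)` is a
# unification against "" rather than an assignment. As written the compiler
# will report `realm` as unsafe; the realm has to come from somewhere this
# listing does not show (input, data, or the request path) — confirm against
# the caller before relying on this rule.
# NOTE(review): the issuer/JWKS base URL is plain http://. Fetching signing
# keys over cleartext lets an on-path attacker substitute their own JWKS and
# mint tokens this policy accepts; use https:// (or an mTLS-enforcing mesh hop)
# anywhere other than an isolated development cluster.
deny[realm] {
	realm_url = sprintf("http://keycloak:8080/auth/realms/%s", realm)
	certs_url = sprintf("%s/protocol/openid-connect/certs", realm_url)
	not is_token_valid
}

# A token is accepted only if it verified cryptographically, is inside its
# issued-at/expiry window as seen by OPA's clock, and carries the expected
# client role claim. A missing iat/exp/clientRole claim leaves this rule
# undefined, which `deny` treats as invalid.
is_token_valid {
	token.isValid
	# iat/exp are seconds since epoch; time.now_ns() is nanoseconds.
	now := time.now_ns() / 1000000000
	token.payload.iat <= now
	now < token.payload.exp
	# NOTE(review): this compares the claim to the literal string
	# "[opa-client-role]", brackets included. If Keycloak's mapper emits
	# clientRole as an array of roles, this should be a membership check
	# (`"opa-client-role" in token.payload.clientRole`); confirm the claim shape.
	token.payload.clientRole = "[opa-client-role]"
}

# Fetch and JSON-decode the JWKS. force_cache pins the response in OPA's cache
# for an hour regardless of the server's Cache-Control headers: a key rotated
# or revoked at Keycloak keeps being trusted for up to 3600 s, and a newly
# added key is not seen (valid tokens rejected) for the same window. Tune the
# duration to the realm's key-rotation policy.
jwks_request(url) = http.send({
	"url": url,
	"method": "GET",
	"force_cache": true,
	"force_json_decode": true,
	"force_cache_duration_seconds": 3600 # Cache response for an hour
})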
...