...
ksqlDB is a database purpose-built to help developers create stream processing applications on top of Apache Kafka.
Setup
The following setup is designed to work with Strimzi.
Kafka keystore/truststore
...
Code Block | ||||
---|---|---|---|---|
| ||||
# Strimzi KafkaUser resource for the ksqlDB client.
# Declares a user named "ksqldb" in the "kafka" namespace, attached to the
# Kafka cluster "my-cluster" and authenticated with TLS.
# NOTE(review): v1beta1 is an older Strimzi API version; confirm it matches
# the CRDs installed in the target cluster before applying.
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
  name: ksqldb
  namespace: kafka
  labels:
    # Names the Kafka cluster this user belongs to.
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls
Generate keystore/truststore secret
Code Block | ||||
---|---|---|---|---|
| ||||
#!/bin/sh
# Build JKS keystore/truststore files for a Kafka user and publish them as
# the Kubernetes secret "<kafka-user>-jks".
#
# Usage: <script> <kafka-user>
#
# Optional environment overrides (defaults are the values this script
# previously hard-coded):
#   NAMESPACE  namespace that holds the secrets            (default: kafka)
#   CLUSTER    cluster name; the CA certificate is read from the secret
#              "<CLUSTER>-cluster-ca-cert"                 (default: my-cluster)
#
# Requires kubectl, base64, keytool and realpath on the PATH.
# All intermediate files are written next to this script.

if [ -z "$1" ]
then
    echo "No argument supplied" >&2
    exit 1
fi

kafkauser=$1
NAMESPACE=${NAMESPACE:-kafka}
CLUSTER=${CLUSTER:-my-cluster}
WORKDIR=$(dirname "$(realpath "$0")")

# The generated files contain a private key and its password, so keep them
# readable by the current user only.
umask 077

# fetch_field SECRET KEY FILE
# Decode the data field KEY of secret SECRET into FILE. KEY is spliced into
# a jsonpath expression, so dots inside the key must be escaped (user\.key).
# Fails when kubectl fails or the field is empty, instead of silently
# leaving an empty file behind.
fetch_field() {
    encoded=$(kubectl get secret "$1" -n "$NAMESPACE" -o "jsonpath={.data.$2}") || return 1
    if [ -z "$encoded" ]
    then
        echo "Field $2 not found in secret $1" >&2
        return 1
    fi
    printf '%s' "$encoded" | base64 --decode > "$3"
}

# Best-effort cleanup of a previous run: missing files or a missing secret
# are not errors.
rm -f "$WORKDIR/ca.crt" "$WORKDIR/user.crt" "$WORKDIR/user.key" \
      "$WORKDIR/user-keystore.jks" "$WORKDIR/user.p12" \
      "$WORKDIR/user.password" "$WORKDIR/user-truststore.jks" 2>/dev/null
kubectl delete secret "${kafkauser}-jks" -n "$NAMESPACE" 2>/dev/null || true

# From here on every step depends on the previous one, so stop at the first
# failure rather than publishing a secret built from incomplete material.
set -e

# Cluster CA certificate plus the user's key material.
fetch_field "${CLUSTER}-cluster-ca-cert" 'ca\.crt' "$WORKDIR/ca.crt"
fetch_field "$kafkauser" 'user\.key' "$WORKDIR/user.key"
fetch_field "$kafkauser" 'user\.crt' "$WORKDIR/user.crt"
fetch_field "$kafkauser" 'user\.p12' "$WORKDIR/user.p12"
fetch_field "$kafkauser" 'user\.password' "$WORKDIR/user.password"

# Assign first, export second: "export VAR=$(...)" is subject to word
# splitting in some POSIX shells.
PASSWORD=$(cat "$WORKDIR/user.password")
export PASSWORD

# Truststore: contains only the CA certificate.
keytool -import -trustcacerts \
    -file "$WORKDIR/ca.crt" \
    -keystore "$WORKDIR/user-truststore.jks" \
    -storepass "$PASSWORD" \
    -noprompt

# Keystore: the user's PKCS#12 bundle converted to JKS. The same password is
# reused for the source bundle and the destination store.
keytool -importkeystore \
    -srckeystore "$WORKDIR/user.p12" \
    -srcstorepass "$PASSWORD" \
    -srcstoretype pkcs12 \
    -destkeystore "$WORKDIR/user-keystore.jks" \
    -deststorepass "$PASSWORD" \
    -deststoretype jks

# Publish both stores and their passwords as a single generic secret.
kubectl create secret generic "${kafkauser}-jks" -n "$NAMESPACE" \
    --from-literal=keystore_password="$PASSWORD" \
    --from-file=user-keystore.jks="$WORKDIR/user-keystore.jks" \
    --from-literal=truststore_password="$PASSWORD" \
    --from-file=user-truststore.jks="$WORKDIR/user-truststore.jks" \
    --from-literal=key_password="$PASSWORD"
...
Code Block | ||||
---|---|---|---|---|
| ||||
-- PMS_STREAM: ksqlDB stream over the JSON-formatted Kafka topic 'pms'.
-- Each record is one "event" struct made of a commonEventHeader and a
-- perf3gppFields payload; the schema below mirrors that nesting so the
-- individual fields can be addressed from queries.
CREATE STREAM IF NOT EXISTS PMS_STREAM (
    event STRUCT<
        commonEventHeader STRUCT<
            domain VARCHAR,
            eventName VARCHAR,
            sourceName VARCHAR,
            reportingEntityName VARCHAR,
            -- Epoch values in microseconds do not fit a 32-bit INT.
            startEpochMicrosec BIGINT,
            lastEpochMicrosec BIGINT,
            timeZoneOffset VARCHAR
        >,
        perf3gppFields STRUCT<
            perf3gppFieldsVersion VARCHAR,
            measDataCollection STRUCT<
                granularityPeriod INT,
                measuredEntityUserName VARCHAR,
                measuredEntityDn VARCHAR,
                measuredEntitySoftwareVersion VARCHAR,
                measInfoList ARRAY<STRUCT<
                    measInfoId STRUCT<sMeasInfoId VARCHAR>,
                    measTypes STRUCT<sMeasTypesList ARRAY<VARCHAR>>,
                    measValuesList ARRAY<STRUCT<
                        measObjInstId VARCHAR,
                        suspectFlag VARCHAR,
                        measResults ARRAY<STRUCT<
                            p INT,
                            sValue VARCHAR
                        >>
                    >>
                >>
            >
        >
    >
) WITH (
    KAFKA_TOPIC = 'pms',
    VALUE_FORMAT = 'JSON',
    -- Used when ksqlDB has to create the topic itself.
    PARTITIONS = 1
);
...