...
```yaml
listeners:
  - name: plain
    port: 9092
    tls: false
    type: internal
  - name: tls
    port: 9093
    tls: true
    type: internal
    authentication:
      type: tls
```
KafkaUser
Once Strimzi is up and running, we need to create a user for our applications. We'll call it quarkus.
...
This will give the quarkus user access to the following topics: temperature-values, weather-stations, kafka-streams-quickstart-aggregator-weather-stations-store-changelog, temperatures-aggregated and the following group: kafka-streams-quickstart-aggregator.
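As a rough illustration of how such access is expressed, the sketch below shows the general shape of a Strimzi `KafkaUser` with TLS authentication and simple ACLs for one of the topics and for the consumer group. The `kafka` namespace and `my-cluster` cluster label are inferred from the bootstrap address used later on this page; the API version and the list of operations are assumptions, not taken from the original resource, and older Strimzi releases use a single `operation` field per ACL instead of `operations`.

```yaml
# Illustrative sketch only: API version and operations are assumptions.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: quarkus
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster   # assumed cluster name
spec:
  authentication:
    type: tls
  authorization:
    type: simple
    acls:
      # One entry like this per topic listed above
      - resource:
          type: topic
          name: temperature-values
          patternType: literal
        operations:          # assumed set of operations
          - Describe
          - Read
          - Write
      # Consumer group used by the Kafka Streams application
      - resource:
          type: group
          name: kafka-streams-quickstart-aggregator
          patternType: literal
        operations:
          - Read
```

The remaining topics would each get their own `topic` entry with the same structure.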
JKS secret
Once the user is created we can create a secret to store our keystore, truststore and passwords.
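For orientation, the secret consumed by the deployments further down could look roughly like the sketch below. The secret name `quarkus-jks`, the three password keys, and the two `.jks` file names are taken from the deployment and application.properties files on this page; the placeholder values and the choice of `stringData` for the passwords are assumptions made for illustration.

```yaml
# Illustrative sketch only: replace every placeholder with real values.
apiVersion: v1
kind: Secret
metadata:
  name: quarkus-jks
  namespace: kafka
type: Opaque
stringData:
  # Plain-text values; Kubernetes base64-encodes them on creation
  keystore_password: "<keystore-password>"
  truststore_password: "<truststore-password>"
  key_password: "<key-password>"
data:
  # Binary files must be supplied base64-encoded
  user-keystore.jks: "<base64-encoded-keystore>"
  user-truststore.jks: "<base64-encoded-truststore>"
```

Because the deployments mount this secret at /etc/ssl/, each key under `data` appears there as a file with the same name.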
...
We need to make some changes to the application.properties file to enable communication with Strimzi over SSL.
kafka-streams-quickstart-producer application.properties
```properties
# Configure the Kafka broker location
kafka.bootstrap.servers=my-cluster-kafka-bootstrap.kafka:9093
kafka.security.protocol=SSL
kafka.ssl.keystore.location=/etc/ssl/user-keystore.jks
kafka.ssl.keystore.password=${KEYSTORE_PASSWORD}
kafka.ssl.key.password=${KEY_PASSWORD}
kafka.ssl.keystore.type=JKS
kafka.ssl.truststore.location=/etc/ssl/user-truststore.jks
kafka.ssl.truststore.password=${TRUSTSTORE_PASSWORD}
kafka.ssl.truststore.type=JKS

mp.messaging.outgoing.temperature-values.connector=smallrye-kafka
mp.messaging.outgoing.temperature-values.key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
mp.messaging.outgoing.temperature-values.value.serializer=org.apache.kafka.common.serialization.StringSerializer

mp.messaging.outgoing.weather-stations.connector=smallrye-kafka
mp.messaging.outgoing.weather-stations.key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
mp.messaging.outgoing.weather-stations.value.serializer=org.apache.kafka.common.serialization.StringSerializer
```
kafka-streams-quickstart-aggregator application.properties
```properties
kafka.bootstrap.servers=my-cluster-kafka-bootstrap.kafka:9093
kafka.security.protocol=SSL
kafka.ssl.keystore.location=/etc/ssl/user-keystore.jks
kafka.ssl.keystore.password=${KEYSTORE_PASSWORD}
kafka.ssl.key.password=${KEY_PASSWORD}
kafka.ssl.keystore.type=JKS
kafka.ssl.truststore.location=/etc/ssl/user-truststore.jks
kafka.ssl.truststore.password=${TRUSTSTORE_PASSWORD}
kafka.ssl.truststore.type=JKS

quarkus.kafka-streams.application-server=${hostname}:8080
quarkus.kafka-streams.topics=weather-stations,temperature-values

# pass-through options
kafka-streams.cache.max.bytes.buffering=10240
kafka-streams.commit.interval.ms=1000
kafka-streams.metadata.max.age.ms=500
```
...
Lastly we'll create our deployment files.
quarkus-producer
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: quarkus-producer
  namespace: kafka
spec:
  selector:
    matchLabels:
      app: quarkus-producer
  template:
    metadata:
      labels:
        app: quarkus-producer
        version: v1
    spec:
      containers:
        - name: quarkus-producer
          image: ktimoney/quarkus-producer-native
          imagePullPolicy: IfNotPresent
          env:
            - name: KEYSTORE_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: quarkus-jks
                  key: keystore_password
            - name: TRUSTSTORE_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: quarkus-jks
                  key: truststore_password
            - name: KEY_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: quarkus-jks
                  key: key_password
          volumeMounts:
            - name: jks
              mountPath: /etc/ssl/
              readOnly: true
      volumes:
        - name: jks
          secret:
            secretName: quarkus-jks
---
apiVersion: v1
kind: Service
metadata:
  name: quarkus-producer
  namespace: kafka
  labels:
    app: quarkus-producer
    service: quarkus-producer
spec:
  type: ClusterIP
  selector:
    app: quarkus-producer
  ports:
    - port: 8080
      name: http-80
```
quarkus-aggregator
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: quarkus-aggregator
  namespace: kafka
spec:
  selector:
    matchLabels:
      app: quarkus-aggregator
  template:
    metadata:
      labels:
        app: quarkus-aggregator
        version: v1
    spec:
      containers:
        - name: quarkus-aggregator
          image: ktimoney/quarkus-aggregator-native
          imagePullPolicy: IfNotPresent
          env:
            - name: KEYSTORE_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: quarkus-jks
                  key: keystore_password
            - name: TRUSTSTORE_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: quarkus-jks
                  key: truststore_password
            - name: KEY_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: quarkus-jks
                  key: key_password
          volumeMounts:
            - name: jks
              mountPath: /etc/ssl/
              readOnly: true
      volumes:
        - name: jks
          secret:
            secretName: quarkus-jks
---
apiVersion: v1
kind: Service
metadata:
  name: quarkus-aggregator
  namespace: kafka
  labels:
    app: quarkus-aggregator
    service: quarkus-aggregator
spec:
  type: ClusterIP
  selector:
    app: quarkus-aggregator
  ports:
    - port: 8080
      name: http-80
```
Once the pods are up and running, you'll see the quarkus-producer pod writing records to the "temperature-values" and "weather-stations" topics. The quarkus-aggregator pod will write records to the "temperatures-aggregated" topic.
The "kafka-streams-quickstart-aggregator-weather-stations-store-changelog" topic is an internal topic used by the Kafka Streams API.
Links
...