Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Introduction

ksqlDB is a database purpose-built to help developers create stream processing applications on top of Apache Kafka.


Setup

Kafka keystore/truststore

Create a strimzi user (ksqldb)

...

This will create a secret called "ksqldb-jks" which contains the keystore and truststore needed to connect to Kafka over SSL.


ksqldb-server

Code Block
languageyml
titleksqldb-server
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ksqldb-server
  namespace: kafka
spec:
  selector:
    matchLabels:
      app: ksqldb-server
  template:
    metadata:
      labels:
        app: ksqldb-server
        version: v1
    spec:
      containers:
      - name: ksqldb-server
        image: confluentinc/ksqldb-server:0.28.2
        imagePullPolicy: IfNotPresent
        env:
        - name: KEYSTORE_PASSWORD
          valueFrom:
            secretKeyRef:
              name: ksqldb-jks
              key: keystore_password
        - name: TRUSTSTORE_PASSWORD
          valueFrom:
            secretKeyRef:
              name: ksqldb-jks
              key: truststore_password
        - name: KEY_PASSWORD
          valueFrom:
            secretKeyRef:
              name: ksqldb-jks
              key: key_password
        - name: KSQL_BOOTSTRAP_SERVERS
          value: my-cluster-kafka-bootstrap.kafka:9093
        - name: KSQL_LISTENERS
          value: http://0.0.0.0:8088
        - name: KSQL_KSQL_SERVICE_ID
          value: ksql_service_2_
        - name: KSQL_SECURITY_PROTOCOL
          value: SSL
        - name: KSQL_OPTS
          value: "-Dssl.keystore.location=/var/private/ssl/user-keystore.jks -Dssl.keystore.password=$(KEYSTORE_PASSWORD) -Dssl.key.password=$(KEY_PASSWORD) -Dssl.truststore.location=/var/private/ssl/user-truststore.jks -Dssl.truststore.password=$(TRUSTSTORE_PASSWORD) -Dlisteners=http://0.0.0.0:8088/"
        - name: KSQL_KSQL_EXTENSION_DIR
          value: /opt/ksqldb-udfs
        volumeMounts:
        - name: jks
          mountPath: /var/private/ssl
          readOnly: true
      volumes:
      - name: jks
        secret:
          secretName: ksqldb-jks
---
apiVersion: v1
kind: Service
metadata:
  name: ksqldb-server
  namespace: kafka
  labels:
    app: ksqldb-server
    service: ksqldb-server
spec:
  type: LoadBalancer
  selector:
    app: ksqldb-server
  ports:
  - port: 8088
    name: http-80

...

It has been configured to connect to your kafka cluster over SSL using the "ksqldb-jks" secret created in the previous step.


ksqldb-cli

Code Block
languageyml
titleksqldb-cli
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ksqldb-cli
  namespace: kafka
spec:
  selector:
    matchLabels:
      app: ksqldb-cli
  template:
    metadata:
      labels:
        app: ksqldb-cli
        version: v1
    spec:
      containers:
      - name: ksqldb-cli
        image: confluentinc/ksqldb-cli:0.28.2
        imagePullPolicy: IfNotPresent
        env:
        - name: KEYSTORE_PASSWORD
          valueFrom:
            secretKeyRef:
              name: ksqldb-jks
              key: keystore_password
        - name: TRUSTSTORE_PASSWORD
          valueFrom:
            secretKeyRef:
              name: ksqldb-jks
              key: truststore_password
          valueFrom:
            secretKeyRef:
              name: ksqldb-jks
              key: key_password
        - name: KSQL_BOOTSTRAP_SERVERS
          value: my-cluster-kafka-bootstrap.kafka:9093
        - name: KSQL_SECURITY_PROTOCOL
          value: SSL
        - name: KSQL_OPTS
          value: "-Dssl.keystore.location=/var/private/ssl/user-keystore.jks -Dssl.keystore.password=$(KEYSTORE_PASSWORD) -Dssl.key.password=$(KEY_PASSWORD) -Dssl.truststore.location=/var/private/ssl/user-truststore.jks -Dssl.truststore.password=$(TRUSTSTORE_PASSWORD) -Dlisteners=http://0.0.0.0:8088/"
        volumeMounts:
        - name: jks
          mountPath: /var/private/ssl
          readOnly: true
      volumes:
      - name: jks
        secret:
          secretName: ksqldb-jks

...

The client can be used to create streams, tables and run queries.

Steams

Code Block
languagetext
titlepms stream
        CREATE STREAM IF NOT EXISTS PMS_STREAM (
                      event STRUCT<
                        commonEventHeader STRUCT<domain VARCHAR,
                                                 eventName VARCHAR,
                                                 sourceName VARCHAR,
                                                 reportingEntityName VARCHAR,
                                                 startEpochMicrosec INT,
                                                 lastEpochMicrosec INT,
                                                 timeZoneOffset VARCHAR>,
                        perf3gppFields STRUCT<perf3gppFieldsVersion VARCHAR,
                                              measDataCollection STRUCT<granularityPeriod INT,
                                                                        measuredEntityUserName   VARCHAR,
                                                                        measuredEntityDn VARCHAR,
                                                                        measuredEntitySoftwareVersion VARCHAR,
                                                                        measInfoList ARRAY<STRUCT<
                                                                         measInfoId STRUCT<sMeasInfoId VARCHAR>,
                                                                         measTypes STRUCT<
                                                                           sMeasTypesList ARRAY<VARCHAR>>,
                                                                         measValuesList ARRAY<STRUCT<
                                                                            measObjInstId VARCHAR,
                                                                            suspectFlag VARCHAR,
                                                                            measResults ARRAY<
                                                                                        STRUCT<p INT, "+
                                                                                               sValue VARCHAR>>>>>>>>>) 
                                             WITH (KAFKA_TOPIC='pms', 
                                                   VALUE_FORMAT='JSON', 
                                                    PARTITIONS = 1);

...

Code Block
languagetext
titlepms transform5
CREATE STREAM IF NOT EXISTS pms_stream_transform5 AS 
select MY_COMPOSITE_KEY, measuredEntityDn, measuredEntityUserName, sMeasType, measObjInstId, sValue 
from pms_stream_transform4 partition by MY_COMPOSITE_KEY;


Tables

Code Block
languagetext
titlepms table
CREATE TABLE IF NOT EXISTS PMS_TABLE (ROWKEY VARCHAR PRIMARY KEY, 
                measuredEntityDn VARCHAR, 
                measuredEntityUserName VARCHAR, 
                sMeasType VARCHAR, 
                measObjInstId VARCHAR, 
                sValue VARCHAR 
                )             WITH (KAFKA_TOPIC='PMS_STREAM_TRANSFORM5', VALUE_FORMAT='JSON', PARTITIONS = 1);

...

Code Block
languagetext
titlepms view
CREATE OR REPLACE table pms_view as select * from PMS_TABLE EMIT CHANGES;


Queries


Links

ksqldb

How to create a user-defined function

...