Introduction
Kafka Connect allows you to continuously ingest data from external systems into Kafka via source connectors, and to write data from Kafka to external systems via sink connectors.
Plugins are available for a wide variety of data sources and sinks.
Single Message Transformations (SMTs) are applied to messages as they flow through Connect.
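As an illustration (the transform alias and target field name here are arbitrary), an SMT is declared in a connector's config block; the snippet below uses the built-in InsertField transformation to add the source topic name to every record value:

  config:
    transforms: insertTopic
    transforms.insertTopic.type: org.apache.kafka.connect.transforms.InsertField$Value
    transforms.insertTopic.topic.field: topic_name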
Example
Postgres JDBC Sink Connector
Prepare the connector image
Download the JDBC sink connector from JDBC Connector (Source and Sink)
Create a Dockerfile for this connector:
FROM quay.io/strimzi/kafka:0.32.0-kafka-3.3.1
USER root:root
RUN mkdir -p /opt/kafka/plugins/jdbc
COPY ./confluentinc-kafka-connect-jdbc-10.6.0.zip /opt/kafka/plugins/jdbc/
RUN unzip /opt/kafka/plugins/jdbc/confluentinc-kafka-connect-jdbc-10.6.0.zip -d /opt/kafka/plugins/jdbc
RUN rm /opt/kafka/plugins/jdbc/confluentinc-kafka-connect-jdbc-10.6.0.zip
USER 1001
Build the image and push it to your Docker repository:
docker build -f Dockerfile . -t ktimoney/strimzi-connector
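Then push it (assuming you are publishing under the same name; adjust the registry and tag to your environment):

docker push ktimoney/strimzi-connector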
Prepare the Postgres database
Set up a new username/password and database for Kafka Connect to write to:
SELECT 'CREATE DATABASE kafka'
WHERE NOT EXISTS (SELECT FROM pg_database WHERE datname = 'kafka')\gexec

DO
$$
BEGIN
  IF NOT EXISTS (SELECT FROM pg_user WHERE usename = 'kafka') THEN
    CREATE USER kafka WITH PASSWORD 'kafka';
    GRANT ALL PRIVILEGES ON DATABASE kafka TO kafka;
  END IF;
END
$$;
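These statements can be run with psql against the Postgres instance (the file name below is just an example; note that the \gexec meta-command only works in psql):

psql -U postgres -f prepare-kafka-db.sql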
Prepare the message producer
In this example the Kafka key and value are serialized as JSON; for the JsonConverter on the Connect side to deserialize them, the JSON schema must be included with each message.
The key will take the following format:
{ "schema": { "type": "struct", "optional": false, "version": 1, "fields": [ { "field": "id", "type": "string", "optional": true } ] }, "payload": { "id": 1 } }
These structures can be represented in Go using the following structs:
type SchemaPayload[T any] struct {
	Schema  Schema `json:"schema"`
	Payload T      `json:"payload"`
}

type Schema struct {
	Type     string   `json:"type"`
	Optional bool     `json:"optional"`
	Version  int      `json:"version"`
	Fields   []Fields `json:"fields"`
}

type Fields struct {
	Field    string `json:"field"`
	Type     string `json:"type"`
	Optional bool   `json:"optional"`
}

type KeyPayload struct {
	Id string `json:"id"`
}

type ValuePayload struct {
	Text string `json:"text"`
}
We can use the same SchemaPayload struct for both keys and values; we only need to provide the payload type we initialize it with, as shown in the sketch below.
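Here is a minimal producer-side sketch (assuming the structs above live in the same package and Go 1.18+ for generics) that marshals a key and value into this envelope format; the resulting bytes are what a Kafka producer would send on my-topic:

package main

import (
	"encoding/json"
	"fmt"
	"log"
)

func main() {
	// Key: a single string "id" field, matching the schema shown earlier.
	key := SchemaPayload[KeyPayload]{
		Schema: Schema{
			Type:     "struct",
			Optional: false,
			Version:  1,
			Fields:   []Fields{{Field: "id", Type: "string", Optional: true}},
		},
		Payload: KeyPayload{Id: "1"},
	}

	// Value: a single string "text" field.
	value := SchemaPayload[ValuePayload]{
		Schema: Schema{
			Type:     "struct",
			Optional: false,
			Version:  1,
			Fields:   []Fields{{Field: "text", Type: "string", Optional: true}},
		},
		Payload: ValuePayload{Text: "some text"},
	}

	keyBytes, err := json.Marshal(key)
	if err != nil {
		log.Fatal(err)
	}
	valueBytes, err := json.Marshal(value)
	if err != nil {
		log.Fatal(err)
	}

	// keyBytes and valueBytes are sent as the record key and value by
	// whichever Kafka client library the producer uses.
	fmt.Println(string(keyBytes))
	fmt.Println(string(valueBytes))
}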
Create the KafkaConnect object
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
  name: connect
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  namespace: kafka
  annotations:
    # use-connector-resources configures this KafkaConnect
    # to use KafkaConnector resources to avoid
    # needing to call the Connect REST API directly
    strimzi.io/use-connector-resources: "true"
spec:
  image: ktimoney/strimzi-connector
  version: 3.3.1
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
      - secretName: my-cluster-clients-ca-cert
        certificate: ca.crt
  authentication:
    type: tls
    certificateAndKey:
      secretName: connect
      certificate: user.crt
      key: user.key
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    # -1 means it will use the default replication factor configured in the broker
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
We are connecting to the TLS port (9093), so we include the required certificates in our configuration.
The image attribute references the image we built earlier: ktimoney/strimzi-connector.
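Both resources can then be applied with kubectl (the file name here is just an example):

kubectl apply -f kafka-connect.yaml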
Create the Sink connector
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-sink-connector
  namespace: kafka
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: io.confluent.connect.jdbc.JdbcSinkConnector
  tasksMax: 2
  config:
    topics: my-topic
    connection.url: jdbc:postgresql://postgres.default:5432/kafka
    connection.user: kafka
    connection.password: kafka
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: true
    value.converter.schemas.enable: true
    pk.mode: record_key
    pk.fields: id
    auto.create: true
    delete.enabled: true
The KafkaConnector is configured to use the "io.confluent.connect.jdbc.JdbcSinkConnector" class.
The key and value converters are set to "org.apache.kafka.connect.json.JsonConverter", with schemas enabled.
The pk.mode is set to "record_key" and pk.fields is set to "id", which means the id field from the message key is used as the primary key.
The database connection details are also included.
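Because auto.create is enabled, the connector creates the target table from the record schemas if it does not already exist. For the key and value formats above, the generated Postgres table should look roughly like this (column types are the connector's defaults and may vary):

CREATE TABLE "my-topic" (
  "id"   TEXT NOT NULL,
  "text" TEXT,
  PRIMARY KEY ("id")
);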
Links
Building your own Kafka Connect image with a custom resource
Kafka Connect Deep Dive – Converters and Serialization Explained
How to Write a Connector for Kafka Connect – Deep Dive into Configuration Handling