
Introduction

Kafka Connect allows you to continuously ingest data from external systems into Kafka via source connectors, and to write data from Kafka to external systems via sink connectors.

Plugins are available for a wide range of data sources and sinks:

Confluent Plugins

Camel Plugins

Debezium Plugins

Single Message Transformations (SMTs) are applied to messages as they flow through Connect. 
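
For example, an SMT is configured through the connector's config properties. A minimal sketch using Kafka's built-in InsertField transform to add a static field to every record value (the transform name "addSource" and the field values are illustrative):

Connector Config
transforms: addSource
transforms.addSource.type: org.apache.kafka.connect.transforms.InsertField$Value
transforms.addSource.static.field: source
transforms.addSource.static.value: kafka-connect-demo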


Connect Demo

Postgres JDBC Sink Connector

Prepare the connector image

Download the JDBC sink connector from JDBC Connector (Source and Sink)

Create a Dockerfile for this connector:

Docker
FROM quay.io/strimzi/kafka:0.32.0-kafka-3.3.1
USER root:root
RUN mkdir -p /opt/kafka/plugins/jdbc
COPY ./confluentinc-kafka-connect-jdbc-10.6.0.zip /opt/kafka/plugins/jdbc/
RUN unzip /opt/kafka/plugins/jdbc/confluentinc-kafka-connect-jdbc-10.6.0.zip -d /opt/kafka/plugins/jdbc
RUN rm /opt/kafka/plugins/jdbc/confluentinc-kafka-connect-jdbc-10.6.0.zip
USER 1001

Build the image and push it to your Docker repository:

docker build -f Dockerfile -t ktimoney/strimzi-connect .
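
Then push it (assuming ktimoney/strimzi-connect is a repository you can push to):

docker push ktimoney/strimzi-connect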


Prepare the Postgres database

Set up a new username/password and database for Kafka Connect to write to:

Postgres
    SELECT 'CREATE DATABASE kafka'
    WHERE NOT EXISTS (SELECT FROM pg_database WHERE datname = 'kafka')\gexec
    DO $$
    BEGIN
      IF NOT EXISTS (SELECT FROM pg_user WHERE  usename = 'kafka') THEN
         CREATE USER kafka WITH PASSWORD 'kafka';
         GRANT ALL PRIVILEGES ON DATABASE kafka TO kafka;
      END IF;
    END
    $$;
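
You can verify the new credentials from inside the cluster (a sketch, assuming the Postgres service is reachable as postgres.default, the address used in the connector configuration below):

psql -h postgres.default -p 5432 -U kafka -d kafka -c "SELECT current_user, current_database();"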


Prepare the message producer

In this example we are going to deserialize the Kafka key and value to JSON. For this to work, we need to include the JSON schema with each message.

They will take the following format (the key is shown here):

Schema & Payload
{
  "schema": {
    "type": "struct",
    "optional": false,
    "version": 1,
    "fields": [
      {
        "field": "id",
        "type": "string",
        "optional": true
      }
    ]
  },
  "payload": {
    "id": 1
  }
}
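
The value follows the same schema/payload structure; a sketch for a value carrying a single text field (matching the ValuePayload struct below):

Schema & Payload
{
  "schema": {
    "type": "struct",
    "optional": false,
    "version": 1,
    "fields": [
      {
        "field": "text",
        "type": "string",
        "optional": true
      }
    ]
  },
  "payload": {
    "text": "hello world"
  }
}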


These structures can be represented in Go using the following structs:

Go Structs
type SchemaPayload[T any] struct {
    Schema  Schema `json:"schema"`
    Payload T      `json:"payload"`
}

type Schema struct {
    Type     string   `json:"type"`
    Optional bool     `json:"optional"`
    Version  int      `json:"version"`
    Fields   []Fields `json:"fields"`
}

type Fields struct {
    Field    string `json:"field"`
    Type     string `json:"type"`
    Optional bool   `json:"optional"`
}

type KeyPayload struct {
    Id string `json:"id"`
}

type ValuePayload struct {
    Text string `json:"text"`
}


We can use the same SchemaPayload struct for both keys and values; we only need to provide the payload type we initialize it with.
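
For illustration, a minimal sketch of building the key and value JSON for a message using the structs above (assumed to be in the same package; the literal values are illustrative and error handling is omitted):

Go
package main

import (
    "encoding/json"
    "fmt"
)

func main() {
    // Wrap the key payload with its schema.
    key := SchemaPayload[KeyPayload]{
        Schema: Schema{
            Type:     "struct",
            Optional: false,
            Version:  1,
            Fields:   []Fields{{Field: "id", Type: "string", Optional: true}},
        },
        Payload: KeyPayload{Id: "1"},
    }

    // Wrap the value payload with its schema.
    value := SchemaPayload[ValuePayload]{
        Schema: Schema{
            Type:     "struct",
            Optional: false,
            Version:  1,
            Fields:   []Fields{{Field: "text", Type: "string", Optional: true}},
        },
        Payload: ValuePayload{Text: "hello world"},
    }

    keyBytes, _ := json.Marshal(key)
    valueBytes, _ := json.Marshal(value)

    // keyBytes and valueBytes become the Kafka message key and value.
    fmt.Println(string(keyBytes), string(valueBytes))
}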


Create the KafkaConnect object

KafkaConnect
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: connect
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  namespace: kafka
  annotations:
  # use-connector-resources configures this KafkaConnect
  # to use KafkaConnector resources to avoid
  # needing to call the Connect REST API directly
    strimzi.io/use-connector-resources: "true"
spec:
  image: ktimoney/strimzi-connect
  version: 3.3.1
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
      - secretName: my-cluster-clients-ca-cert
        certificate: ca.crt
  authentication:
    type: tls
    certificateAndKey:
      secretName: connect
      certificate: user.crt
      key: user.key
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    # -1 means it will use the default replication factor configured in the broker
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1


We are connecting to the TLS port 9093, so we include the required certificates in our configuration.

The image field references the image we built earlier: ktimoney/strimzi-connect.
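
Apply the resources and check that the Connect cluster becomes ready (the filename is illustrative):

kubectl apply -f kafka-connect.yaml
kubectl get kafkaconnect my-connect-cluster -n kafka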


Create the Sink connector

KafkaConnector
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-sink-connector
  namespace: kafka
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: io.confluent.connect.jdbc.JdbcSinkConnector
  tasksMax: 2
  config:
    topics: my-topic
    connection.url: jdbc:postgresql://postgres.default:5432/kafka
    connection.user: kafka
    connection.password: kafka
    key.converter: org.apache.kafka.connect.json.JsonConverter 
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: true
    value.converter.schemas.enable: true
    pk.mode: record_key
    pk.fields: id
    auto.create: true
    delete.enabled: true


The KafkaConnector is configured to use the "io.confluent.connect.jdbc.JdbcSinkConnector" class.

The converters are set to "org.apache.kafka.connect.json.JsonConverter".

The pk.mode is set to "record_key" and pk.fields is set to "id", which means the id field from the message key is used as the primary key.

The database connection details are also included.
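
Because auto.create is set to true, the sink creates the target table (named after the topic by default) if it does not already exist. Once messages are flowing, a quick way to check the results (the quoted table name assumes the default table.name.format, i.e. the topic name):

Postgres
SELECT * FROM "my-topic";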


Links

Kafka Connect And Schemas

Building your own Kafka Connect image with a custom resource

Kafka Connectors with Strimzi

Kafka Connect Deep Dive – Converters and Serialization Explained

Connector Developer Guide

How to Write a Connector for Kafka Connect – Deep Dive into Configuration Handling






