...
Single Message Transformations (SMTs) are applied to messages as they flow through Connect.
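As a sketch of what that looks like in practice, an SMT is declared through `transforms.*` keys in a connector's configuration (the fragment below is illustrative, not part of the demo; it uses the stock InsertField transformation to add a static "source" field to every record value):

```yaml
# Illustrative fragment of a KafkaConnector spec.config:
# add a static "source" field to each record value as it passes through Connect
transforms: addSource
transforms.addSource.type: org.apache.kafka.connect.transforms.InsertField$Value
transforms.addSource.static.field: source
transforms.addSource.static.value: connect-demo
```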
Connect Demo
Postgres JDBC Sink Connector (Confluent)
Prepare the connector image
...
```sql
-- Create the database and user if they do not already exist.
-- \gexec is a psql meta-command that executes each row returned by the
-- preceding query as a SQL statement.
SELECT 'CREATE DATABASE kafka'
WHERE NOT EXISTS (SELECT FROM pg_database WHERE datname = 'kafka')\gexec

DO $$
BEGIN
  IF NOT EXISTS (SELECT FROM pg_user WHERE usename = 'kafka') THEN
    CREATE USER kafka WITH PASSWORD 'kafka';
    GRANT ALL PRIVILEGES ON DATABASE kafka TO kafka;
  END IF;
END $$;
```
Prepare the message producer
In this example we are going to deserialize the Kafka key and value to JSON; for this to work we need to include the JSON schema with the message.
...
```yaml
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
  name: connect
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  namespace: kafka
  annotations:
    # use-connector-resources configures this KafkaConnect
    # to use KafkaConnector resources to avoid
    # needing to call the Connect REST API directly
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.3.1
  image: ktimoney/strimzi-connect
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
      - secretName: my-cluster-clients-ca-cert
        certificate: ca.crt
  authentication:
    type: tls
    certificateAndKey:
      secretName: connect
      certificate: user.crt
      key: user.key
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    # -1 means it will use the default replication factor configured in the broker
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
```
...
The database connection details are also included.
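For reference, a minimal KafkaConnector sketch for the Confluent JDBC sink might look like the following; the connector class and `connection.*` property names are the Confluent JDBC sink's, but the topic name, URL, and credentials are placeholder values:

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-jdbc-sink-connector   # illustrative name
  namespace: kafka
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: io.confluent.connect.jdbc.JdbcSinkConnector
  tasksMax: 1
  config:
    topics: my-topic
    # Database connection details (placeholder host/credentials)
    connection.url: jdbc:postgresql://postgres.default:5432/kafka
    connection.user: kafka
    connection.password: kafka
    auto.create: true
```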
Influxdb Sink Connector (Apache Camel)
Prepare the connector image
Download the InfluxDB connector: wget https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-influxdb-kafka-connector/0.8.0/camel-influxdb-kafka-connector-0.8.0-package.tar.gz
Create a Dockerfile for this connector:
```dockerfile
FROM quay.io/strimzi/kafka:0.22.1-kafka-2.7.0
USER root:root
RUN mkdir -p /opt/kafka/plugins/camel
COPY ./camel-influxdb-kafka-connector-0.8.0-package.tar.gz /opt/kafka/plugins/camel/
RUN tar -xvzf /opt/kafka/plugins/camel/camel-influxdb-kafka-connector-0.8.0-package.tar.gz --directory /opt/kafka/plugins/camel
RUN rm /opt/kafka/plugins/camel/camel-influxdb-kafka-connector-0.8.0-package.tar.gz
USER 1001
```
Build the image and push it to your Docker repository:
docker build -f Dockerfile . -t ktimoney/strimzi-influxdb-connect
Prepare the message producer
In this example we are going to deserialize the value to JSON; we can do this without using a schema.
They will take the following format:
```json
{
  "camelInfluxDB.MeasurementName": "disk_space",
  "time": "2023-01-23T13:03:31Z",
  "host": "localhost",
  "region": "IE-region",
  "used": "19%",
  "free": "81%"
}
```
The important thing to note here is that one of the keys must be "camelInfluxDB.MeasurementName"; it is required by the Camel type converter when converting the message into an InfluxDB Point object.
The above structure can be represented in Go using the following struct:
```go
type InfluxPayload struct {
	Measurement string `json:"camelInfluxDB.MeasurementName"`
	Time        string `json:"time"`
	Host        string `json:"host"`
	Region      string `json:"region"`
	Used        string `json:"used"`
	Free        string `json:"free"`
}
```
Create the KafkaConnect object
```yaml
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
  name: connect
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  namespace: kafka
  annotations:
    # use-connector-resources configures this KafkaConnect
    # to use KafkaConnector resources to avoid
    # needing to call the Connect REST API directly
    strimzi.io/use-connector-resources: "true"
spec:
  image: ktimoney/strimzi-influxdb-connect
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
      - secretName: my-cluster-clients-ca-cert
        certificate: ca.crt
  authentication:
    type: tls
    certificateAndKey:
      secretName: connect
      certificate: user.crt
      key: user.key
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
```
We are connecting to the TLS port (9093), so we include the required certificates in our configuration.
The image tag references the image we built earlier: ktimoney/strimzi-influxdb-connect
Create the Sink connector
```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-sink-connector
  namespace: kafka
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.camel.kafkaconnector.influxdb.CamelInfluxdbSinkConnector
  tasksMax: 1
  config:
    topics: my-topic
    errors.deadletterqueue.topic.name: my-topic-dl
    errors.deadletterqueue.topic.replication.factor: 1
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false
    key.ignore: true
    auto.create: true
    camel.beans.influx: "#class:org.influxdb.InfluxDBFactory#connect('http://influxdb.default:8086', 'influxdb', 'influxdb')"
    camel.sink.path.connectionBean: influx
    camel.sink.endpoint.databaseName: ts_host_metrics
    camel.sink.endpoint.operation: insert
    camel.sink.endpoint.retentionPolicy: autogen
```
The KafkaConnector is configured to use the "org.apache.camel.kafkaconnector.influxdb.CamelInfluxdbSinkConnector" class.
The value converter is set to "org.apache.kafka.connect.json.JsonConverter".
The Camel connector comes with a built-in TypeConverter called CamelInfluxDbConverters, which reads the JSON produced as a Map<String, Object> and returns an InfluxDB Point object.
The database connection details are specified in camel.beans.influx.
This sets up a connection bean to be used by camel.sink.path.connectionBean.
Note: It is important to wrap the camel.beans.influx value in quotes, otherwise its leading # will cause it to be treated as a YAML comment.
When the connector is running it will write records to InfluxDB, which can be queried back like the following:
```
{'pretty': 'true', 'db': 'ts_host_metrics', 'q': 'SELECT "region", "host", "free", "used" FROM "disk_space" WHERE "host"=\'localhost\''}
{
    "results": [
        {
            "statement_id": 0,
            "series": [
                {
                    "name": "disk_space",
                    "columns": [
                        "time",
                        "region",
                        "host",
                        "free",
                        "used"
                    ],
                    "values": [
                        [
                            "2023-01-23T13:03:32.1246436Z",
                            "IE-region",
                            "localhost",
                            "81%",
                            "19%"
                        ]
                    ]
                }
            ]
        }
    ]
}
```
Links
...