Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagetext
titleInfluxDB query
{'pretty': 'true', 'db': 'ts_host_metrics', 'q': 'SELECT "region", "host", "free", "used" FROM "disk_space" WHERE "host"=\'localhost\''}
{
    "results": [
        {
            "statement_id": 0,
            "series": [
                {
                    "name": "disk_space",
                    "columns": [
                        "time",
                        "region",
                        "host",
                        "free",
                        "used"
                    ],
                    "values": [
                        [
                            "2023-01-23T13:03:32.1246436Z",
                            "IE-region",
                            "localhost",
                            "81%",
                            "19%"
                        ]
                    ]
                }
            ]
        }
    ]
}


Minio Sink Connector (Apache Camel)

Prepare the connector image

Download the minio sink connector: wget https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-minio-sink-kafka-connector/3.19.0/camel-minio-sink-kafka-connector-3.19.0-package.tar.gz

Create a docker file for this connector


Code Block
languagetext
titleDocker
FROM quay.io/strimzi/kafka:0.32.0-kafka-3.3.1
USER root:root
RUN mkdir -p /opt/kafka/plugins/camel
COPY ./camel-minio-sink-kafka-connector-3.19.0-package.tar.gz /opt/kafka/plugins/camel/
RUN tar -xvzf /opt/kafka/plugins/camel/camel-minio-sink-kafka-connector-3.19.0-package.tar.gz --directory /opt/kafka/plugins/camel
RUN rm /opt/kafka/plugins/camel/camel-minio-sink-kafka-connector-3.19.0-package.tar.gz
USER 1001

Build the image and copy it to your docker repository

docker build -f Dockerfile . t ktimoney/strimzi_minio_connector



Prepare the message producer

In this example we are going to deserialize the value to json, we can do this without using a schema. (Same producer as the influxdb example)

They will take the following format:

Code Block
languagejs
titleSchema & Payload
{
"camelInfluxDB.MeasurementName":"disk_space"
"time":"2023-01-23T13:03:31Z"
"host":"localhost"
"region":"IE-region"
"used":"19%"
"free":"81%"
}



Create the KafkaConnect object

Code Block
languagetext
titleKafkaConnect
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
  name: connect
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  namespace: kafka
  annotations:
  # use-connector-resources configures this KafkaConnect
  # to use KafkaConnector resources to avoid
  # needing to call the Connect REST API directly
    strimzi.io/use-connector-resources: "true"
spec:   
  image: ktimoney/strimzi-minio-connect
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
      - secretName: my-cluster-clients-ca-cert
        certificate: ca.crt
  authentication:
    type: tls
    certificateAndKey:
      secretName: connect
      certificate: user.crt
      key: user.key
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    offset.storage.topic: my-connect-cluster-offsets
    config.storage.topic: my-connect-cluster-configs
    status.storage.topic: my-connect-cluster-status
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1


We are connecting to the 9093 tls port so we include required certificates in our configuration.

The image tag references the image we built earlier : ktimoney/strimzi-minio-connect


Create the Sink connector

Code Block
languagetext
titleKafkaConnector
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-sink-connector
  namespace: kafka
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:   
  class: org.apache.camel.kafkaconnector.miniosink.CamelMiniosinkSinkConnector   
  tasksMax: 1   
  config:
    topics: my-topic
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false
    key.ignore: true
    camel.kamelet.minio-sink.bucketName: camel
    camel.kamelet.minio-sink.accessKey: ybK7RleFUkdDeYBf
    camel.kamelet.minio-sink.secretKey: X0Y4zK84bwdefRTljrnPzOb1l0A6OQj2
    camel.kamelet.minio-sink.endpoint: http://minio.default:9000
    camel.kamelet.minio-sink.autoCreateBucket: true


The KafkaConnector is configured to use the "org.apache.camel.kafkaconnector.miniosink.CamelMiniosinkSinkConnector class.

The value converter are set to "org.apache.kafka.connect.storage.StringConverter".

We will need to create our own TypeConverter if we want to deserialize to json.

The minio connection details are specified in camel.kamelet.minio-sink parameters.

The yaml for the minio pod is available here: minio.yaml


When the connector is running it copies the kafka records into minio as files:

Image Added

Links

Kafka Connect And Schemas

...