...
Code Block | ||||
---|---|---|---|---|
| ||||
{'pretty': 'true', 'db': 'ts_host_metrics', 'q': 'SELECT "region", "host", "free", "used" FROM "disk_space" WHERE "host"=\'localhost\''} { "results": [ { "statement_id": 0, "series": [ { "name": "disk_space", "columns": [ "time", "region", "host", "free", "used" ], "values": [ [ "2023-01-23T13:03:32.1246436Z", "IE-region", "localhost", "81%", "19%" ] ] } ] } ] } |
Minio Sink Connector (Apache Camel)
Prepare the connector image
Download the minio sink connector: wget https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-minio-sink-kafka-connector/3.19.0/camel-minio-sink-kafka-connector-3.19.0-package.tar.gz
Create a docker file for this connector
Code Block | ||||
---|---|---|---|---|
| ||||
FROM quay.io/strimzi/kafka:0.32.0-kafka-3.3.1
USER root:root
RUN mkdir -p /opt/kafka/plugins/camel
COPY ./camel-minio-sink-kafka-connector-3.19.0-package.tar.gz /opt/kafka/plugins/camel/
RUN tar -xvzf /opt/kafka/plugins/camel/camel-minio-sink-kafka-connector-3.19.0-package.tar.gz --directory /opt/kafka/plugins/camel
RUN rm /opt/kafka/plugins/camel/camel-minio-sink-kafka-connector-3.19.0-package.tar.gz
USER 1001 |
Build the image and copy it to your docker repository
docker build -f Dockerfile . t ktimoney/strimzi_minio_connector
Prepare the message producer
In this example we are going to deserialize the value to json, we can do this without using a schema. (Same producer as the influxdb example)
They will take the following format:
Code Block | ||||
---|---|---|---|---|
| ||||
{
"camelInfluxDB.MeasurementName":"disk_space"
"time":"2023-01-23T13:03:31Z"
"host":"localhost"
"region":"IE-region"
"used":"19%"
"free":"81%"
} |
Create the KafkaConnect object
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
name: connect
namespace: kafka
labels:
strimzi.io/cluster: my-cluster
spec:
authentication:
type: tls
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: my-connect-cluster
namespace: kafka
annotations:
# use-connector-resources configures this KafkaConnect
# to use KafkaConnector resources to avoid
# needing to call the Connect REST API directly
strimzi.io/use-connector-resources: "true"
spec:
image: ktimoney/strimzi-minio-connect
replicas: 1
bootstrapServers: my-cluster-kafka-bootstrap:9093
tls:
trustedCertificates:
- secretName: my-cluster-cluster-ca-cert
certificate: ca.crt
- secretName: my-cluster-clients-ca-cert
certificate: ca.crt
authentication:
type: tls
certificateAndKey:
secretName: connect
certificate: user.crt
key: user.key
config:
group.id: connect-cluster
offset.storage.topic: connect-cluster-offsets
config.storage.topic: connect-cluster-configs
status.storage.topic: connect-cluster-status
offset.storage.topic: my-connect-cluster-offsets
config.storage.topic: my-connect-cluster-configs
status.storage.topic: my-connect-cluster-status
config.storage.replication.factor: 1
offset.storage.replication.factor: 1
status.storage.replication.factor: 1 |
We are connecting to the 9093 tls port so we include required certificates in our configuration.
The image tag references the image we built earlier : ktimoney/strimzi-minio-connect
Create the Sink connector
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: my-sink-connector
namespace: kafka
labels:
strimzi.io/cluster: my-connect-cluster
spec:
class: org.apache.camel.kafkaconnector.miniosink.CamelMiniosinkSinkConnector
tasksMax: 1
config:
topics: my-topic
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable: false
value.converter.schemas.enable: false
key.ignore: true
camel.kamelet.minio-sink.bucketName: camel
camel.kamelet.minio-sink.accessKey: ybK7RleFUkdDeYBf
camel.kamelet.minio-sink.secretKey: X0Y4zK84bwdefRTljrnPzOb1l0A6OQj2
camel.kamelet.minio-sink.endpoint: http://minio.default:9000
camel.kamelet.minio-sink.autoCreateBucket: true |
The KafkaConnector is configured to use the "org.apache.camel.kafkaconnector.miniosink.CamelMiniosinkSinkConnector class.
The value converter are set to "org.apache.kafka.connect.storage.StringConverter".
We will need to create our own TypeConverter if we want to deserialize to json.
The minio connection details are specified in camel.kamelet.minio-sink parameters.
The yaml for the minio pod is available here: minio.yaml
When the connector is running it copies the kafka records into minio as files:
Links
...