...
We can then create a custom jar for our code and copy it into the docker container.
Note: You can also set the filename in the Kafka header using either CamelHeader.file or CamelHeader.ce-file
Create a docker file for this connector
...
Mutating webhooks require https so we also need t to create some certificates.
...
Run: kubectl create -f kafka webhook.yaml to start the pod.
Note: You can delete the webhook once the connector pod starts.
Create the KafkaConnect object
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: kafka.strimzi.io/v1beta1 kind: KafkaUser metadata: name: connect namespace: kafka labels: strimzi.io/cluster: my-cluster spec: authentication: type: tls --- apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: my-connect-cluster namespace: kafka labels: webhook: "true" annotations: # use-connector-resources configures this KafkaConnect # to use KafkaConnector resources to avoid # needing to call the Connect REST API directly strimzi.io/use-connector-resources: "true" spec: image: ktimoney/strimzi-file-connect replicas: 1 bootstrapServers: my-cluster-kafka-bootstrap:9093 tls: trustedCertificates: - secretName: my-cluster-cluster-ca-cert certificate: ca.crt - secretName: my-cluster-clients-ca-cert certificate: ca.crt authentication: type: tls certificateAndKey: secretName: connect certificate: user.crt key: user.key config: group.id: connect-cluster offset.storage.topic: connect-cluster-offsets config.storage.topic: connect-cluster-configs status.storage.topic: connect-cluster-status config.storage.replication.factor: 1 offset.storage.replication.factor: 1 status.storage.replication.factor: 1 |
...
The image tag references the image we built earlier : ktimoney/strimzi-file-connectCreate the Sink connector
In the metadata section of KafkaConnect I have added a new label webhook: "true", this will be picked up by the MutatingWebhookConfiguration:
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: kafkaadmissionregistration.k8s.io/v1 kind: MutatingWebhookConfiguration metadata: name: kafka-webhook-config namespace: kafka annotations: cert-manager.io/inject-ca-from-secret: kafka/kafka-webhook-cert webhooks: - name: kafka-webhook.kafka.svc.cluster.local admissionReviewVersions: - "v1" sideEffects: "None" timeoutSeconds: 30 objectSelector: matchLabels: webhook: "true" |
matchLabels: webhook: "true"
Create the Sink connector
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: kafka.strimzi.io/strimzi.io/v1beta2 kind: KafkaConnector metadata: name: my-sink-connector namespace: kafka labels: strimzi.io/cluster: my-connect-cluster spec: class: org.apache.camel.kafkaconnector.file.CamelFileSinkConnector tasksMax: 1 config: topics: my-topic value.converter: org.apache.kafka.connect.storage.StringConverter key.converter: org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable: false value.converter.schemas.enable: false camel.sink.endpoint.fileName: mydata-${date:now:ddMMyy-hhmmss-SSS}.txt camel.sink.path.directoryName: /opt/kafka/data camel.sink.endpoint.fileExist: Append camel.sink.endpoint.autoCreate: true camel.aggregation.disable: true |
...
How to Write a Connector for Kafka Connect – Deep Dive into Configuration Handling
Camel Kafka Connector Examples
...
Camel Kafka Connector Source Code
Using secrets in Kafka Connect configuration