...
We can then build a custom jar for our code and copy it into the Docker container.
Note: You can also set the filename in the Kafka header using either CamelHeader.file or CamelHeader.ce-file.
Create a Dockerfile for this connector
...
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
  name: connect
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  namespace: kafka
  labels:
    webhook: "true"
  annotations:
    # use-connector-resources configures this KafkaConnect
    # to use KafkaConnector resources to avoid
    # needing to call the Connect REST API directly
    strimzi.io/use-connector-resources: "true"
spec:
  image: ktimoney/strimzi-file-connect
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
      - secretName: my-cluster-clients-ca-cert
        certificate: ca.crt
  authentication:
    type: tls
    certificateAndKey:
      secretName: connect
      certificate: user.crt
      key: user.key
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1 |
...
The image field references the image we built earlier: ktimoney/strimzi-file-connect
In the metadata section of KafkaConnect I have added a new label, webhook: "true". This label will be picked up by the MutatingWebhookConfiguration:
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
  name: kafka-webhook-config
  namespace: kafka
  annotations:
    cert-manager.io/inject-ca-from-secret: kafka/kafka-webhook-cert
webhooks:
  - name: kafka-webhook.kafka.svc.cluster.local
    admissionReviewVersions:
      - "v1"
    sideEffects: "None"
    timeoutSeconds: 30
    objectSelector:
      matchLabels:
        webhook: "true" |
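To make the label matching concrete, here is a minimal Java sketch of how a matchLabels selector behaves: an object is selected only when every key/value pair in the selector is also present in the object's labels. The class and method names (LabelSelectorSketch, matches) are made up for illustration and are not part of Kubernetes or Strimzi; the real check is done by the API server.

```java
import java.util.Map;

public class LabelSelectorSketch {

    // Hypothetical helper: returns true when every selector entry
    // is present, with the same value, in the object's labels.
    static boolean matches(Map<String, String> matchLabels,
                           Map<String, String> objectLabels) {
        return matchLabels.entrySet().stream()
                .allMatch(e -> e.getValue().equals(objectLabels.get(e.getKey())));
    }

    public static void main(String[] args) {
        // Selector taken from the objectSelector in the webhook configuration.
        Map<String, String> selector = Map.of("webhook", "true");

        // Labels of the KafkaConnect resource defined earlier.
        Map<String, String> kafkaConnectLabels = Map.of("webhook", "true");

        // Labels of a resource that does not carry the webhook label.
        Map<String, String> otherLabels = Map.of("strimzi.io/cluster", "my-cluster");

        System.out.println(matches(selector, kafkaConnectLabels)); // true
        System.out.println(matches(selector, otherLabels));        // false
    }
}
```

Resources without the webhook: "true" label are never sent to the webhook, so the label acts as an opt-in switch.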
Create the Sink connector
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-sink-connector
  namespace: kafka
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.camel.kafkaconnector.file.CamelFileSinkConnector
  tasksMax: 1
  config:
    topics: my-topic
    value.converter: org.apache.kafka.connect.storage.StringConverter
    key.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false
    camel.sink.endpoint.fileName: mydata-${date:now:ddMMyy-hhmmss-SSS}.txt
    camel.sink.path.directoryName: /opt/kafka/data
    camel.sink.endpoint.fileExist: Append
    camel.sink.endpoint.autoCreate: true
    camel.aggregation.disable: true |
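To preview the file names that the fileName expression produces, the date pattern can be tried out in plain Java. This is only a sketch: it assumes that Camel's ${date:now:...} function formats with java.text.SimpleDateFormat patterns, and the class and method names (FileNamePreview, fileNameFor) are invented for this example.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

public class FileNamePreview {

    // Hypothetical helper: builds the name the sink would use for a
    // record written at the given instant, using the same date pattern
    // as camel.sink.endpoint.fileName.
    static String fileNameFor(Date when, TimeZone zone) {
        SimpleDateFormat format = new SimpleDateFormat("ddMMyy-hhmmss-SSS", Locale.ROOT);
        format.setTimeZone(zone);
        return "mydata-" + format.format(when) + ".txt";
    }

    public static void main(String[] args) {
        // Name for the current time in the JVM's default time zone.
        System.out.println(fileNameFor(new Date(), TimeZone.getDefault()));

        // A fixed instant (the Unix epoch, midnight UTC) for comparison.
        System.out.println(fileNameFor(new Date(0L), TimeZone.getTimeZone("UTC")));
        // prints mydata-010170-120000-000.txt
    }
}
```

Note that hh is the 12-hour clock, so midnight is rendered as 12 and morning and afternoon times can produce the same name on the same day. If that matters, HH gives the 24-hour clock instead.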
...
How to Write a Connector for Kafka Connect – Deep Dive into Configuration Handling
Camel Kafka Connector Examples
...
Camel Kafka Connector Source Code
Using secrets in Kafka Connect configuration