Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

We can then create a custom jar for our code and copy it into the docker container.

Note: You can also set the filename in the Kafka header using either CamelHeader.file or CamelHeader.ce-file


Create a docker file for this connector

...

Run: kubectl create -f kafka webhook.yaml to start the pod.

Note: You can delete the webhook once the connector pod starts.


Create the KafkaConnect object

Code Block
languagetext
titleKafkaConnect
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
  name: connect
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  namespace: kafka  
  labels:
    webhook: "true"
  annotations:
  # use-connector-resources configures this KafkaConnect
  # to use KafkaConnector resources to avoid
  # needing to call the Connect REST API directly
    strimzi.io/use-connector-resources: "true"
spec:   
  image: ktimoney/strimzi-file-connect
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
      - secretName: my-cluster-clients-ca-cert
        certificate: ca.crt
  authentication:
    type: tls
    certificateAndKey:
      secretName: connect
      certificate: user.crt
      key: user.key
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1

...

The image tag references the image we built earlier : ktimoney/strimzi-file-connectCreate the Sink connector

In the metadata section of KafkaConnect I have added a new label webhook: "true", this will be picked up by the MutatingWebhookConfiguration:


Code Block
languagetextyml
titleKafkaConnectorMutatingWebhookConfiguration
apiVersion: kafkaadmissionregistration.strimzik8s.io/v1beta2v1
kind: MutatingWebhookConfiguration
metadata:
  name: kafka-webhook-config
  namespace: kafka
  annotations:
    cert-manager.io/inject-ca-from-secret: kafka/kafka-webhook-cert
webhooks:
  - name: kafka-webhook.kafka.svc.cluster.local
    admissionReviewVersions:
      - "v1"
    sideEffects: "None"
    timeoutSeconds: 30
    objectSelector:
      matchLabels:
        webhook: "true"

matchLabels: webhook: "true"


Create the Sink connector

Code Block
languagetext
titleKafkaConnector
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-sink-connector
  namespace: kafka
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:   
  class: org.apache.camel.kafkaconnector.file.CamelFileSinkConnector
  tasksMax: 1
  config:
    topics: my-topic 
    value.converter: org.apache.kafka.connect.storage.StringConverter
    key.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false
    camel.sink.endpoint.fileName: mydata-${date:now:ddMMyy-hhmmss-SSS}.txt
    camel.sink.path.directoryName: /opt/kafka/data
    camel.sink.endpoint.fileExist: Append
    camel.sink.endpoint.autoCreate: true
    camel.aggregation.disable: true

...

How to Write a Connector for Kafka Connect – Deep Dive into Configuration Handling

Apache Camel

Camel Kafka Connector Examples

...

Camel Basic Configuration

Camel Components Source Code

Camel Kafka Connector Source Code

Using secrets in Kafka Connect configuration

Kafka Connect Transformations