...

Download the minio sink connector:
wget https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-minio-sink-kafka-connector/3.20.0/camel-minio-sink-kafka-connector-3.20.0-package.tar.gz


We need to create some custom classes:

CamelMinioConverters.java - this will allow us to deserialize to json and convert the hashmap to an inputstream

Code Block
languagejava
titleCamelMinioConverters
package com.custom.convert.minio;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.camel.Converter;

@Converter(generateLoader = true)
public final class CamelMinioConverters {

    // ObjectMapper is thread-safe once configured, so a single instance is shared
    private static final ObjectMapper MAPPER = new ObjectMapper();

    private CamelMinioConverters() {
    }

    // Serializes the deserialized record (a Map) back to JSON and exposes it as a stream
    @Converter
    public static InputStream fromMapToInputStream(Map<String, Object> map) {
        String json = "{}";
        try {
            json = MAPPER.writeValueAsString(map);
        } catch (JsonProcessingException e) {
            // Fall back to an empty JSON object if the map cannot be serialized
            e.printStackTrace();
        }
        return new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
    }

}


StringAggregator.java - this will allow us to aggregate the messages and set the file name

Code Block
languagejava
titleStringAggregator
package com.custom.aggregate.minio;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;
import org.apache.camel.Message;

public class StringAggregator implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        Message newIn = newExchange.getIn();

        // The record key arrives as a deserialized JSON map, e.g. {"id":"119"}
        Map<?, ?> keyMap = (Map<?, ?>) newIn.getHeaders().get("camel.kafka.connector.record.key");
        String key = String.valueOf(keyMap.get("id"));

        // HH is the 24-hour clock; hh would repeat the same names in the AM and PM
        SimpleDateFormat sdf = new SimpleDateFormat("ddMMyy-HHmmss-SSS");
        String fileName = "Kafka-" + key + "-" + sdf.format(new Date());

        // The "file" header determines the name of the object written to minio
        newIn.setHeader("file", "Exchange" + fileName);

        // First record of a batch: nothing to append yet
        if (oldExchange == null) {
            return newExchange;
        }

        // Append the new body to the old body, one record per line
        String body = oldExchange.getIn().getBody(String.class);
        if (body != null) {
            String newBody = newIn.getBody(String.class);
            if (newBody != null) {
                body += System.lineSeparator() + newBody;
            }

            newIn.setBody(body);
        }
        return newExchange;
    }
}
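
The naming and append logic of the aggregator can be tried out without Camel on the classpath. The following is a minimal sketch, not part of the connector: the class AggregatorLogicSketch and its two methods are hypothetical names, and the 24-hour pattern and the fixed UTC time zone are assumptions made only so that the output is unambiguous and reproducible. It builds only the "Kafka-<id>-<timestamp>" part of the name.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.TimeZone;

// Hypothetical helper: mirrors the naming and append logic without any Camel types.
final class AggregatorLogicSketch {

    // Builds "Kafka-<id>-<timestamp>" from a deserialized key such as {"id":"119"}.
    static String buildFileName(Map<String, ?> keyMap, Date now) {
        SimpleDateFormat sdf = new SimpleDateFormat("ddMMyy-HHmmss-SSS");
        // Assumption: UTC is pinned here only to make the result reproducible.
        sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
        return "Kafka-" + keyMap.get("id") + "-" + sdf.format(now);
    }

    // Appends the new body to the old one, separated by a line break.
    static String appendBody(String oldBody, String newBody) {
        if (oldBody == null) {
            return newBody;
        }
        if (newBody == null) {
            return oldBody;
        }
        return oldBody + System.lineSeparator() + newBody;
    }

    public static void main(String[] args) {
        System.out.println(buildFileName(Map.of("id", "119"), new Date()));
        System.out.println(appendBody("first record", "second record"));
    }
}
```

Keeping this logic in plain static methods makes it easy to unit test before packaging the jar.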

We can then create a custom jar for our code and copy it into the docker container.


Create a docker file for this connector


Code Block
languagetext
titleDocker
FROM quay.io/strimzi/kafka:0.32.0-kafka-3.3.1
USER root:root
RUN mkdir -p /opt/kafka/plugins/camel
COPY ./camel-minio-sink-kafka-connector-3.20.0-package.tar.gz /opt/kafka/plugins/camel/
RUN tar -xvzf /opt/kafka/plugins/camel/camel-minio-sink-kafka-connector-3.20.0-package.tar.gz --directory /opt/kafka/plugins/camel
COPY ./custom-converter-1.0.0.jar /opt/kafka/plugins/camel/camel-minio-sink-kafka-connector/
RUN rm /opt/kafka/plugins/camel/camel-minio-sink-kafka-connector-3.20.0-package.tar.gz
RUN rm -rf /opt/kafka/plugins/camel/docs
USER 1001

Build the image and copy it to your docker repository

...

Code Block
languagejs
titleSchema & Payload
{
"camelInfluxDB.MeasurementName":"disk_space",
"time":"2023-01-23T13:03:31Z",
"host":"localhost",
"region":"IE-region",
"used":"19%",
"free":"81%"
}

With the type converter in place we can also deserialize as a json object.

The key will be a json that looks like this: {"id":"

...

119"

...

} 


Create the KafkaConnect object

...

Code Block
languagetext
titleKafkaConnector
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-sink-connector
  namespace: kafka
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.camel.kafkaconnector.miniosink.CamelMiniosinkSinkConnector
  tasksMax: 1
  config:
    topics: my-topic
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false
    camel.beans.aggregate: "#class:com.custom.aggregate.minio.StringAggregator"
    camel.aggregation.size: 1
    camel.aggregation.timeout: 20000
    camel.aggregation.disable: false
    key.ignore: true
    camel.kamelet.minio-sink.bucketName: camel
    camel.kamelet.minio-sink.accessKey: ybK7RleFUkdDeYBf
    camel.kamelet.minio-sink.secretKey: X0Y4zK84bwdefRTljrnPzOb1l0A6OQj2
    camel.kamelet.minio-sink.endpoint: http://minio.default:9000
    camel.kamelet.minio-sink.autoCreateBucket: true

...

The value converter is set to "org.apache.kafka.connect.storage.StringConverter".

We will need to create our own TypeConverter and include it with the camel connector if we want to deserialize using org.apache.kafka.connect.json.JsonConverter.

The minio connection details are specified in camel.kamelet.minio-sink parameters.

The aggregation size is set to 1, so each record is written as its own file and nothing is aggregated.

A timeout is also specified, so records are only aggregated together if they arrive within this timeout value (20000 ms, i.e. 20 seconds).

These values can be adjusted depending on your requirements.
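
To see how the aggregation size changes the number of files, here is a simplified model of size-based batching. It is only a sketch under assumptions: BatchBySizeSketch and batch are hypothetical names, and this is not how Camel implements aggregation internally. In particular the timeout is not modelled; here the final, partially filled batch is simply emitted at the end, whereas in the connector it would be completed by the timeout.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of size-based completion; not Camel's actual aggregator.
final class BatchBySizeSketch {

    // Groups records into batches of at most `size`; each batch becomes one
    // "file" whose content is the records joined by line breaks.
    static List<String> batch(List<String> records, int size) {
        if (size < 1) {
            throw new IllegalArgumentException("size must be >= 1");
        }
        List<String> files = new ArrayList<>();
        for (int start = 0; start < records.size(); start += size) {
            int end = Math.min(start + size, records.size());
            files.add(String.join("\n", records.subList(start, end)));
        }
        return files;
    }

    public static void main(String[] args) {
        List<String> records = List.of("r1", "r2", "r3");
        // Size 1: every record ends up in its own file
        System.out.println(batch(records, 1));
        // Size 2: r1 and r2 share a file, r3 is left for a final batch
        System.out.println(batch(records, 2));
    }
}
```

With a size of 1, as configured above, the number of files equals the number of records; a larger size trades fewer files for a delay of up to the timeout before a partial batch is written.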

The main purpose of the aggregator in this example is to set the file name. This is done by setting a header with a key of "file".

The yaml for the minio pod is available here: minio.yaml

...

When the connector is running it copies the kafka records into minio as files:

Here's a screenshot where we are using the aggregator to set the file name:

[Image]

Links

Kafka Connect And Schemas

...