...
Download the minio sink connector: wget https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-minio-sink-kafka-connector/3.20.0/camel-minio-sink-kafka-connector-3.20.0-package.tar.gz
We need to create some custom classes:
CamelMinioConverters.java - this lets us deserialize records as JSON by converting the resulting HashMap to an InputStream
Code Block | ||||
---|---|---|---|---|
| ||||
package com.custom.convert.minio;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.camel.Converter;

@Converter(generateLoader = true)
public final class CamelMinioConverters {

    private CamelMinioConverters() {
    }

    // Serializes the map to JSON and exposes it as a stream for the minio sink.
    @Converter
    public static InputStream fromMapToInputStream(Map<String, Object> map) {
        String json = "{}";
        try {
            json = new ObjectMapper().writeValueAsString(map);
        } catch (JsonProcessingException e) {
            // Fall back to an empty JSON object if the map cannot be serialized.
            e.printStackTrace();
        }
        return new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
    }
} |
StringAggregator.java - this will allow us to aggregate the messages and set the file name
Code Block | ||||
---|---|---|---|---|
| ||||
package com.custom.aggregate.minio;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;
import org.apache.camel.Message;

public class StringAggregator implements AggregationStrategy {

    @Override
    @SuppressWarnings("unchecked")
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        Message newIn = newExchange.getIn();

        // The record key arrives as a map because the key is deserialized as JSON.
        Map<String, String> keyMap =
                (Map<String, String>) newIn.getHeader("camel.kafka.connector.record.key");
        String key = keyMap.get("id");

        // Build the file name from the key and a timestamp.
        // Note: "hh" is the 12-hour clock; use "HH" for a 24-hour clock.
        SimpleDateFormat sdf = new SimpleDateFormat("ddMMyy-hhmmss-SSS");
        String fileName = "Kafka-" + key + "-" + sdf.format(new Date());
        newIn.setHeader("file", "Exchange" + fileName);

        // First record of a batch: nothing to append to.
        if (oldExchange == null) {
            return newExchange;
        }

        // Append the new body to the old body, one record per line.
        String body = oldExchange.getIn().getBody(String.class);
        if (body != null) {
            String newBody = newIn.getBody(String.class);
            if (newBody != null) {
                body += System.lineSeparator() + newBody;
            }
            newIn.setBody(body);
        }
        return newExchange;
    }
} |
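The naming and appending logic of the aggregator can be tried out without Camel or a running connector. The following is a minimal sketch only: the class name FileNameSketch and its two helper methods are hypothetical and not part of the connector, and the formatter is pinned to UTC and Locale.ROOT purely so the output is reproducible (the aggregator above uses the JVM defaults).

```java
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.Date;
import java.util.Locale;
import java.util.Map;
import java.util.TimeZone;

// Hypothetical stand-alone sketch of the aggregator's naming and appending logic.
final class FileNameSketch {

    // Builds "Kafka-<id>-<timestamp>" from the deserialized record key.
    static String fileName(Map<String, String> recordKey, Date now) {
        SimpleDateFormat sdf = new SimpleDateFormat("ddMMyy-hhmmss-SSS", Locale.ROOT);
        // Assumption: fixed time zone so the example prints the same value everywhere.
        sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
        return "Kafka-" + recordKey.get("id") + "-" + sdf.format(now);
    }

    // Joins the previously aggregated body and the new body, one record per line.
    static String mergeBodies(String oldBody, String newBody) {
        if (oldBody == null) {
            return newBody;
        }
        if (newBody == null) {
            return oldBody;
        }
        return oldBody + System.lineSeparator() + newBody;
    }

    public static void main(String[] args) {
        Date now = Date.from(Instant.parse("2023-01-23T13:03:31Z"));
        // Prints Kafka-119-230123-010331-000 (13:03 shows as 01 because "hh" is 12-hour).
        System.out.println(fileName(Map.of("id", "119"), now));
        System.out.println(mergeBodies("first record", "second record"));
    }
}
```

The 12-hour "hh" pattern means a record written at 01:03 and another at 13:03 on the same day get the same hour digits, which is worth keeping in mind if file names must sort chronologically.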
We can then create a custom jar for our code and copy it into the docker container.
Create a docker file for this connector
Code Block | ||||
---|---|---|---|---|
| ||||
FROM quay.io/strimzi/kafka:0.32.0-kafka-3.3.1
USER root:root
RUN mkdir -p /opt/kafka/plugins/camel
COPY ./camel-minio-sink-kafka-connector-3.20.0-package.tar.gz /opt/kafka/plugins/camel/
RUN tar -xvzf /opt/kafka/plugins/camel/camel-minio-sink-kafka-connector-3.20.0-package.tar.gz --directory /opt/kafka/plugins/camel
COPY ./custom-converter-1.0.0.jar /opt/kafka/plugins/camel/camel-minio-sink-kafka-connector/
RUN rm /opt/kafka/plugins/camel/camel-minio-sink-kafka-connector-3.20.0-package.tar.gz
RUN rm -rf /opt/kafka/plugins/camel/docs
USER 1001 |
Build the image and push it to your Docker registry.
...
Code Block | ||||
---|---|---|---|---|
| ||||
{
  "camelInfluxDB.MeasurementName": "disk_space",
  "time": "2023-01-23T13:03:31Z",
  "host": "localhost",
  "region": "IE-region",
  "used": "19%",
  "free": "81%"
} |
With the type converter in place we can also deserialize as a JSON object.
The key will be JSON that looks like this: {"id":"
...
119"
...
}
Create the KafkaConnect object
...
Code Block | ||||
---|---|---|---|---|
| ||||
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-sink-connector
  namespace: kafka
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.camel.kafkaconnector.miniosink.CamelMiniosinkSinkConnector
  tasksMax: 1
  config:
    topics: my-topic
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false
    camel.beans.aggregate: "#class:com.custom.aggregate.minio.StringAggregator"
    camel.aggregation.size: 1
    camel.aggregation.timeout: 20000
    camel.aggregation.disable: false
    key.ignore: true
    camel.kamelet.minio-sink.bucketName: camel
    camel.kamelet.minio-sink.accessKey: ybK7RleFUkdDeYBf
    camel.kamelet.minio-sink.secretKey: X0Y4zK84bwdefRTljrnPzOb1l0A6OQj2
    camel.kamelet.minio-sink.endpoint: http://minio.default:9000
    camel.kamelet.minio-sink.autoCreateBucket: true |
...
The value converter is set to "org.apache.kafka.connect.storage.StringConverter".
We will need to create our own TypeConverter and include it with the camel connector if we want to deserialize using org.apache.kafka.connect.json.JsonConverter.
The minio connection details are specified in camel.kamelet.minio-sink parameters.
The aggregation size is set to 1, so no records are aggregated and each record is written as its own file.
A timeout is also specified, so when the size is larger only records arriving within the timeout (20 seconds) are aggregated together.
These values can be adjusted depending on your requirements.
The main purpose of the aggregator in this example is to set the file name, which it does by setting a header with the key "file".
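To get a feel for how the size and timeout settings interact, here is a toy model of batching by size or timeout. It is a sketch under stated assumptions and not the connector's implementation: the class BatchModel and its methods are hypothetical, time is passed in explicitly instead of read from a clock, and the timeout is assumed to be measured from the most recently added record.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of size/timeout batching; not the connector's code.
final class BatchModel {
    private final int size;
    private final long timeoutMillis;
    private final List<String> pending = new ArrayList<>();
    private long lastArrivalMillis;

    BatchModel(int size, long timeoutMillis) {
        this.size = size;
        this.timeoutMillis = timeoutMillis;
    }

    // Adds a record; returns the completed batch once the size is reached, else an empty list.
    List<String> add(String record, long nowMillis) {
        pending.add(record);
        lastArrivalMillis = nowMillis;
        return pending.size() >= size ? drain() : List.of();
    }

    // Periodic check; returns the pending batch once it has been idle for the timeout.
    // Assumption: the timeout counts from the last record that arrived.
    List<String> tick(long nowMillis) {
        boolean idleTooLong = !pending.isEmpty()
                && nowMillis - lastArrivalMillis >= timeoutMillis;
        return idleTooLong ? drain() : List.of();
    }

    private List<String> drain() {
        List<String> batch = new ArrayList<>(pending);
        pending.clear();
        return batch;
    }

    public static void main(String[] args) {
        // Size 1: every record completes its own batch immediately.
        BatchModel one = new BatchModel(1, 20_000);
        System.out.println(one.add("a", 0));          // [a]

        // Size 3: two records wait until the 20 second timeout flushes them.
        BatchModel three = new BatchModel(3, 20_000);
        System.out.println(three.add("a", 0));        // []
        System.out.println(three.add("b", 5_000));    // []
        System.out.println(three.tick(10_000));       // []
        System.out.println(three.tick(25_000));       // [a, b]
    }
}
```

With a size of 1, as in the configuration above, the timeout never comes into play in this model because each record completes its batch on arrival.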
The yaml for the minio pod is available here: minio.yaml
...
When the connector is running it copies the kafka records into minio as files:
Here's a screen shot where we are using the aggregator to set the file name:
Links
...