...
Code Block | ||||
---|---|---|---|---|
| ||||
package com.custom.convert.minio;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.Map;
import org.apache.camel.Converter;
@Converter(generateLoader = true)
public final class CamelMinioConverters {
// Not meant to be instantiated: the class is final and its converter method is static.
private CamelMinioConverters() {
}
/**
 * Converts a map into an {@link InputStream} holding its JSON serialization.
 *
 * <p>If Jackson fails to serialize the map, the stack trace is printed and the stream
 * contains the empty JSON object <code>{}</code> instead.
 *
 * @param map the map to serialize
 * @return a stream over the UTF-8 encoded JSON text
 */
@SuppressWarnings("deprecation")
@Converter
public static InputStream fromMapToInputStream(Map<String, Object> map) {
    // Fallback payload used when serialization fails.
    String json = "{}";
    try {
        json = new ObjectMapper().writeValueAsString(map);
    } catch (JsonProcessingException e) {
        // Best effort: report the failure and continue with the fallback payload.
        e.printStackTrace();
    }
    // A type converter has no exchange to modify; it only returns the converted value.
    // Encode explicitly as UTF-8 so the bytes do not depend on the platform default charset.
    return new ByteArrayInputStream(json.getBytes(java.nio.charset.StandardCharsets.UTF_8));
}
} |
...
Code Block | ||||
---|---|---|---|---|
| ||||
package com.custom.aggregate.minio; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.Map; import org.apache.camel.AggregationStrategy; import org.apache.camel.Exchange; import org.apache.camel.Message; public class StringAggregator implements AggregationStrategy { //@Override public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { Message newIn = newExchange.getIn(); Map<String, String> keyMap = (HashMap) newIn.getHeaders().get("camel.kafka.connector.record.key"); String key = keyMap.get("id"); SimpleDateFormat sdf = new SimpleDateFormat("ddMMyy-hhmmss-SSS"); String fileName = "KafkaExchangeKafka-" + key + "-" + sdf.format( new Date() ); newIn.setHeader("file", "Exchange" + fileName); // lets append the old body to the new body if (oldExchange == null) { return newExchange; } String body = oldExchange.getIn().getBody(String.class); if (body != null) { String newBody = newIn.getBody(String.class); if (newBody != null) { body += System.lineSeparator() + newBody; } newIn.setBody(body); } return newExchange; } } |
...
The key will be a JSON object that looks like this: {"id":"119"}
Create the KafkaConnect object
...