package com.custom.aggregate.minio;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
public class StringAggregatorCustomStringAggregator implements AggregationStrategy {
//@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
Message newIn = newExchange.getIn();
Map<String, String> keyMap = (HashMap) newIn.getHeaders().get("camel.kafka.connector.record.key");
String key = keyMap.get("id");
SimpleDateFormat sdf = new SimpleDateFormat("ddMMyy-hhmmss-SSS");
String fileName = "ExchangeKafka-" + key + "-" + sdf.format( new Date() );
newIn.setHeader("file", fileName);
// lets append the old body to the new body
if (oldExchange == null) {
return newExchange;
}
String body = oldExchange.getIn().getBody(String.class);
if (body != null) {
String newBody = newIn.getBody(String.class);
if (newBody != null) {
body += System.lineSeparator() + newBody;
}
newIn.setBody(body);
}
return newExchange;
}
} |