...
Code Block | ||||
---|---|---|---|---|
| ||||
package com.custom.aggregate.minio; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.Map; import org.apache.camel.AggregationStrategy; import org.apache.camel.Exchange; import org.apache.camel.Message; public class CustomStringAggregator implements AggregationStrategy { //@Override public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { Message newIn = newExchange.getIn(); Map<String, String> keyMap = (HashMap) newIn.getHeaders().get("camel.kafka.connector.record.key"); String key = keyMap.get("id"); SimpleDateFormat sdf = new SimpleDateFormat("ddMMyy-hhmmss-SSS"); String fileName = "ExchangeKafka-" + key + "-" + sdf.format( new Date() ) + ".json"; newIn.setHeader("file", fileName); // lets append the old body to the new body if (oldExchange == null) { return newExchange; } String body = oldExchange.getIn().getBody(String.class); if (body != null) { String newBody = newIn.getBody(String.class); if (newBody != null) { body += System.lineSeparator() + newBody; } newIn.setBody(body); } return newExchange; } } |
...