...
Code Block |
---|
package org.oran.protobuf.demo; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; import com.google.protobuf.MessageOrBuilder; import com.google.protobuf.Struct; import com.google.protobuf.Struct.Builder; import com.google.protobuf.util.JsonFormat; import org.oran.protobuf.demo.pm.PMProtos.*; import io.apicurio.registry.serde.SerdeConfig; import io.apicurio.registry.serde.protobuf.ProtobufKafkaDeserializer; import io.apicurio.registry.serde.protobuf.ProtobufKafkaSerializer; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Properties; import org.apache.commons.io.IOUtils; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.oran.avro.demo.pm.PMFlat; public class PMDataProtobufExample { private static final String REGISTRY_URL = "http://localhost:8080/apis/registry/v2"; private static final String BOOTSTRAP_HOST = "192.168.49.2"; private static final String BOOTSTRAP_PORT = "31809"; private static final String SERVERS = BOOTSTRAP_HOST+":"+BOOTSTRAP_PORT; private static final String PASSWORD = "Yx79VGQAWIMu"; private static final String KEYSTORE = "/mnt/c/Users/ktimoney/go/kafka/users/user-keystore.jks"; private static final String TRUSTSTORE = "/mnt/c/Users/ktimoney/go/kafka/users/user-truststore.jks"; private static final int NUMBER_OF_MESSAGES = 10; private static final String TOPIC_NAME = PMDataProtobufExample.class.getSimpleName(); private static final String SCHEMA_NAME = "PMData"; public static final void main(String [] args) throws Exception { PMDataProtobufExample app = new PMDataProtobufExample(); String jsonFile = "pm_report.json"; //String jsonFile = "A20000626.2315+0200-2330+0200_HTTPS-6-73.json"; String jsonData = app.getFileAsString(jsonFile); PMDataType pm = fromJsonToClass(jsonData); System.out.println("Starting example " + PMDataProtobufExample.class.getSimpleName()); String topicName = TOPIC_NAME; String key = SCHEMA_NAME; // Create the producer. Producer<Object, PMDataType> producer = createKafkaProducer(); // Produce 2 messages. try { System.out.println("Producing (1) messages."); // Send/produce the message on the Kafka Producer ProducerRecord<Object, PMDataType> producedRecord = new ProducerRecord<>(topicName, key, pm); producer.send(producedRecord); System.out.println("Messages successfully produced."); } finally { System.out.println("Closing the producer."); producer.flush(); producer.close(); } // Create the consumer System.out.println("Creating the consumer."); KafkaConsumer<Long, PMDataType> consumer = createKafkaConsumer(); // Subscribe to the topic System.out.println("Subscribing to topic " + topicName); consumer.subscribe(Collections.singletonList(topicName)); // Consume the message. try { int messageCount = 0; System.out.println("Consuming (1) messages."); while (messageCount < 1) { final ConsumerRecords<Long, PMDataType> records = consumer.poll(Duration.ofSeconds(1)); messageCount += records.count(); if (records.count() == 0) { // Do nothing - no messages waiting. System.out.println("No messages waiting..."); } else records.forEach(record -> { PMDataType value = record.value(); System.out.println("Consumed a message: " ); // unmarshall the message List<PMFlat> flatList = transformPMS(value); flatList.forEach(System.out::println); }); } } finally { consumer.close(); } System.out.println("Done (success)."); } /** * Creates the Kafka producer. */ private static Producer<Object, PMDataType> createKafkaProducer() { Properties props = new Properties(); // Configure kafka settings props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS); props.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "Producer-" + TOPIC_NAME); props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all"); props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // Use the Apicurio Registry provided Kafka Serializer for Protobuf props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProtobufKafkaSerializer.class.getName()); // Configure Service Registry location props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL); props.putIfAbsent(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, "default"); // Register the artifact if not found in the registry. props.putIfAbsent(SerdeConfig.AUTO_REGISTER_ARTIFACT, Boolean.TRUE); //Just if security values are present, then we configure them. configureSecurityIfPresent(props); // Create the Kafka producer Producer<Object, PMDataType> producer = new KafkaProducer<>(props); return producer; } /** * Creates the Kafka consumer. */ private static KafkaConsumer<Long, PMDataType> createKafkaConsumer() { Properties props = new Properties(); // Configure Kafka props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS); props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-" + TOPIC_NAME); props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // Use the Apicurio Registry provided Kafka Deserializer for Protobuf props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ProtobufKafkaDeserializer.class.getName()); // Configure Service Registry location props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL); //Just if security values are present, then we configure them. configureSecurityIfPresent(props); // Create the Kafka Consumer KafkaConsumer<Long, PMDataType> consumer = new KafkaConsumer<>(props); return consumer; } private static void configureSecurityIfPresent(Properties props) { props.putIfAbsent(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); props.putIfAbsent(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, TRUSTSTORE); props.putIfAbsent(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, PASSWORD); // configure the following three settings for SSL Authentication props.putIfAbsent(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, KEYSTORE); props.putIfAbsent(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, PASSWORD); props.putIfAbsent(SslConfigs.SSL_KEY_PASSWORD_CONFIG, PASSWORD); } public static Message fromJson(String json) throws IOException { Builder structBuilder = Struct.newBuilder(); JsonFormat.parser().ignoringUnknownFields().merge(json, structBuilder); return structBuilder.build(); } @SuppressWarnings({"unchecked", "rawtypes"}) public static PMDataType fromJsonToClass(String json) { PMDataType.Builder pmBuilder = PMDataType.newBuilder(); try { JsonFormat.parser().merge(json, pmBuilder); } catch(InvalidProtocolBufferException e) { e.printStackTrace(); } PMDataType value = pmBuilder.build(); return value; } public String getFileAsString(String fileName)throws Exception { ClassLoader classLoader = getClass().getClassLoader(); InputStream inputStream = classLoader.getResourceAsStream(fileName); // the stream holding the file content if (inputStream == null) { throw new IllegalArgumentException("file not found! " + fileName); } else { String result = IOUtils.toString(inputStream, StandardCharsets.UTF_8); return result; } } public static List<PMFlat> transformPMS(PMDataType pms){ List<PMFlat> flatList = new ArrayList<>(); // Get the constant values Event event = pms.getEvent(); CommonEventHeader commonEventHeader = event.getCommonEventHeader(); String domain = commonEventHeader.getDomain(); String eventName = commonEventHeader.getEventName(); String eventNamesourceName = commonEventHeader.getEventNamegetSourceName(); String String sourceName reportingEntityName = commonEventHeader.getSourceNamegetReportingEntityName(); String reportingEntityNamelong startEpochMicrosec = commonEventHeader.getReportingEntityNamegetStartEpochMicrosec(); long startEpochMicroseclastEpochMicrosec = commonEventHeader.getStartEpochMicrosecgetLastEpochMicrosec(); long lastEpochMicrosec String timeZoneOffset = commonEventHeader.getLastEpochMicrosec(); String timeZoneOffset = commonEventHeader.getTimeZoneOffset(); Perf3gppFields perf3gppFields = event.getPerf3GppFields(); String perf3gppFieldsVersion = perf3gppFields.getPerf3GppFieldsVersion(); MeasDataCollection measDataCollection = perf3gppFields.getMeasDataCollection(); int granularityPeriod = measDataCollection.getGranularityPeriod(); String measuredEntityUserName = measDataCollection.getMeasuredEntityUserName(); String measuredEntityDn = measDataCollection.getMeasuredEntityDn(); String measuredEntitySoftwareVersion = measDataCollection.getMeasuredEntitySoftwareVersion(); List<MeasInfoList> measInfoLists = measDataCollection.getMeasInfoListList(); for(MeasInfoList measInfoList:measInfoLists) { MeasInfoId measInfoId = measInfoList.getMeasInfoId(); String sMeasInfoId = measInfoId.getSMeasInfoId(); MeasTypes measTypes = measInfoList.getMeasTypes(); List<String> sMeasTypesList = measTypes.getSMeasTypesListList(); List<MeasValuesList> measValuesLists = measInfoList.getMeasValuesListList(); for(MeasValuesList measValuesList:measValuesLists) { String measObjInstId = measValuesList.getMeasObjInstId(); String suspectFlag = measValuesList.getSuspectFlag(); List<MeasResults> measResults = measValuesList.getMeasResultsList(); for(MeasResults measResult : measResults) { // Create new PMSFlat object PMFlat flat = new PMFlat(); getTimeZoneOffset(); Perf3gppFields perf3gppFields = flatevent.setDomaingetPerf3GppFields(domain); String perf3gppFieldsVersion = perf3gppFields.getPerf3GppFieldsVersion(); MeasDataCollection measDataCollection = perf3gppFields.getMeasDataCollection(); int granularityPeriod = flatmeasDataCollection.setEventNamegetGranularityPeriod(eventName); String measuredEntityUserName = measDataCollection.getMeasuredEntityUserName(); String measuredEntityDn = measDataCollection.getMeasuredEntityDn(); String measuredEntitySoftwareVersion = flatmeasDataCollection.setSourceNamegetMeasuredEntitySoftwareVersion(sourceName); List<MeasInfoList> measInfoLists = measDataCollection.getMeasInfoListList(); for(MeasInfoList measInfoList:measInfoLists) { MeasInfoId measInfoId = flatmeasInfoList.setStartEpochMicrosecgetMeasInfoId(startEpochMicrosec); String sMeasInfoId = measInfoId.getSMeasInfoId(); MeasTypes measTypes flat.setLastEpochMicrosec(lastEpochMicrosec= measInfoList.getMeasTypes(); List<String> sMeasTypesList = measTypes.getSMeasTypesListList(); List<MeasValuesList> measValuesLists flat.setTimeZoneOffset(timeZoneOffset= measInfoList.getMeasValuesListList(); for(MeasValuesList measValuesList:measValuesLists) { String measObjInstId = flat.setGranularityPeriod(granularityPeriodmeasValuesList.getMeasObjInstId(); String suspectFlag = measValuesList.getSuspectFlag(); List<MeasResults> measResults = flatmeasValuesList.setMeasuredEntityDngetMeasResultsList(measuredEntityDn); for(MeasResults measResult : measResults) { flat.setMeasuredEntityUserName(measuredEntityUserName); // Create new PMSFlat object PMFlat flat = flat.setSMeasInfoId(sMeasInfoIdnew PMFlat(); flat.setDomain(domain); flat.setMeasObjInstIdsetEventName(measObjInstIdeventName); flat.setSourceName(sourceName); flat.setSuspectFlagsetStartEpochMicrosec(suspectFlagstartEpochMicrosec); flat.setLastEpochMicrosec(lastEpochMicrosec); String sMeasType = sMeasTypesList.get(measResult.getP()-1flat.setTimeZoneOffset(timeZoneOffset); flat.setGranularityPeriod(granularityPeriod); flat.setSMeasTypesetMeasuredEntityDn(sMeasTypemeasuredEntityDn); flat.setMeasuredEntityUserName(measuredEntityUserName); String sValue = measResult.getSValue(flat.setSMeasInfoId(sMeasInfoId); flat.setMeasObjInstId(measObjInstId); flat.setSValuesetSuspectFlag(sValuesuspectFlag); String sMeasType = sMeasTypesList.get(measResult.getP()-1); // add the object to the list flat.setSMeasType(sMeasType); String sValue = measResult.getSValue(); flatListflat.addsetSValue(flatsValue); // add the object to the list } flatList.add(flat); } } } return flatList; } } |
Links
...