Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
package org.oran.protobuf.demo;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.Struct;
import com.google.protobuf.Struct.Builder;
import com.google.protobuf.util.JsonFormat;
import org.oran.protobuf.demo.pm.PMProtos.*;
import io.apicurio.registry.serde.SerdeConfig;
import io.apicurio.registry.serde.protobuf.ProtobufKafkaDeserializer;
import io.apicurio.registry.serde.protobuf.ProtobufKafkaSerializer;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.commons.io.IOUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.oran.avro.demo.pm.PMFlat;


public class PMDataProtobufExample {

    // Apicurio Registry endpoint; passed to both serializer and deserializer as SerdeConfig.REGISTRY_URL.
    private static final String REGISTRY_URL = "http://localhost:8080/apis/registry/v2";
    // Kafka bootstrap address; host and port are joined into SERVERS as "host:port".
    private static final String BOOTSTRAP_HOST = "192.168.49.2";
    private static final String BOOTSTRAP_PORT = "31809";
    private static final String SERVERS = BOOTSTRAP_HOST+":"+BOOTSTRAP_PORT;
    // NOTE(review): this single password is used for the truststore, the keystore and the private key,
    // and it is hard-coded in source together with machine-specific store paths below.
    // Consider loading these from the environment or an external properties file.
    private static final String PASSWORD = "Yx79VGQAWIMu";
    private static final String KEYSTORE = "/mnt/c/Users/ktimoney/go/kafka/users/user-keystore.jks";
    private static final String TRUSTSTORE = "/mnt/c/Users/ktimoney/go/kafka/users/user-truststore.jks";
    // Not referenced anywhere in the visible part of this class.
    private static final int NUMBER_OF_MESSAGES = 10;
    // The topic is named after this class; the producer client id and consumer group id are derived from it.
    private static final String TOPIC_NAME = PMDataProtobufExample.class.getSimpleName();
    // Used by main() as the key of the produced record.
    private static final String SCHEMA_NAME = "PMData";


    /**
     * Runs the demo end to end: loads the sample PM report from the classpath, converts it
     * to a {@code PMDataType} protobuf message, publishes it to the topic, then polls the
     * same topic until at least one record arrives and prints the flattened rows.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the sample file cannot be read, the JSON cannot be parsed,
     *                   the send is not acknowledged, or the consumer fails
     */
    public static final void main(String[] args) throws Exception {
        PMDataProtobufExample app = new PMDataProtobufExample();
        String jsonFile = "pm_report.json";
        //String jsonFile =  "A20000626.2315+0200-2330+0200_HTTPS-6-73.json";
        String jsonData = app.getFileAsString(jsonFile);
        PMDataType pm = fromJsonToClass(jsonData);

        System.out.println("Starting example " + PMDataProtobufExample.class.getSimpleName());
        String topicName = TOPIC_NAME;
        String key = SCHEMA_NAME;

        // Create the producer.
        Producer<Object, PMDataType> producer = createKafkaProducer();
        // Produce a single message.
        try {
            System.out.println("Producing (1) messages.");
            ProducerRecord<Object, PMDataType> producedRecord = new ProducerRecord<>(topicName, key, pm);
            // send() is asynchronous; wait on the returned Future so a delivery failure
            // surfaces here instead of being dropped before the success message is printed.
            producer.send(producedRecord).get();
            System.out.println("Messages successfully produced.");
        } finally {
            System.out.println("Closing the producer.");
            producer.flush();
            producer.close();
        }

        // Create the consumer. try-with-resources closes it even if subscribe() or poll() throws.
        System.out.println("Creating the consumer.");
        try (KafkaConsumer<Long, PMDataType> consumer = createKafkaConsumer()) {
            // Subscribe to the topic
            System.out.println("Subscribing to topic " + topicName);
            consumer.subscribe(Collections.singletonList(topicName));

            // Consume the message: keep polling until at least one record has been received.
            int messageCount = 0;
            System.out.println("Consuming (1) messages.");
            while (messageCount < 1) {
                final ConsumerRecords<Long, PMDataType> records = consumer.poll(Duration.ofSeconds(1));
                messageCount += records.count();
                if (records.count() == 0) {
                    // Do nothing - no messages waiting.
                    System.out.println("No messages waiting...");
                } else {
                    records.forEach(rec -> {
                        PMDataType value = rec.value();
                        System.out.println("Consumed a message:  " );
                        // Flatten the nested report into one row per measurement result.
                        List<PMFlat> flatList = transformPMS(value);
                        flatList.forEach(System.out::println);
                    });
                }
            }
        }

        System.out.println("Done (success).");
    }

    /**
     * Builds the producer used by this example: String keys, protobuf values serialized
     * through the Apicurio Registry serializer, with SSL settings applied afterwards.
     *
     * @return a newly created producer configured from the class constants
     */
    private static Producer<Object, PMDataType> createKafkaProducer() {
        Properties config = new Properties();

        // Serializers: plain String keys, Apicurio protobuf serializer for values.
        // The Properties object is freshly created, so put() and putIfAbsent() are equivalent here.
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProtobufKafkaSerializer.class.getName());

        // Broker connection and delivery settings.
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
        config.put(ProducerConfig.CLIENT_ID_CONFIG, "Producer-" + TOPIC_NAME);
        config.put(ProducerConfig.ACKS_CONFIG, "all");

        // Registry location, artifact group, and automatic registration of the artifact
        // when it is not found in the registry.
        config.put(SerdeConfig.REGISTRY_URL, REGISTRY_URL);
        config.put(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, "default");
        config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, Boolean.TRUE);

        // Add the SSL settings.
        configureSecurityIfPresent(config);

        return new KafkaProducer<>(config);
    }

    /**
     * Creates the Kafka consumer: String key deserializer, Apicurio Registry protobuf
     * value deserializer, auto-commit enabled, reading from the earliest offset.
     *
     * <p>NOTE(review): the declared key type is {@code Long} while the configured key
     * deserializer is {@code StringDeserializer}. The visible code never reads the key,
     * so this does not fail here, but the generic type looks wrong; it is kept because
     * the caller declares the same type.
     *
     * @return a newly created consumer configured from the class constants
     */
    private static KafkaConsumer<Long, PMDataType> createKafkaConsumer() {
        Properties props = new Properties();

        // Configure Kafka (consumer-side constant rather than the ProducerConfig one)
        props.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
        props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-" + TOPIC_NAME);
        props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Use the Apicurio Registry provided Kafka Deserializer for Protobuf
        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ProtobufKafkaDeserializer.class.getName());

        // Configure Service Registry location
        props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);

        // Add the SSL settings.
        configureSecurityIfPresent(props);

        // Create the Kafka Consumer
        return new KafkaConsumer<>(props);
    }

    /**
     * Adds SSL client settings to {@code props}, leaving any value that is already set
     * untouched. Despite the name, nothing is checked for presence: the settings are
     * always taken from the class constants.
     *
     * @param props client configuration to extend in place
     */
    private static void configureSecurityIfPresent(Properties props) {
        props.putIfAbsent(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");

        // Truststore settings.
        props.putIfAbsent(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, PASSWORD);
        props.putIfAbsent(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, TRUSTSTORE);

        // Keystore and key settings, needed for SSL authentication.
        props.putIfAbsent(SslConfigs.SSL_KEY_PASSWORD_CONFIG, PASSWORD);
        props.putIfAbsent(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, PASSWORD);
        props.putIfAbsent(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, KEYSTORE);
    }

    /**
     * Parses a JSON document into a generic protobuf {@code Struct}, ignoring unknown fields.
     *
     * @param json the JSON text to parse
     * @return the built {@code Struct}
     * @throws IOException if the parser rejects the input
     */
    public static Message fromJson(String json) throws IOException {
        Builder target = Struct.newBuilder();
        JsonFormat.Parser lenientParser = JsonFormat.parser().ignoringUnknownFields();
        lenientParser.merge(json, target);
        return target.build();
    }

    /**
     * Parses a JSON document into a {@code PMDataType} protobuf message.
     *
     * <p>A parse failure is reported to the caller instead of being printed and ignored,
     * so that an empty or half-populated message is never returned (and published) by mistake.
     *
     * @param json the JSON text to parse
     * @return the message built from {@code json}
     * @throws IllegalArgumentException if {@code json} cannot be parsed as a {@code PMDataType};
     *                                  the original parser exception is attached as the cause
     */
    public static PMDataType fromJsonToClass(String json) {
        PMDataType.Builder pmBuilder = PMDataType.newBuilder();
        try {
            JsonFormat.parser().merge(json, pmBuilder);
        } catch (InvalidProtocolBufferException e) {
            throw new IllegalArgumentException("Unable to parse JSON into PMDataType", e);
        }
        return pmBuilder.build();
    }

    /**
     * Reads a classpath resource fully and returns its content decoded as UTF-8.
     *
     * @param fileName resource name, resolved through this class's class loader
     * @return the resource content as a string
     * @throws IllegalArgumentException if no resource with that name is found
     * @throws Exception if reading the resource fails
     */
    public String getFileAsString(String fileName) throws Exception {
        ClassLoader classLoader = getClass().getClassLoader();
        // try-with-resources closes the stream after reading; a null resource is skipped on close.
        try (InputStream inputStream = classLoader.getResourceAsStream(fileName)) {
            if (inputStream == null) {
                throw new IllegalArgumentException("file not found! " + fileName);
            }
            return IOUtils.toString(inputStream, StandardCharsets.UTF_8);
        }
    }

    public static List<PMFlat> transformPMS(PMDataType pms){
      List<PMFlat> flatList = new ArrayList<>();
      // Get the constant values
                    Event event = pms.getEvent();
                    CommonEventHeader commonEventHeader = event.getCommonEventHeader();
                    String domain = commonEventHeader.getDomain();
      String eventName    = commonEventHeader.getEventName();
         String eventNamesourceName = commonEventHeader.getEventNamegetSourceName();
      String              String sourceName reportingEntityName = commonEventHeader.getSourceNamegetReportingEntityName();
                    String reportingEntityNamelong startEpochMicrosec = commonEventHeader.getReportingEntityNamegetStartEpochMicrosec();
                    long startEpochMicroseclastEpochMicrosec = commonEventHeader.getStartEpochMicrosecgetLastEpochMicrosec();
                    long lastEpochMicrosec String timeZoneOffset = commonEventHeader.getLastEpochMicrosec();
                    String timeZoneOffset = commonEventHeader.getTimeZoneOffset();
                    Perf3gppFields perf3gppFields = event.getPerf3GppFields();
                    String perf3gppFieldsVersion = perf3gppFields.getPerf3GppFieldsVersion();
                    MeasDataCollection measDataCollection = perf3gppFields.getMeasDataCollection();
                    int granularityPeriod = measDataCollection.getGranularityPeriod();
                    String measuredEntityUserName = measDataCollection.getMeasuredEntityUserName();
                    String measuredEntityDn = measDataCollection.getMeasuredEntityDn();
                    String measuredEntitySoftwareVersion = measDataCollection.getMeasuredEntitySoftwareVersion();
                    List<MeasInfoList> measInfoLists = measDataCollection.getMeasInfoListList();
                    for(MeasInfoList measInfoList:measInfoLists) {
                      MeasInfoId measInfoId = measInfoList.getMeasInfoId();
                      String sMeasInfoId = measInfoId.getSMeasInfoId();
                      MeasTypes measTypes = measInfoList.getMeasTypes();
                      List<String> sMeasTypesList = measTypes.getSMeasTypesListList();
                      List<MeasValuesList> measValuesLists = measInfoList.getMeasValuesListList();
                      for(MeasValuesList measValuesList:measValuesLists) {
                          String measObjInstId = measValuesList.getMeasObjInstId();
                          String suspectFlag = measValuesList.getSuspectFlag();
                          List<MeasResults> measResults = measValuesList.getMeasResultsList();
                          for(MeasResults measResult : measResults) {
                            // Create new PMSFlat object
                            PMFlat flat = new PMFlat();
                  getTimeZoneOffset();
      Perf3gppFields perf3gppFields =  flatevent.setDomaingetPerf3GppFields(domain);
      String perf3gppFieldsVersion = perf3gppFields.getPerf3GppFieldsVersion();
      MeasDataCollection measDataCollection = perf3gppFields.getMeasDataCollection();
      int granularityPeriod =  flatmeasDataCollection.setEventNamegetGranularityPeriod(eventName);
      String measuredEntityUserName = measDataCollection.getMeasuredEntityUserName();
      String measuredEntityDn = measDataCollection.getMeasuredEntityDn();
      String measuredEntitySoftwareVersion =  flatmeasDataCollection.setSourceNamegetMeasuredEntitySoftwareVersion(sourceName);
      List<MeasInfoList> measInfoLists = measDataCollection.getMeasInfoListList();
      for(MeasInfoList measInfoList:measInfoLists) {
        MeasInfoId measInfoId = flatmeasInfoList.setStartEpochMicrosecgetMeasInfoId(startEpochMicrosec);
        String sMeasInfoId = measInfoId.getSMeasInfoId();
        MeasTypes measTypes        flat.setLastEpochMicrosec(lastEpochMicrosec= measInfoList.getMeasTypes();
        List<String> sMeasTypesList = measTypes.getSMeasTypesListList();
        List<MeasValuesList> measValuesLists        flat.setTimeZoneOffset(timeZoneOffset= measInfoList.getMeasValuesListList();
        for(MeasValuesList measValuesList:measValuesLists) {
            String measObjInstId =    flat.setGranularityPeriod(granularityPeriodmeasValuesList.getMeasObjInstId();
            String suspectFlag = measValuesList.getSuspectFlag();
            List<MeasResults> measResults = flatmeasValuesList.setMeasuredEntityDngetMeasResultsList(measuredEntityDn);
            for(MeasResults measResult : measResults) {
            flat.setMeasuredEntityUserName(measuredEntityUserName);
  // Create new PMSFlat object
              PMFlat flat =      flat.setSMeasInfoId(sMeasInfoIdnew PMFlat();
              flat.setDomain(domain);
              flat.setMeasObjInstIdsetEventName(measObjInstIdeventName);
                 flat.setSourceName(sourceName);
               flat.setSuspectFlagsetStartEpochMicrosec(suspectFlagstartEpochMicrosec);
              flat.setLastEpochMicrosec(lastEpochMicrosec);
              String sMeasType = sMeasTypesList.get(measResult.getP()-1flat.setTimeZoneOffset(timeZoneOffset);
              flat.setGranularityPeriod(granularityPeriod);
              flat.setSMeasTypesetMeasuredEntityDn(sMeasTypemeasuredEntityDn);
              flat.setMeasuredEntityUserName(measuredEntityUserName);
              String sValue = measResult.getSValue(flat.setSMeasInfoId(sMeasInfoId);
              flat.setMeasObjInstId(measObjInstId);
              flat.setSValuesetSuspectFlag(sValuesuspectFlag);
              String sMeasType = sMeasTypesList.get(measResult.getP()-1);
           // add the object to the list
 flat.setSMeasType(sMeasType);
              String sValue = measResult.getSValue();
              flatListflat.addsetSValue(flatsValue);
              // add the object to the  list
     }
          flatList.add(flat);
            }
           }
         }
      return flatList;
     }

}


Links

Release H: AVRO

PMProtos.java

...