...

This will produce the following classes in the org/oran/avro/demo/pm/ directory (matching the namespace in the schema):

CommonEventHeader.java
MeasDataCollection.java
MeasInfoList.java
MeasTypes.java
Perf3gppFields.java
Event.java
MeasInfoId.java
MeasResult.java
MeasValuesList.java
PMData.java

Note: We need to add extra annotations to some of the fields in the generated classes.

...

It has problems with fields that start with a lowercase character followed by an uppercase character (for example, sMeasInfoId or sValue).
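
For example, a generated field such as sMeasInfoId can be pinned to its JSON name with Jackson's @JsonProperty annotation. The snippet below is a minimal sketch for illustration only (the real generated class also contains Avro's own plumbing); it assumes the MeasInfoId class generated above:

Code Block
languagejava
titleExample: annotating a generated field for Jackson
import com.fasterxml.jackson.annotation.JsonProperty;

public class MeasInfoId {
    // Jackson's default bean naming maps getSMeasInfoId() to the property
    // "smeasInfoId", which does not match the JSON field "sMeasInfoId",
    // so the property name is set explicitly:
    @JsonProperty("sMeasInfoId")
    private CharSequence sMeasInfoId;

    @JsonProperty("sMeasInfoId")
    public CharSequence getSMeasInfoId() {
        return sMeasInfoId;
    }

    @JsonProperty("sMeasInfoId")
    public void setSMeasInfoId(CharSequence value) {
        this.sMeasInfoId = value;
    }
}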


Java Producer/Consumer


Code Block
languagejava
titleAVRO Producer/Consumer
package org.oran.avro.demo;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.apicurio.registry.rest.v2.beans.IfExists;
import io.apicurio.registry.serde.SerdeConfig;
import io.apicurio.registry.serde.avro.AvroKafkaDeserializer;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.commons.io.IOUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.oran.avro.demo.pm.MeasInfoList;
import org.oran.avro.demo.pm.MeasResult;
import org.oran.avro.demo.pm.MeasTypes;
import org.oran.avro.demo.pm.MeasValuesList;
import org.oran.avro.demo.pm.PMData;
import org.oran.avro.demo.pm.PMFlat;


public class PMDataAvroExample {

    private static final String REGISTRY_URL = "http://localhost:8080/apis/registry/v2";
    private static final String CCOMPAT_API_URL = "http://localhost:8080/apis/ccompat/v6";
    private static final String BOOTSTRAP_HOST = "192.168.49.2";
    private static final String BOOTSTRAP_PORT = "30053";
    private static final String SERVERS = BOOTSTRAP_HOST+":"+BOOTSTRAP_PORT;
    private static final String TOPIC_NAME = PMDataAvroExample.class.getSimpleName();
    private static final String SUBJECT_NAME = "key";
    private static final String PASSWORD = "GkELEyyxccrp";
    private static final String KEYSTORE = "/mnt/c/Users/ktimoney/go/kafka/users/user-keystore.jks";
    private static final String TRUSTSTORE = "/mnt/c/Users/ktimoney/go/kafka/users/user-truststore.jks";
    private static final int NUMBER_OF_MESSAGES = 10;


    public static void main(String[] args) throws Exception {
        PMDataAvroExample app = new PMDataAvroExample();
        //String jsonFile = "pm_report.json";
        String jsonFile = "A20000626.2315+0200-2330+0200_HTTPS-6-73.json";
        String jsonData = app.getFileAsString(jsonFile);


        System.out.println("Starting example " + PMDataAvroExample.class.getSimpleName());
        String topicName = TOPIC_NAME;
        String subjectName = SUBJECT_NAME;
        String artifactId = topicName + "-PMData";

        String schemaData = app.getFileAsString("schemas/pm-all.avsc");

        // Create the producer.
        Producer<String, Object> producer = createKafkaProducer();
        int producedMessages = 0;
        // Produce messages
        try {
            System.out.println("Producing ("+NUMBER_OF_MESSAGES+") messages.");
            for (int idx = 0; idx < NUMBER_OF_MESSAGES; idx++) {
                // Use the schema to create a record
                GenericRecord record = parseJson(jsonData, schemaData);

                // Send/produce the message on the Kafka Producer
                ProducerRecord<String, Object> producedRecord = new ProducerRecord<>(topicName, subjectName, record);
                producer.send(producedRecord);

                Thread.sleep(100);
            }
            System.out.println(NUMBER_OF_MESSAGES + " messages sent successfully.");
        } finally {
            System.out.println("Closing the producer.");
            producer.flush();
            producer.close();
        }

        // Create the consumer
        System.out.println("Creating the consumer.");
        KafkaConsumer<String, GenericRecord> consumer = createKafkaConsumer();

        // Subscribe to the topic
        System.out.println("Subscribing to topic " + topicName);
        consumer.subscribe(Collections.singletonList(topicName));

        // Consume messages.
        try {
            int messageCount = 0;
            System.out.println("Consuming ("+NUMBER_OF_MESSAGES+") messages.");
            while (messageCount < NUMBER_OF_MESSAGES) {
                final ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofSeconds(1));
                messageCount += records.count();
                if (records.count() == 0) {
                    // Wait for messages to become available.
                    System.out.println("Waiting for messages...");
                } else records.forEach(record -> {
                    GenericRecord recordValue = record.value();
                    System.out.println("Consumed a message: ");
                    ObjectMapper mapper = new ObjectMapper();
                    try {
                      PMData pms = mapper.readValue(String.valueOf(recordValue), PMData.class);
                      List<PMFlat> flatList = transformPMS(pms);
                      flatList.forEach(System.out::println);
                    } catch (Exception e) {
                      e.printStackTrace();
                    }

                });
            }
        } finally {
            System.out.println("Closing the consumer.");
            consumer.close();
        }

        System.out.println("Done (success).");
    }

    /**
     * Creates the Kafka producer.
     */
    private static Producer<String, Object> createKafkaProducer() {
        Properties props = new Properties();

        // Configure kafka settings
        props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
        props.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "Producer-" + TOPIC_NAME);
        props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all");

        // Configure Service Registry location
        props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);
        // Auto-register the schema if it is not already present; return the existing artifact if it is.
        props.putIfAbsent(SerdeConfig.AUTO_REGISTER_ARTIFACT, Boolean.TRUE);
        props.putIfAbsent(SerdeConfig.AUTO_REGISTER_ARTIFACT_IF_EXISTS, IfExists.RETURN.name());

        // Configure security settings only if they are present.
        configureSecurityIfPresent(props);

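        // Point Confluent's schema registry client at Apicurio's Confluent-compatible
        // (ccompat) API so the Confluent Avro serializer can be used with Apicurio.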
        RestService restService = new RestService(CCOMPAT_API_URL);
        final Map<String, String> restServiceProperties = new HashMap<>();
        CachedSchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(restService, 100, restServiceProperties);

        Map<String, String> properties = new HashMap<>();

        // Configure Service Registry location (Confluent API)
        properties.put("schema.registry.url", CCOMPAT_API_URL);
        properties.put("auto.register.schemas", "true");
        // Map the topic name to the artifactId in the registry
        properties.put("value.subject.name.strategy", "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy");

        // Use the Confluent provided Kafka Serializer for Avro
        KafkaAvroSerializer valueSerializer = new KafkaAvroSerializer(schemaRegistryClient, properties);
        StringSerializer keySerializer = new StringSerializer();

        // Create the Kafka producer
        Producer<String, Object> producer = new KafkaProducer<String, Object>(props, keySerializer, valueSerializer);

        return producer;
    }

    /**
     * Creates the Kafka consumer.
     */
    private static KafkaConsumer<String, GenericRecord> createKafkaConsumer() {
        Properties props = new Properties();

        // Configure Kafka
        props.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
        props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-" + TOPIC_NAME);
        props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Use the Apicurio Registry provided Kafka Deserializer for Avro
        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroKafkaDeserializer.class.getName());

        // Configure Service Registry location
        props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);
        // Enable "Confluent Compatible API" mode in the Apicurio Registry deserializer
        props.putIfAbsent(SerdeConfig.ENABLE_CONFLUENT_ID_HANDLER, Boolean.TRUE);

        // Configure security settings only if they are present.
        configureSecurityIfPresent(props);

        // Create the Kafka Consumer
        KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
        return consumer;
    }


    private static void configureSecurityIfPresent(Properties props) {
      props.putIfAbsent(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
      props.putIfAbsent(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, TRUSTSTORE);
      props.putIfAbsent(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, PASSWORD);

      // configure the following three settings for SSL Authentication
      props.putIfAbsent(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, KEYSTORE);
      props.putIfAbsent(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, PASSWORD);
      props.putIfAbsent(SslConfigs.SSL_KEY_PASSWORD_CONFIG, PASSWORD);
    }


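    /**
     * Parses a JSON payload into an Avro GenericRecord using the supplied schema.
     * Note: Avro's JSON decoder expects the payload to follow Avro's JSON encoding
     * rules (for example, union values are wrapped in an object keyed by type name).
     */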
    private static GenericData.Record parseJson(String json, String schema) throws IOException {
      Schema parsedSchema = new Schema.Parser().parse(schema);
      Decoder decoder = DecoderFactory.get().jsonDecoder(parsedSchema, json);

      DatumReader<GenericData.Record> reader =
          new GenericDatumReader<>(parsedSchema);
      return reader.read(null, decoder);
    }

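    /**
     * Reads a file from the classpath and returns its contents as a String.
     */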
    public String getFileAsString(String fileName) throws Exception {
      ClassLoader classLoader = getClass().getClassLoader();
      InputStream inputStream = classLoader.getResourceAsStream(fileName);

      // the stream holding the file content
      if (inputStream == null) {
          throw new IllegalArgumentException("file not found! " + fileName);
      } else {
          String result = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
          return result;
      }
    }


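    /**
     * Flattens the nested PM report into one PMFlat row per measurement result,
     * copying the common header fields onto every row.
     */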
    public static List<PMFlat> transformPMS(PMData pms) {
      List<PMFlat> flatList = new ArrayList<>();
      // Get the constant values
      String domain = pms.getEvent().getCommonEventHeader().getDomain().toString();
      String eventName = pms.getEvent().getCommonEventHeader().getEventName().toString();
      String sourceName = pms.getEvent().getCommonEventHeader().getSourceName().toString();
      long startEpochMicrosec = pms.getEvent().getCommonEventHeader().getStartEpochMicrosec();
      long lastEpochMicrosec = pms.getEvent().getCommonEventHeader().getLastEpochMicrosec();
      String timeZoneOffset = pms.getEvent().getCommonEventHeader().getTimeZoneOffset().toString();
      int granularityPeriod = pms.getEvent().getPerf3gppFields().getMeasDataCollection().getGranularityPeriod();
      String measuredEntityDn = pms.getEvent().getPerf3gppFields().getMeasDataCollection().getMeasuredEntityDn().toString();
      String measuredEntityUserName = pms.getEvent().getPerf3gppFields().getMeasDataCollection().getMeasuredEntityUserName().toString();

      List<MeasInfoList> measInfoLists = pms.getEvent().getPerf3gppFields().getMeasDataCollection().getMeasInfoList();
      // Loop through the measurements
      for(MeasInfoList measInfoList: measInfoLists) {
        String sMeasInfoId = measInfoList.getMeasInfoId().getSMeasInfoId().toString();
        MeasTypes measTypes = measInfoList.getMeasTypes();
        List<CharSequence> sMeasTypesList = measTypes.getSMeasTypesList();
        List<MeasValuesList> measValuesLists = measInfoList.getMeasValuesList();
        for(MeasValuesList measValuesList: measValuesLists) {
          String measObjInstId = measValuesList.getMeasObjInstId().toString();
          String suspectFlag = measValuesList.getSuspectFlag().toString();
          List<MeasResult> measResultList = measValuesList.getMeasResults();
          for(MeasResult measResult: measResultList) {
            // Create a new PMFlat object
            PMFlat flat = new PMFlat();
            flat.setDomain(domain);
            flat.setEventName(eventName);
            flat.setSourceName(sourceName);
            flat.setStartEpochMicrosec(startEpochMicrosec);
            flat.setLastEpochMicrosec(lastEpochMicrosec);
            flat.setTimeZoneOffset(timeZoneOffset);
            flat.setGranularityPeriod(granularityPeriod);
            flat.setMeasuredEntityDn(measuredEntityDn);
            flat.setMeasuredEntityUserName(measuredEntityUserName);
            flat.setSMeasInfoId(sMeasInfoId);
            flat.setMeasObjInstId(measObjInstId);
            flat.setSuspectFlag(suspectFlag);

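            // measResult.getP() is a 1-based position into sMeasTypesList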
            String sMeasType = sMeasTypesList.get(measResult.getP()-1).toString();
            flat.setSMeasType(sMeasType);
            String sValue = measResult.getSValue().toString();
            flat.setSValue(sValue);
            // add the object to the list
            flatList.add(flat);
          }
        }
      }
      return flatList;
    }

}

...