Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejs
titlePM Schema
{
   "type":"record",
   "name":"PMData",
   "namespace":"org.oran.avro.demo.pm",
    "fields":[
      {
         "name":"event",
         "type":{
            "type":"record",
            "name":"Event",
            "fields":[
               {
                  "name":"commonEventHeader",
                  "type":{
                     "type":"record",
                     "name":"CommonEventHeader",
                     "fields":[
                        {
                           "name":"domain",
                           "type":"string"
                        },
                        {
                           "name": "eventId",
                           "type":"string"
                        },                       
                        {
                           "name":"eventName",
                           "type":"string"
                        },
                        {
                           "name":"sequence",
                           "type": "int"                         
                        },                        
                        {
                           "name":"reportingEntityName",
                           "type":"string"
                        },
                        {
                           "name":"vesEventListenerVersion",
                           "type":"string"
                        },
                        {
                           "name":"version",
                           "type":"string"
                        },                                                
                        {
                           "name":"sourceName",
                           "type":"string"
                        },
                        {
                           "name":"priority",
                           "type":"string"
                        },                        
                        {
                           "name":"lastEpochMicrosec",
                           "type":"long"
                        },
                        {
                           "name":"startEpochMicrosec",
                           "type":"long"
                        },
                        {
                           "name":"timeZoneOffset",
                           "type":"string"
                        }
                     ]
                  }
               },
               {
                  "name":"perf3gppFields",
                  "type":{
                     "type":"record",
                     "name":"Perf3gppFields",
                     "fields":[
                        {
                           "name":"perf3gppFieldsVersion",
                           "type":"string"
                        },
                        {
                           "name":"measDataCollection",
                           "type":{
                              "type":"record",
                              "name":"MeasDataCollection",
                              "fields":[
                                 {
                                    "name":"granularityPeriod",
                                    "type":"int"
                                 },
                                 {
                                    "name":"measuredEntityDn",
                                    "type":"string"
                                 },
                                 {
                                    "name":"measuredEntitySoftwareVersion",
                                    "type":"string"
                                 },
                                 {
                                    "name":"measuredEntityUserName",
                                    "type":"string"
                                 },
                                 {
                                    "name":"measInfoList",
                                    "type":{
                                       "type":"array",
                                       "items":{
                                          "type":"record",
                                          "name":"MeasInfoList",
                                          "fields":[
                                             {
                                                "name":"measInfoId",
                                                "type":{
                                                   "type":"record",
                                                   "name":"MeasInfoId",
                                                   "fields":[
                                                      {
                                                         "name":"sMeasInfoId",
                                                         "type":"string"
                                                      }
                                                   ]
                                                }
                                             },
                                             {
                                                "name":"measTypes",
                                                "type":{
                                                   "type":"record",
                                                   "name":"MeasTypes",
                                                   "fields":[
                                                      {
                                                         "name":"sMeasTypesList",
                                                         "type":{
                                                            "type":"array",
                                                            "items":"string"
                                                         }
                                                      }
                                                   ]
                                                }
                                             },
                                             {
                                                "name":"measValuesList",
                                                "type":{
                                                   "type":"array",
                                                   "items":{
                                                      "type":"record",
                                                      "name":"MeasValuesList",
                                                      "fields":[
                                                         {
                                                            "name":"measObjInstId",
                                                            "type":"string"
                                                         },
                                                         {
                                                            "name":"suspectFlag",
                                                            "type":"string"
                                                         },                                                         
                                                         {
                                                            "name":"measResults",
                                                            "type":{
                                                               "type":"array",
                                                               "items":{
                                                                  "type":"record",
                                                                  "name":"MeasResult",
                                                                  "fields":[
                                                                     {
                                                                        "name":"p",
                                                                        "type":"int"
                                                                     },
                                                                     {
                                                                        "name":"sValue",
                                                                        "type":"string"
                                                                     }
                                                                  ]
                                                               }
                                                            }
                                                         }
                                                      ]
                                                   }
                                                }
                                             }
                                          ]
                                       }
                                    }
                                 }
                              ]
                           }
                        }
                     ]
                  }
               }
            ]
         }
      }
   ]
}

Links

Apache Avro - a data serialization system

Apicurio Registry

Note: Every field in the AVRO schema is mandatory. Remove from the schema any fields that are not present in your JSON file.

AVRO Code Generation

You can generate Java classes from your schema using the avro-tools jar.

e.g.

Code Block
languagejava
titleCode Generation
java -jar /mnt/c/Users/ktimoney/Downloads/avro-tools-1.11.1.jar  compile schema ./src/main/resources/schemas/pm-all.avsc ./src/main/java/

This will produce the following classes in the org/oran/avro/demo/pm/ directory (same as namespace in the schema)

Note: We need to add some extra annotations to some of the fields in the generated classes:

  @JsonProperty("sMeasInfoId")
  private java.lang.CharSequence sMeasInfoId;

  @JsonProperty("sMeasTypesList")
  private java.util.List<java.lang.CharSequence> sMeasTypesList;

  @JsonProperty("sValue")
  private java.lang.CharSequence sValue;

Jackson has problems mapping fields whose names start with a lowercase character followed by an uppercase character (e.g. sValue), so these fields need an explicit @JsonProperty annotation.


CommonEventHeader.java  

MeasDataCollection.java  

MeasInfoList.java  

MeasTypes.java      

Perf3gppFields.java  

Event.java              

MeasInfoId.java          

MeasResult.java    

MeasValuesList.java  

PMData.java

JAVA Producer/Consumer


Code Block
languagejava
titleAVRO Producer/Consumer
package org.oran.avro.demo;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.apicurio.registry.rest.v2.beans.IfExists;
import io.apicurio.registry.serde.SerdeConfig;
import io.apicurio.registry.serde.avro.AvroKafkaDeserializer;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.commons.io.IOUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.oran.avro.demo.pm.MeasInfoList;
import org.oran.avro.demo.pm.MeasResult;
import org.oran.avro.demo.pm.MeasTypes;
import org.oran.avro.demo.pm.MeasValuesList;
import org.oran.avro.demo.pm.PMData;
import org.oran.avro.demo.pm.PMFlat;


public class PMDataAvroExample {

    private static final String REGISTRY_URL = "http://localhost:8080/apis/registry/v2";
    private static final String CCOMPAT_API_URL = "http://localhost:8080/apis/ccompat/v6";
    private static final String BOOTSTRAP_HOST = "192.168.49.2";
    private static final String BOOTSTRAP_PORT = "30053";
    private static final String SERVERS = BOOTSTRAP_HOST+":"+BOOTSTRAP_PORT;
    private static final String TOPIC_NAME = PMDataAvroExample.class.getSimpleName();
    private static final String SUBJECT_NAME = "key";
    private static final String PASSWORD = "GkELEyyxccrp";
    private static final String KEYSTORE = "/mnt/c/Users/ktimoney/go/kafka/users/user-keystore.jks";
    private static final String TRUSTSTORE = "/mnt/c/Users/ktimoney/go/kafka/users/user-truststore.jks";
    private static final int NUMBER_OF_MESSAGES = 10;


    public static final void main(String [] args) throws Exception {
      PMDataAvroExample app = new PMDataAvroExample();
      //String jsonFile =  "pm_report.json";
      String jsonFile =  "A20000626.2315+0200-2330+0200_HTTPS-6-73.json";
      String jsonData = app.getFileAsString(jsonFile);


        System.out.println("Starting example " + PMDataAvroExample.class.getSimpleName());
        String topicName = TOPIC_NAME;
        String subjectName = SUBJECT_NAME;
        String artifactId = topicName + "-PMData";

        String schemaData = app.getFileAsString("schemas/pm-all.avsc");

        // Create the producer.
        Producer<String, Object> producer = createKafkaProducer();
        int producedMessages = 0;
        // Produce messages
        try {
            System.out.println("Producing ("+NUMBER_OF_MESSAGES+") messages.");
            for (int idx = 0; idx < NUMBER_OF_MESSAGES; idx++) {
                // Use the schema to create a record
                GenericRecord record = parseJson(jsonData, schemaData);

                // Send/produce the message on the Kafka Producer
                ProducerRecord<String, Object> producedRecord = new ProducerRecord<>(topicName, subjectName, record);
                producer.send(producedRecord);

                Thread.sleep(100);
            }
            System.out.println(NUMBER_OF_MESSAGES + " messages sent successfully.");
        } finally {
            System.out.println("Closing the producer.");
            producer.flush();
            producer.close();
        }

        // Create the consumer
        System.out.println("Creating the consumer.");
        KafkaConsumer<Long, GenericRecord> consumer = createKafkaConsumer();

        // Subscribe to the topic
        System.out.println("Subscribing to topic " + topicName);
        consumer.subscribe(Collections.singletonList(topicName));

        // Consume messages.
        try {
            int messageCount = 0;
            System.out.println("Consuming ("+NUMBER_OF_MESSAGES+") messages.");
            while (messageCount < NUMBER_OF_MESSAGES) {
                final ConsumerRecords<Long, GenericRecord> records = consumer.poll(Duration.ofSeconds(1));
                messageCount += records.count();
                if (records.count() == 0) {
                    // Wait for messages to become available.
                    System.out.println("Waiting for messages...");
                } else records.forEach(record -> {
                    GenericRecord recordValue = record.value();
                    System.out.println("Consumed a message: ");
                    ObjectMapper mapper = new ObjectMapper();
                    try {
                      PMData pms =  mapper.readValue(String.valueOf(record.value()), PMData.class);
                      List<PMFlat> flatList = transformPMS(pms);
                      flatList.forEach(System.out::println);
                    } catch (Exception e) {
                      e.printStackTrace();
                    }

                });
            }
        } finally {
            System.out.println("Closing the consumer.");
            consumer.close();
        }

        System.out.println("Done (success).");
    }

    /**
     * Creates the Kafka producer.
     */
    private static Producer<String, Object> createKafkaProducer() {
        Properties props = new Properties();

        // Configure kafka settings
        props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
        props.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "Producer-" + TOPIC_NAME);
        props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all");

        // Configure Service Registry location
        props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);
        // Get an existing schema - do not auto-register the schema if not found.
        props.putIfAbsent(SerdeConfig.AUTO_REGISTER_ARTIFACT, Boolean.TRUE);
        props.putIfAbsent(SerdeConfig.AUTO_REGISTER_ARTIFACT_IF_EXISTS, IfExists.RETURN.name());

        //Just if security values are present, then we configure them.
        configureSecurityIfPresent(props);

        RestService restService = new RestService(CCOMPAT_API_URL);
        final Map<String, String> restServiceProperties = new HashMap<>();
        CachedSchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(restService, 100, restServiceProperties);

        Map<String, String> properties = new HashMap<>();

        // Configure Service Registry location (Confluent API)
        properties.put("schema.registry.url", CCOMPAT_API_URL);
        properties.put("auto.register.schemas", "true");
        // Map the topic name to the artifactId in the registry
        properties.put("value.subject.name.strategy", "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy");

        // Use the Confluent provided Kafka Serializer for Avro
        KafkaAvroSerializer valueSerializer = new KafkaAvroSerializer(schemaRegistryClient, properties);
        StringSerializer keySerializer = new StringSerializer();

        // Create the Kafka producer
        Producer<String, Object> producer = new KafkaProducer<String, Object>(props, keySerializer, valueSerializer);

        return producer;
    }

    /**
     * Creates the Kafka consumer.
     */
    private static KafkaConsumer<Long, GenericRecord> createKafkaConsumer() {
        Properties props = new Properties();

        // Configure Kafka
        props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
        props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-" + TOPIC_NAME);
        props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Use the Apicurio Registry provided Kafka Deserializer for Avro
        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroKafkaDeserializer.class.getName());

        // Configure Service Registry location
        props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);
        // Enable "Confluent Compatible API" mode in the Apicurio Registry deserializer
        props.putIfAbsent(SerdeConfig.ENABLE_CONFLUENT_ID_HANDLER, Boolean.TRUE);

        //Just if security values are present, then we configure them.
        configureSecurityIfPresent(props);

        // Create the Kafka Consumer
        KafkaConsumer<Long, GenericRecord> consumer = new KafkaConsumer<>(props);
        return consumer;
    }


    private static void configureSecurityIfPresent(Properties props) {
      props.putIfAbsent(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
      props.putIfAbsent(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, TRUSTSTORE);
      props.putIfAbsent(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,  PASSWORD);

      // configure the following three settings for SSL Authentication
      props.putIfAbsent(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, KEYSTORE);
      props.putIfAbsent(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, PASSWORD);
      props.putIfAbsent(SslConfigs.SSL_KEY_PASSWORD_CONFIG, PASSWORD);
    }


    private static GenericData.Record parseJson(String json, String schema) throws IOException {
      Schema parsedSchema = new Schema.Parser().parse(schema);
      Decoder decoder = DecoderFactory.get().jsonDecoder(parsedSchema, json);

      DatumReader<GenericData.Record> reader =
          new GenericDatumReader<>(parsedSchema);
      return reader.read(null, decoder);
    }

    public String getFileAsString(String fileName)throws Exception
    {
      ClassLoader classLoader = getClass().getClassLoader();
      InputStream inputStream = classLoader.getResourceAsStream(fileName);

      // the stream holding the file content
      if (inputStream == null) {
          throw new IllegalArgumentException("file not found! " + fileName);
      } else {
          String result = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
          return result;
      }
    }


    public static List<PMFlat> transformPMS(PMData pms){
      List<PMFlat> flatList = new ArrayList<>();
      // Get the constant values
      String domain = pms.getEvent().getCommonEventHeader().getDomain().toString();
      String eventName = pms.getEvent().getCommonEventHeader().getEventName().toString();
      String sourceName = pms.getEvent().getCommonEventHeader().getSourceName().toString();
      long startEpochMicrosec = pms.getEvent().getCommonEventHeader().getStartEpochMicrosec();
      long lastEpochMicrosec = pms.getEvent().getCommonEventHeader().getLastEpochMicrosec();
      String timeZoneOffset = pms.getEvent().getCommonEventHeader().getTimeZoneOffset().toString();
      int granularityPeriod = pms.getEvent().getPerf3gppFields().getMeasDataCollection().getGranularityPeriod();
      String measuredEntityDn = pms.getEvent().getPerf3gppFields().getMeasDataCollection().getMeasuredEntityDn().toString();
      String measuredEntityUserName = pms.getEvent().getPerf3gppFields().getMeasDataCollection().getMeasuredEntityUserName().toString();

      List<MeasInfoList> measInfoLists = pms.getEvent().getPerf3gppFields().getMeasDataCollection().getMeasInfoList();
      // Loop through the measurements
      for(MeasInfoList measInfoList: measInfoLists) {
        String sMeasInfoId = measInfoList.getMeasInfoId().getSMeasInfoId().toString();
        MeasTypes measTypes = measInfoList.getMeasTypes();
        List<CharSequence> sMeasTypesList = measTypes.getSMeasTypesList();
        List<MeasValuesList> measValuesLists = measInfoList.getMeasValuesList();
        for(MeasValuesList measValuesList: measValuesLists) {
          String measObjInstId = measValuesList.getMeasObjInstId().toString();
          String suspectFlag = measValuesList.getSuspectFlag().toString();
          List<MeasResult> measResultList = measValuesList.getMeasResults();
          for(MeasResult measResult: measResultList) {
            // Create new PMSFlat object
            PMFlat flat = new PMFlat();
            flat.setDomain(domain);
            flat.setEventName(eventName);
            flat.setSourceName(sourceName);
            flat.setStartEpochMicrosec(startEpochMicrosec);
            flat.setLastEpochMicrosec(lastEpochMicrosec);
            flat.setTimeZoneOffset(timeZoneOffset);
            flat.setGranularityPeriod(granularityPeriod);
            flat.setMeasuredEntityDn(measuredEntityDn);
            flat.setMeasuredEntityUserName(measuredEntityUserName);
            flat.setSMeasInfoId(sMeasInfoId);
            flat.setMeasObjInstId(measObjInstId);
            flat.setSuspectFlag(suspectFlag);

            String sMeasType = sMeasTypesList.get(measResult.getP()-1).toString();
            flat.setSMeasType(sMeasType);
            String sValue = measResult.getSValue().toString();
            flat.setSValue(sValue);
            // add the object to the list
            flatList.add(flat);
          }
        }
      }
      return flatList;
    }

}

Note: Extra configuration is required to make the message(s) compatible with the Confluent format (CCOMPAT_API_URL).

We unmarshal the message into the generated classes and then map it to the PMFlat class.

Code Block
package org.oran.avro.demo.pm;
import lombok.Data;
import lombok.ToString;
/**
 * Flat, single-row view of one PM measurement result.
 *
 * <p>Accessors, {@code equals}/{@code hashCode} and {@code toString} are generated by
 * Lombok; the class declares fields only.
 *
 * <p>NOTE(review): per the Lombok documentation {@code @Data} already implies
 * {@code @ToString}, so the explicit {@code @ToString} looks redundant - confirm
 * before removing.
 */
@Data
@ToString
public class PMFlat {
  // Values taken from the event's common header.
  private String domain;
  private String eventName;
  private String sourceName;
  private long startEpochMicrosec;
  private long lastEpochMicrosec;
  private String timeZoneOffset;
  // Values taken from the measurement data collection.
  private int granularityPeriod;
  private String measuredEntityDn;
  private String measuredEntityUserName;
  // Identifier of the measurement-info entry this row belongs to.
  private String sMeasInfoId;
  // Values taken from the measurement-values entry this row belongs to.
  private String measObjInstId;
  private String suspectFlag;
  // Measurement type name and its reported value for this result.
  private String sMeasType;
  private String sValue;
}

Links

Apache Avro - a data serialization system

Apicurio Registry

AVRO SCHEMA TOOLS

Example applications using the Apicurio Registry