...
This will produce the following classes in the org/oran/avro/demo/pm/ directory (same as namespace in the schema)
CommonEventHeader.java
MeasDataCollection.java
MeasInfoList.java
MeasTypes.java
Perf3gppFields.java
Event.java
MeasInfoId.java
MeasResult.java
MeasValuesList.java
PMData.java
Note: We need to add some extra annotation to some of the fields in the generated classes
...
It has problems using fields starting with a lowercase character followed by an uppercase character.CommonEventHeader.java
MeasDataCollection.java
MeasInfoList.java
MeasTypes.java
Perf3gppFields.java
Event.java
MeasInfoId.java
MeasResult.java
MeasValuesList.java
PMData.java
JAVA Producer/Consumer
Code Block | ||||
---|---|---|---|---|
| ||||
package org.oran.avro.demo; import com.fasterxml.jackson.databind.ObjectMapper; import io.apicurio.registry.rest.v2.beans.IfExists; import io.apicurio.registry.serde.SerdeConfig; import io.apicurio.registry.serde.avro.AvroKafkaDeserializer; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.rest.RestService; import io.confluent.kafka.serializers.KafkaAvroSerializer; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumReader; import org.apache.avro.io.Decoder; import org.apache.avro.io.DecoderFactory; import org.apache.commons.io.IOUtils; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.oran.avro.demo.pm.MeasInfoList; import org.oran.avro.demo.pm.MeasResult; import org.oran.avro.demo.pm.MeasTypes; import org.oran.avro.demo.pm.MeasValuesList; import org.oran.avro.demo.pm.PMData; import org.oran.avro.demo.pm.PMFlat; public class PMDataAvroExample { private static final String REGISTRY_URL = "http://localhost:8080/apis/registry/v2"; private static final String CCOMPAT_API_URL = "http://localhost:8080/apis/ccompat/v6"; private static final String BOOTSTRAP_HOST = "192.168.49.2"; private static final String BOOTSTRAP_PORT = "30053"; private static final String SERVERS = BOOTSTRAP_HOST+":"+BOOTSTRAP_PORT; private static final String TOPIC_NAME = PMDataAvroExample.class.getSimpleName(); private static final String SUBJECT_NAME = "key"; private static final String PASSWORD = "GkELEyyxccrp"; private static final String KEYSTORE = "/mnt/c/Users/ktimoney/go/kafka/users/user-keystore.jks"; private static final String TRUSTSTORE = "/mnt/c/Users/ktimoney/go/kafka/users/user-truststore.jks"; private static final int NUMBER_OF_MESSAGES = 10; public static final void main(String [] args) throws Exception { PMDataAvroExample app = new PMDataAvroExample(); //String jsonFile = "pm_report.json"; String jsonFile = "A20000626.2315+0200-2330+0200_HTTPS-6-73.json"; String jsonData = app.getFileAsString(jsonFile); System.out.println("Starting example " + PMDataAvroExample.class.getSimpleName()); String topicName = TOPIC_NAME; String subjectName = SUBJECT_NAME; String artifactId = topicName + "-PMData"; String schemaData = app.getFileAsString("schemas/pm-all.avsc"); // Create the producer. Producer<String, Object> producer = createKafkaProducer(); int producedMessages = 0; // Produce messages try { System.out.println("Producing ("+NUMBER_OF_MESSAGES+") messages."); for (int idx = 0; idx < NUMBER_OF_MESSAGES; idx++) { // Use the schema to create a record GenericRecord record = parseJson(jsonData, schemaData); // Send/produce the message on the Kafka Producer ProducerRecord<String, Object> producedRecord = new ProducerRecord<>(topicName, subjectName, record); producer.send(producedRecord); Thread.sleep(100); } System.out.println(NUMBER_OF_MESSAGES + " messages sent successfully."); } finally { System.out.println("Closing the producer."); producer.flush(); producer.close(); } // Create the consumer System.out.println("Creating the consumer."); KafkaConsumer<Long, GenericRecord> consumer = createKafkaConsumer(); // Subscribe to the topic System.out.println("Subscribing to topic " + topicName); consumer.subscribe(Collections.singletonList(topicName)); // Consume messages. try { int messageCount = 0; System.out.println("Consuming ("+NUMBER_OF_MESSAGES+") messages."); while (messageCount < NUMBER_OF_MESSAGES) { final ConsumerRecords<Long, GenericRecord> records = consumer.poll(Duration.ofSeconds(1)); messageCount += records.count(); if (records.count() == 0) { // Wait for messages to become available. System.out.println("Waiting for messages..."); } else records.forEach(record -> { GenericRecord recordValue = record.value(); System.out.println("Consumed a message: "); ObjectMapper mapper = new ObjectMapper(); try { PMData pms = mapper.readValue(String.valueOf(record.value()), PMData.class); List<PMFlat> flatList = transformPMS(pms); flatList.forEach(System.out::println); } catch (Exception e) { e.printStackTrace(); } }); } } finally { System.out.println("Closing the consumer."); consumer.close(); } System.out.println("Done (success)."); } /** * Creates the Kafka producer. */ private static Producer<String, Object> createKafkaProducer() { Properties props = new Properties(); // Configure kafka settings props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS); props.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "Producer-" + TOPIC_NAME); props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all"); // Configure Service Registry location props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL); // Get an existing schema - do not auto-register the schema if not found. props.putIfAbsent(SerdeConfig.AUTO_REGISTER_ARTIFACT, Boolean.TRUE); props.putIfAbsent(SerdeConfig.AUTO_REGISTER_ARTIFACT_IF_EXISTS, IfExists.RETURN.name()); //Just if security values are present, then we configure them. configureSecurityIfPresent(props); RestService restService = new RestService(CCOMPAT_API_URL); final Map<String, String> restServiceProperties = new HashMap<>(); CachedSchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(restService, 100, restServiceProperties); Map<String, String> properties = new HashMap<>(); // Configure Service Registry location (Confluent API) properties.put("schema.registry.url", CCOMPAT_API_URL); properties.put("auto.register.schemas", "true"); // Map the topic name to the artifactId in the registry properties.put("value.subject.name.strategy", "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy"); // Use the Confluent provided Kafka Serializer for Avro KafkaAvroSerializer valueSerializer = new KafkaAvroSerializer(schemaRegistryClient, properties); StringSerializer keySerializer = new StringSerializer(); // Create the Kafka producer Producer<String, Object> producer = new KafkaProducer<String, Object>(props, keySerializer, valueSerializer); return producer; } /** * Creates the Kafka consumer. */ private static KafkaConsumer<Long, GenericRecord> createKafkaConsumer() { Properties props = new Properties(); // Configure Kafka props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS); props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "Consumer-" + TOPIC_NAME); props.putIfAbsent(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); props.putIfAbsent(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // Use the Apicurio Registry provided Kafka Deserializer for Avro props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AvroKafkaDeserializer.class.getName()); // Configure Service Registry location props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL); // Enable "Confluent Compatible API" mode in the Apicurio Registry deserializer props.putIfAbsent(SerdeConfig.ENABLE_CONFLUENT_ID_HANDLER, Boolean.TRUE); //Just if security values are present, then we configure them. configureSecurityIfPresent(props); // Create the Kafka Consumer KafkaConsumer<Long, GenericRecord> consumer = new KafkaConsumer<>(props); return consumer; } private static void configureSecurityIfPresent(Properties props) { props.putIfAbsent(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); props.putIfAbsent(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, TRUSTSTORE); props.putIfAbsent(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, PASSWORD); // configure the following three settings for SSL Authentication props.putIfAbsent(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, KEYSTORE); props.putIfAbsent(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, PASSWORD); props.putIfAbsent(SslConfigs.SSL_KEY_PASSWORD_CONFIG, PASSWORD); } private static GenericData.Record parseJson(String json, String schema) throws IOException { Schema parsedSchema = new Schema.Parser().parse(schema); Decoder decoder = DecoderFactory.get().jsonDecoder(parsedSchema, json); DatumReader<GenericData.Record> reader = new GenericDatumReader<>(parsedSchema); return reader.read(null, decoder); } public String getFileAsString(String fileName)throws Exception { ClassLoader classLoader = getClass().getClassLoader(); InputStream inputStream = classLoader.getResourceAsStream(fileName); // the stream holding the file content if (inputStream == null) { throw new IllegalArgumentException("file not found! " + fileName); } else { String result = IOUtils.toString(inputStream, StandardCharsets.UTF_8); return result; } } public static List<PMFlat> transformPMS(PMData pms){ List<PMFlat> flatList = new ArrayList<>(); // Get the constant values String domain = pms.getEvent().getCommonEventHeader().getDomain().toString(); String eventName = pms.getEvent().getCommonEventHeader().getEventName().toString(); String sourceName = pms.getEvent().getCommonEventHeader().getSourceName().toString(); long startEpochMicrosec = pms.getEvent().getCommonEventHeader().getStartEpochMicrosec(); long lastEpochMicrosec = pms.getEvent().getCommonEventHeader().getLastEpochMicrosec(); String timeZoneOffset = pms.getEvent().getCommonEventHeader().getTimeZoneOffset().toString(); int granularityPeriod = pms.getEvent().getPerf3gppFields().getMeasDataCollection().getGranularityPeriod(); String measuredEntityDn = pms.getEvent().getPerf3gppFields().getMeasDataCollection().getMeasuredEntityDn().toString(); String measuredEntityUserName = pms.getEvent().getPerf3gppFields().getMeasDataCollection().getMeasuredEntityUserName().toString(); List<MeasInfoList> measInfoLists = pms.getEvent().getPerf3gppFields().getMeasDataCollection().getMeasInfoList(); // Loop through the measurements for(MeasInfoList measInfoList: measInfoLists) { String sMeasInfoId = measInfoList.getMeasInfoId().getSMeasInfoId().toString(); MeasTypes measTypes = measInfoList.getMeasTypes(); List<CharSequence> sMeasTypesList = measTypes.getSMeasTypesList(); List<MeasValuesList> measValuesLists = measInfoList.getMeasValuesList(); for(MeasValuesList measValuesList: measValuesLists) { String measObjInstId = measValuesList.getMeasObjInstId().toString(); String suspectFlag = measValuesList.getSuspectFlag().toString(); List<MeasResult> measResultList = measValuesList.getMeasResults(); for(MeasResult measResult: measResultList) { // Create new PMSFlat object PMFlat flat = new PMFlat(); flat.setDomain(domain); flat.setEventName(eventName); flat.setSourceName(sourceName); flat.setStartEpochMicrosec(startEpochMicrosec); flat.setLastEpochMicrosec(lastEpochMicrosec); flat.setTimeZoneOffset(timeZoneOffset); flat.setGranularityPeriod(granularityPeriod); flat.setMeasuredEntityDn(measuredEntityDn); flat.setMeasuredEntityUserName(measuredEntityUserName); flat.setSMeasInfoId(sMeasInfoId); flat.setMeasObjInstId(measObjInstId); flat.setSuspectFlag(suspectFlag); String sMeasType = sMeasTypesList.get(measResult.getP()-1).toString(); flat.setSMeasType(sMeasType); String sValue = measResult.getSValue().toString(); flat.setSValue(sValue); // add the object to the list flatList.add(flat); } } } return flatList; } } |
...