Introduction
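Apache Avro is a data serialization system that stores data in a compact binary format, with the structure of the data described by a schema written in JSON.
Because field names and types live in the schema rather than in every message, messages are smaller, which improves message throughput and reduces storage in Kafka.
To use it we need a schema registry such as Apicurio, so that producers and consumers can share the schema.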
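To illustrate where the size saving comes from, here is a minimal sketch of Avro's binary encoding for a flat record, written with the Python standard library only. It is not a replacement for a real Avro library: it covers only the string, int and long types, and the field list and sample values are hypothetical (three fields borrowed from the event header used later on this page).

```python
import json


def zigzag_varint(n: int) -> bytes:
    """Encode an int/long as Avro does: zigzag mapping, then a base-128 varint."""
    z = (n << 1) ^ (n >> 63)  # zigzag: 0, -1, 1, -2 ... become 0, 1, 2, 3 ...
    out = bytearray()
    while True:
        low7 = z & 0x7F
        z >>= 7
        if z:
            out.append(low7 | 0x80)  # high bit set: more bytes follow
        else:
            out.append(low7)
            return bytes(out)


def encode_string(s: str) -> bytes:
    """A string is its UTF-8 byte length (encoded as a long) followed by the bytes."""
    data = s.encode("utf-8")
    return zigzag_varint(len(data)) + data


def encode_record(record: dict, fields: list) -> bytes:
    """A record is its field values concatenated in schema order, with no field names."""
    out = bytearray()
    for name, avro_type in fields:
        value = record[name]
        if avro_type == "string":
            out += encode_string(value)
        elif avro_type in ("int", "long"):
            out += zigzag_varint(value)
        else:
            raise ValueError(f"type not covered by this sketch: {avro_type}")
    return bytes(out)


# Hypothetical field subset and sample values, for illustration only.
FIELDS = [("domain", "string"), ("sequence", "int"), ("lastEpochMicrosec", "long")]
sample = {"domain": "perf3gpp", "sequence": 0, "lastEpochMicrosec": 1600000000000000}

avro_bytes = encode_record(sample, FIELDS)
json_bytes = json.dumps(sample, separators=(",", ":")).encode("utf-8")
print(len(avro_bytes), "bytes as Avro binary vs", len(json_bytes), "bytes as JSON")
```

The binary form carries only the values, so a reader needs the same schema to decode it. That is the gap the schema registry fills.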
Apicurio
Apicurio Registry is a schema registry and an API registry, which stores and retrieves event schemas and API designs.
To run it with Strimzi over SSL we use the following definitions (a KafkaUser, a Service and a Deployment):
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
  name: kafka-registry
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls
  authorization:
    type: simple
    acls:
      # Group Id to consume information for the different topics used by the Service Registry.
      # Name equals to metadata.name property in ApicurioRegistry object
      - resource:
          type: group
          name: service-registry
        operation: Read
      # Rules for the Global global-id-topic
      - resource:
          type: topic
          name: global-id-topic
        operation: Read
      - resource:
          type: topic
          name: global-id-topic
        operation: Describe
      - resource:
          type: topic
          name: global-id-topic
        operation: Write
      - resource:
          type: topic
          name: global-id-topic
        operation: Create
      # Rules for the Global storage-topic
      - resource:
          type: topic
          name: storage-topic
        operation: Read
      - resource:
          type: topic
          name: storage-topic
        operation: Describe
      - resource:
          type: topic
          name: storage-topic
        operation: Write
      - resource:
          type: topic
          name: storage-topic
        operation: Create
      # Rules for the local topics created by our Service Registry instance
      # Prefix value equals to metadata.name property in ApicurioRegistry object
      - resource:
          type: topic
          name: service-registry-
          patternType: prefix
        operation: Read
      - resource:
          type: topic
          name: service-registry-
          patternType: prefix
        operation: Describe
      - resource:
          type: topic
          name: service-registry-
          patternType: prefix
        operation: Write
      - resource:
          type: topic
          name: service-registry-
          patternType: prefix
        operation: Create
      # Rules for the local transactionalsIds created by our Service Registry instance
      # Prefix equals to metadata.name property in ApicurioRegistry object
      - resource:
          type: transactionalId
          name: service-registry-
          patternType: prefix
        operation: Describe
      - resource:
          type: transactionalId
          name: service-registry-
          patternType: prefix
        operation: Write
      # Rules for internal Apache Kafka topics
      - resource:
          type: topic
          name: __consumer_offsets
        operation: Read
      - resource:
          type: topic
          name: __transaction_state
        operation: Read
      # Rules for Cluster objects
      - resource:
          type: cluster
        operation: IdempotentWrite
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-registry
  namespace: kafka
  labels:
    run: kafka-registry
spec:
  type: LoadBalancer
  selector:
    run: kafka-registry
  ports:
    - port: 8080
      targetPort: 8080
      protocol: TCP
      name: http
      nodePort: 31808
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-registry
  namespace: kafka
  labels:
    run: kafka-registry
spec:
  selector:
    matchLabels:
      run: kafka-registry
  template:
    metadata:
      labels:
        run: kafka-registry
    spec:
      containers:
        - name: kafka-registry
          image: apicurio/apicurio-registry-mem:2.4.2.Final
          imagePullPolicy: IfNotPresent
          env:
            - name: QUARKUS_PROFILE
              value: prod
            - name: KAFKA_BOOTSTRAP_SERVERS
              value: my-cluster-kafka-bootstrap:9093
            - name: APPLICATION_SERVER_HOST
              value: kafka-registry
            - name: APPLICATION_SERVER_PORT
              value: "8080"
            - name: APPLICATION_ID
              value: strimzi-apicurioregistry
            - name: REGISTRY_PROPERTIES_PREFIX
              value: REGISTRY
            - name: REGISTRY_STREAMS_TOPOLOGY_SECURITY_PROTOCOL
              value: SSL
            - name: REGISTRY_STREAMS_TOPOLOGY_SSL_KEYSTORE_TYPE
              value: PKCS12
            - name: REGISTRY_STREAMS_TOPOLOGY_SSL_KEYSTORE_LOCATION
              value: /etc/ssl/keystore/keystore.p12
            - name: REGISTRY_STREAMS_TOPOLOGY_SSL_KEYSTORE_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: kafka-registry
                  key: user.password
            - name: REGISTRY_STREAMS_TOPOLOGY_SSL_TRUSTSTORE_TYPE
              value: PKCS12
            - name: REGISTRY_STREAMS_TOPOLOGY_SSL_TRUSTSTORE_LOCATION
              value: /etc/ssl/truststore/truststore.p12
            - name: REGISTRY_STREAMS_TOPOLOGY_SSL_TRUSTSTORE_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: my-cluster-cluster-ca-cert
                  key: ca.password
          ports:
            - name: http
              containerPort: 8080
          volumeMounts:
            - name: keystore
              mountPath: "/etc/ssl/keystore"
              readOnly: true
            - name: truststore
              mountPath: "/etc/ssl/truststore"
              readOnly: true
      volumes:
        - name: keystore
          secret:
            secretName: kafka-registry
            items:
              - key: user.p12
                path: keystore.p12
        - name: truststore
          secret:
            secretName: my-cluster-cluster-ca-cert
            items:
              - key: ca.p12
                path: truststore.p12
      restartPolicy: Always
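Once the registry is running, schemas can be registered over its REST API. The sketch below only builds the HTTP request, using the Python standard library. It assumes the registry exposes the Apicurio Registry v2 API under /apis/registry/v2 and is reachable at the kafka-registry Service on port 8080; the group `default`, the artifact id `pm-data` and the tiny placeholder schema are hypothetical, so verify the paths and names against your own deployment.

```python
import json
import urllib.request

# Assumption: base URL of the kafka-registry Service defined above.
REGISTRY_URL = "http://kafka-registry:8080"


def build_register_request(base_url: str, group_id: str, artifact_id: str,
                           schema: dict) -> urllib.request.Request:
    """Build (but do not send) a POST that creates an Avro artifact in the registry."""
    url = f"{base_url}/apis/registry/v2/groups/{group_id}/artifacts"
    return urllib.request.Request(
        url,
        data=json.dumps(schema).encode("utf-8"),
        method="POST",
        headers={
            "Content-Type": "application/json",
            "X-Registry-ArtifactId": artifact_id,  # id the schema is stored under
            "X-Registry-ArtifactType": "AVRO",     # tell the registry this is an Avro schema
        },
    )


# Placeholder schema, for illustration only; use the full PM schema in practice.
placeholder_schema = {
    "type": "record",
    "name": "value",
    "fields": [{"name": "domain", "type": "string"}],
}

request = build_register_request(REGISTRY_URL, "default", "pm-data", placeholder_schema)
print(request.get_method(), request.full_url)

# To actually send it (requires network access to the registry):
# with urllib.request.urlopen(request) as response:
#     print(response.status, response.read().decode("utf-8"))
```

Building the request separately from sending it makes it easy to inspect the URL and headers before touching a live registry.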
Avro Schema
Various tools are available to help us define an Avro schema (see link below).
Here is the schema definition for the PM data file:
{
  "type": "record",
  "name": "value",
  "fields": [
    {
      "name": "event",
      "type": {
        "type": "record",
        "name": "Event",
        "fields": [
          {
            "name": "commonEventHeader",
            "type": {
              "type": "record",
              "name": "CommonEventHeader",
              "fields": [
                { "name": "domain", "type": "string" },
                { "name": "eventId", "type": "string" },
                { "name": "eventName", "type": "string" },
                { "name": "sequence", "type": "int" },
                { "name": "reportingEntityName", "type": "string" },
                { "name": "vesEventListenerVersion", "type": "string" },
                { "name": "version", "type": "string" },
                { "name": "sourceName", "type": "string" },
                { "name": "priority", "type": "string" },
                { "name": "lastEpochMicrosec", "type": "long" },
                { "name": "startEpochMicrosec", "type": "long" },
                { "name": "timeZoneOffset", "type": "string" }
              ]
            }
          },
          {
            "name": "perf3gppFields",
            "type": {
              "type": "record",
              "name": "Perf3gppFields",
              "fields": [
                { "name": "perf3gppFieldsVersion", "type": "string" },
                {
                  "name": "measDataCollection",
                  "type": {
                    "type": "record",
                    "name": "MeasDataCollection",
                    "fields": [
                      { "name": "granularityPeriod", "type": "int" },
                      { "name": "measuredEntityDn", "type": "string" },
                      { "name": "measuredEntitySoftwareVersion", "type": "string" },
                      { "name": "measuredEntityUserName", "type": "string" },
                      {
                        "name": "measInfoList",
                        "type": {
                          "type": "array",
                          "items": {
                            "type": "record",
                            "name": "MeasInfoList",
                            "fields": [
                              {
                                "name": "measInfoId",
                                "type": {
                                  "type": "record",
                                  "name": "MeasInfoId",
                                  "fields": [
                                    { "name": "sMeasInfoId", "type": "string" }
                                  ]
                                }
                              },
                              {
                                "name": "measTypes",
                                "type": {
                                  "type": "record",
                                  "name": "MeasTypes",
                                  "fields": [
                                    {
                                      "name": "sMeasTypesList",
                                      "type": { "type": "array", "items": "string" }
                                    }
                                  ]
                                }
                              },
                              {
                                "name": "measValuesList",
                                "type": {
                                  "type": "array",
                                  "items": {
                                    "type": "record",
                                    "name": "MeasValuesList",
                                    "fields": [
                                      { "name": "measObjInstId", "type": "string" },
                                      { "name": "suspectFlag", "type": "string" },
                                      {
                                        "name": "measResults",
                                        "type": {
                                          "type": "array",
                                          "items": {
                                            "type": "record",
                                            "name": "MeasResult",
                                            "fields": [
                                              { "name": "p", "type": "int" },
                                              { "name": "sValue", "type": "string" }
                                            ]
                                          }
                                        }
                                      }
                                    ]
                                  }
                                }
                              }
                            ]
                          }
                        }
                      }
                    ]
                  }
                }
              ]
            }
          }
        ]
      }
    }
  ]
}
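To get a feel for what data this schema accepts, the following sketch checks a Python value against a schema fragment. It is a simplified, standard-library-only check, not a full Avro validator: it covers only the record, array, string, int and long types that the schema above uses. The fragment is the MeasValuesList record copied from the schema; the sample values are hypothetical.

```python
def conforms(value, schema) -> bool:
    """Return True if value matches the (simplified) Avro schema fragment."""
    if isinstance(schema, str):
        if schema == "string":
            return isinstance(value, str)
        if schema in ("int", "long"):
            bits = 31 if schema == "int" else 63
            # bool is a subclass of int in Python, so exclude it explicitly
            return (isinstance(value, int) and not isinstance(value, bool)
                    and -(2 ** bits) <= value < 2 ** bits)
        raise ValueError(f"type not covered by this sketch: {schema}")
    if schema["type"] == "record":
        fields = schema["fields"]
        # No field in the schema declares a default, so every field must be present.
        return (isinstance(value, dict)
                and set(value) == {f["name"] for f in fields}
                and all(conforms(value[f["name"]], f["type"]) for f in fields))
    if schema["type"] == "array":
        return (isinstance(value, list)
                and all(conforms(item, schema["items"]) for item in value))
    raise ValueError(f"type not covered by this sketch: {schema['type']}")


# The MeasValuesList record, copied from the schema above.
MEAS_VALUES_SCHEMA = {
    "type": "record",
    "name": "MeasValuesList",
    "fields": [
        {"name": "measObjInstId", "type": "string"},
        {"name": "suspectFlag", "type": "string"},
        {"name": "measResults", "type": {
            "type": "array",
            "items": {
                "type": "record",
                "name": "MeasResult",
                "fields": [
                    {"name": "p", "type": "int"},
                    {"name": "sValue", "type": "string"},
                ],
            },
        }},
    ],
}

# Hypothetical sample entry, for illustration only.
sample = {
    "measObjInstId": "cell-1",
    "suspectFlag": "false",
    "measResults": [{"p": 1, "sValue": "813"}, {"p": 2, "sValue": "913"}],
}

print(conforms(sample, MEAS_VALUES_SCHEMA))
```

Note that the schema declares suspectFlag and sValue as strings, so a JSON boolean or number in those positions would not match.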