Introduction

Apache Avro is a data serialization system that stores data in a compact binary format, with the structure of each record described by a schema that is itself written in JSON.

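Because field names and types live in the schema rather than in every message, the binary messages are smaller than the equivalent JSON, which improves message throughput and reduces storage in Kafka.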
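To make the size difference concrete, here is a minimal sketch of Avro's binary encoding for a two-field record (an int and a string, mirroring the MeasResult record in the PM schema further down). It follows the encoding rules of the Avro specification (zigzag variable-length integers, length-prefixed UTF-8 strings, record fields written in schema order with no field names). The function names and the sample values are illustrative assumptions; in practice an Avro library would do this work.

```python
import json


def encode_long(n: int) -> bytes:
    """Encode an Avro int/long: zigzag, then 7 bits per byte, low bits first."""
    z = (n << 1) ^ (n >> 63)  # zigzag maps small negative numbers to small positives
    out = bytearray()
    while z & ~0x7F:
        out.append((z & 0x7F) | 0x80)  # high bit set means more bytes follow
        z >>= 7
    out.append(z)
    return bytes(out)


def encode_string(s: str) -> bytes:
    """Encode an Avro string: byte length as a long, then the UTF-8 bytes."""
    raw = s.encode("utf-8")
    return encode_long(len(raw)) + raw


def encode_meas_result(p: int, s_value: str) -> bytes:
    """Encode a record with fields (p: int, sValue: string) in schema order."""
    return encode_long(p) + encode_string(s_value)


# Hypothetical sample values, for illustration only.
avro_bytes = encode_meas_result(1, "813")
json_bytes = json.dumps({"p": 1, "sValue": "813"}).encode("utf-8")
print(len(avro_bytes), "bytes as Avro vs", len(json_bytes), "bytes as JSON")
```

The saving comes from dropping the field names and punctuation, which is also why a reader needs the schema to decode the bytes.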

To use it with Kafka we need a schema registry such as Apicurio, where producers and consumers can look up the schemas.

Apicurio

Apicurio Registry is a schema registry and an API registry, which stores and retrieves event schemas and API designs.

To run it with Strimzi over SSL, we use the following definitions (a KafkaUser, a Service and a Deployment):

Apicurio
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
  name: kafka-registry
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls
  authorization:
    type: simple
    acls:
      # Group ID used to consume from the different topics used by the Service Registry.
      # The name equals the metadata.name property in the ApicurioRegistry object
      - resource:
          type: group
          name: service-registry
        operation: Read
      # Rules for the Global global-id-topic
      - resource:
          type: topic
          name: global-id-topic
        operation: Read
      - resource:
          type: topic
          name: global-id-topic
        operation: Describe
      - resource:
          type: topic
          name: global-id-topic
        operation: Write
      - resource:
          type: topic
          name: global-id-topic
        operation: Create
      # Rules for the Global storage-topic
      - resource:
          type: topic
          name: storage-topic
        operation: Read
      - resource:
          type: topic
          name: storage-topic
        operation: Describe
      - resource:
          type: topic
          name: storage-topic
        operation: Write
      - resource:
          type: topic
          name: storage-topic
        operation: Create
      # Rules for the local topics created by our Service Registry instance
      # Prefix value equals to metadata.name property in ApicurioRegistry object
      - resource:
          type: topic
          name: service-registry-
          patternType: prefix
        operation: Read
      - resource:
          type: topic
          name: service-registry-
          patternType: prefix
        operation: Describe
      - resource:
          type: topic
          name: service-registry-
          patternType: prefix
        operation: Write
      - resource:
          type: topic
          name: service-registry-
          patternType: prefix
        operation: Create
      # Rules for the local transactional IDs created by our Service Registry instance
      # The prefix equals the metadata.name property in the ApicurioRegistry object
      - resource:
          type: transactionalId
          name: service-registry-
          patternType: prefix
        operation: Describe
      - resource:
          type: transactionalId
          name: service-registry-
          patternType: prefix
        operation: Write
      # Rules for internal Apache Kafka topics
      - resource:
          type: topic
          name: __consumer_offsets
        operation: Read
      - resource:
          type: topic
          name: __transaction_state
        operation: Read
      # Rules for Cluster objects
      - resource:
          type: cluster
        operation: IdempotentWrite
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-registry
  namespace: kafka
  labels:
    run: kafka-registry
spec:
  type: LoadBalancer
  selector:
    run: kafka-registry
  ports:
  - port: 8080
    targetPort: 8080
    protocol: TCP
    name: http
    nodePort: 31808
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-registry
  namespace: kafka
  labels:
    run: kafka-registry
spec:
  selector:
    matchLabels:
      run: kafka-registry
  template:
    metadata:
      labels:
        run: kafka-registry
    spec:
      containers:
      - name: kafka-registry
        image: apicurio/apicurio-registry-mem:2.4.2.Final
        imagePullPolicy: IfNotPresent
        env:
        - name: QUARKUS_PROFILE
          value: prod
        - name: KAFKA_BOOTSTRAP_SERVERS
          value: my-cluster-kafka-bootstrap:9093
        - name: APPLICATION_SERVER_HOST
          value: kafka-registry
        - name: APPLICATION_SERVER_PORT
          value: "8080"
        - name: APPLICATION_ID
          value: strimzi-apicurioregistry
        - name: REGISTRY_PROPERTIES_PREFIX
          value: REGISTRY
        - name: REGISTRY_STREAMS_TOPOLOGY_SECURITY_PROTOCOL
          value: SSL
        - name: REGISTRY_STREAMS_TOPOLOGY_SSL_KEYSTORE_TYPE
          value: PKCS12
        - name: REGISTRY_STREAMS_TOPOLOGY_SSL_KEYSTORE_LOCATION
          value: /etc/ssl/keystore/keystore.p12
        - name: REGISTRY_STREAMS_TOPOLOGY_SSL_KEYSTORE_PASSWORD
          valueFrom:
            secretKeyRef:
              name: kafka-registry
              key: user.password
        - name: REGISTRY_STREAMS_TOPOLOGY_SSL_TRUSTSTORE_TYPE
          value: PKCS12
        - name: REGISTRY_STREAMS_TOPOLOGY_SSL_TRUSTSTORE_LOCATION
          value: /etc/ssl/truststore/truststore.p12
        - name: REGISTRY_STREAMS_TOPOLOGY_SSL_TRUSTSTORE_PASSWORD
          valueFrom:
            secretKeyRef:
              name: my-cluster-cluster-ca-cert
              key: ca.password
        ports:
        - name: http
          containerPort: 8080
        volumeMounts:
        - name: keystore
          mountPath: "/etc/ssl/keystore"
          readOnly: true
        - name: truststore
          mountPath: "/etc/ssl/truststore"
          readOnly: true
      volumes:
      - name: keystore
        secret:
          secretName: kafka-registry
          items:
          - key: user.p12
            path: keystore.p12
      - name: truststore
        secret:
          secretName: my-cluster-cluster-ca-cert
          items:
          - key: ca.p12
            path: truststore.p12
      restartPolicy: Always


AVRO Schema

There are various tools available to help us define our Avro schema (see the links below).

Here is the schema definition for the PM data file:

PM Schema
{
   "type":"record",
   "name":"value",
   "fields":[
      {
         "name":"event",
         "type":{
            "type":"record",
            "name":"Event",
            "fields":[
               {
                  "name":"commonEventHeader",
                  "type":{
                     "type":"record",
                     "name":"CommonEventHeader",
                     "fields":[
                        {
                           "name":"domain",
                           "type":"string"
                        },
                        {
                           "name": "eventId",
                           "type":"string"
                        },                       
                        {
                           "name":"eventName",
                           "type":"string"
                        },
                        {
                           "name":"sequence",
                           "type": "int"                         
                        },                        
                        {
                           "name":"reportingEntityName",
                           "type":"string"
                        },
                        {
                           "name":"vesEventListenerVersion",
                           "type":"string"
                        },
                        {
                           "name":"version",
                           "type":"string"
                        },                                                
                        {
                           "name":"sourceName",
                           "type":"string"
                        },
                        {
                           "name":"priority",
                           "type":"string"
                        },                        
                        {
                           "name":"lastEpochMicrosec",
                           "type":"long"
                        },
                        {
                           "name":"startEpochMicrosec",
                           "type":"long"
                        },
                        {
                           "name":"timeZoneOffset",
                           "type":"string"
                        }
                     ]
                  }
               },
               {
                  "name":"perf3gppFields",
                  "type":{
                     "type":"record",
                     "name":"Perf3gppFields",
                     "fields":[
                        {
                           "name":"perf3gppFieldsVersion",
                           "type":"string"
                        },
                        {
                           "name":"measDataCollection",
                           "type":{
                              "type":"record",
                              "name":"MeasDataCollection",
                              "fields":[
                                 {
                                    "name":"granularityPeriod",
                                    "type":"int"
                                 },
                                 {
                                    "name":"measuredEntityDn",
                                    "type":"string"
                                 },
                                 {
                                    "name":"measuredEntitySoftwareVersion",
                                    "type":"string"
                                 },
                                 {
                                    "name":"measuredEntityUserName",
                                    "type":"string"
                                 },
                                 {
                                    "name":"measInfoList",
                                    "type":{
                                       "type":"array",
                                       "items":{
                                          "type":"record",
                                          "name":"MeasInfoList",
                                          "fields":[
                                             {
                                                "name":"measInfoId",
                                                "type":{
                                                   "type":"record",
                                                   "name":"MeasInfoId",
                                                   "fields":[
                                                      {
                                                         "name":"sMeasInfoId",
                                                         "type":"string"
                                                      }
                                                   ]
                                                }
                                             },
                                             {
                                                "name":"measTypes",
                                                "type":{
                                                   "type":"record",
                                                   "name":"MeasTypes",
                                                   "fields":[
                                                      {
                                                         "name":"sMeasTypesList",
                                                         "type":{
                                                            "type":"array",
                                                            "items":"string"
                                                         }
                                                      }
                                                   ]
                                                }
                                             },
                                             {
                                                "name":"measValuesList",
                                                "type":{
                                                   "type":"array",
                                                   "items":{
                                                      "type":"record",
                                                      "name":"MeasValuesList",
                                                      "fields":[
                                                         {
                                                            "name":"measObjInstId",
                                                            "type":"string"
                                                         },
                                                         {
                                                            "name":"suspectFlag",
                                                            "type":"string"
                                                         },                                                         
                                                         {
                                                            "name":"measResults",
                                                            "type":{
                                                               "type":"array",
                                                               "items":{
                                                                  "type":"record",
                                                                  "name":"MeasResult",
                                                                  "fields":[
                                                                     {
                                                                        "name":"p",
                                                                        "type":"int"
                                                                     },
                                                                     {
                                                                        "name":"sValue",
                                                                        "type":"string"
                                                                     }
                                                                  ]
                                                               }
                                                            }
                                                         }
                                                      ]
                                                   }
                                                }
                                             }
                                          ]
                                       }
                                    }
                                 }
                              ]
                           }
                        }
                     ]
                  }
               }
            ]
         }
      }
   ]
}
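Once the registry is running, a schema like the one above could be registered through the Apicurio Registry v2 REST API. The sketch below only builds the HTTP request. It assumes the registry is reachable at the Service name and port from the manifest above; the `default` group and the artifact ID `pm-data-value` are hypothetical choices made for illustration, and the helper name is not part of any library.

```python
import json
import urllib.request

# Assumption: Service name and port taken from the Kubernetes manifest above.
REGISTRY_URL = "http://kafka-registry:8080"


def build_register_request(schema: dict, artifact_id: str,
                           group: str = "default",
                           base_url: str = REGISTRY_URL) -> urllib.request.Request:
    """Build (but do not send) a POST that creates an Avro artifact."""
    url = f"{base_url}/apis/registry/v2/groups/{group}/artifacts"
    return urllib.request.Request(
        url,
        data=json.dumps(schema).encode("utf-8"),
        method="POST",
        headers={
            "Content-Type": "application/json",
            "X-Registry-ArtifactId": artifact_id,
            "X-Registry-ArtifactType": "AVRO",
        },
    )


# A cut-down stand-in for the PM schema; the real schema would be loaded from a file.
example_schema = {
    "type": "record",
    "name": "MeasResult",
    "fields": [
        {"name": "p", "type": "int"},
        {"name": "sValue", "type": "string"},
    ],
}

request = build_register_request(example_schema, "pm-data-value")
print(request.get_method(), request.full_url)
# To actually send it from a host that can reach the registry:
# with urllib.request.urlopen(request) as response:
#     print(response.status, response.read().decode("utf-8"))
```

Keeping the request construction separate from the network call makes the sketch easy to check without a running registry.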


Links

Apache Avro - a data serialization system

Apicurio Registry

AVRO SCHEMA TOOLS
