Message handling sequence

Players and roles

PLAYER

ROLE

Configuration

  • YAML configuration model

Consumer

  • Subscribe to topics

  • WaitForMessage (Polling on inTopic)

    • returns KafkaMessage

Processor

  • Read from inCh

  • Extract header

  • Decode Base64

  • Unpack body (Packed -> PDU)

  • Encode PDU -> XER

  • XER -> JSON (using goxml2json)

  • Add header to JSON

  • Write JSON to outCh
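The Processor steps above can be sketched as a single function. This is a minimal sketch under assumptions, not the actual implementation: the names Process, Converter and stubConvert are hypothetical, the header is assumed to be a flat string map, and the Unpack, PDU -> XER and XER -> JSON steps are hidden behind a pluggable hook so the example runs with the standard library only.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// Converter turns a decoded payload into a JSON body. In the real service it
// would cover Unpack (Packed -> PDU), PDU -> XER and XER -> JSON; here it is a
// hypothetical hook so the sketch needs no ASN.1 codec or goxml2json.
type Converter func(raw []byte) (json.RawMessage, error)

// Process decodes the Base64 payload, converts it to JSON and adds the header.
func Process(header map[string]string, payloadB64 string, convert Converter) ([]byte, error) {
	raw, err := base64.StdEncoding.DecodeString(payloadB64)
	if err != nil {
		return nil, fmt.Errorf("base64 decode: %w", err)
	}
	body, err := convert(raw)
	if err != nil {
		return nil, fmt.Errorf("convert body: %w", err)
	}
	// Field order matches the suggested message format: header first, then body.
	return json.Marshal(struct {
		Header map[string]string `json:"header"`
		Body   json.RawMessage   `json:"body"`
	}{header, body})
}

// stubConvert stands in for the real conversion chain (assumption):
// it only reports the length of the decoded payload.
func stubConvert(raw []byte) (json.RawMessage, error) {
	return json.RawMessage(fmt.Sprintf(`{"length":%d}`, len(raw))), nil
}

func main() {
	out, err := Process(map[string]string{"topic": "in"}, "aGVsbG8=", stubConvert)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```

Keeping the conversion behind a function value lets the Base64 and header handling be tested without the codec; the real chain can be plugged in later.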

Producer

  • Read from outCh

  • Publish to outTopic

Main

  • Initialization steps

    • Load and log configuration

    • Initialize inChannel and outChannel

    • Initialize Consumer, Processor(inCh, outCh) and Producer(outCh)

    • Subscribe Consumer

    • Start Processor and Producer

    • HTTP server for health endpoint (goroutine)

  • Loop on Consumer.WaitForMessage

    • Write message to inCh
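The wiring described in Main could look roughly like the sketch below. It is illustrative only: startProcessor, startProducer and run are hypothetical names, a slice of strings stands in for the Consumer.WaitForMessage loop, strings.ToUpper stands in for the real processing, and a single producer loop is used for brevity. Buffer sizes follow the defaults listed under Configurations.

```go
package main

import (
	"fmt"
	"runtime"
	"strings"
	"sync"
)

// startProcessor reads from inCh, handles each message in its own goroutine,
// and closes outCh once inCh is closed and all in-flight messages are done.
func startProcessor(inCh <-chan string, outCh chan<- string, handle func(string) string) {
	go func() {
		var wg sync.WaitGroup
		for msg := range inCh {
			wg.Add(1)
			go func(m string) {
				defer wg.Done()
				outCh <- handle(m)
			}(msg)
		}
		wg.Wait()
		close(outCh)
	}()
}

// startProducer drains outCh and publishes each item.
// The returned channel is closed when outCh has been fully drained.
func startProducer(outCh <-chan string, publish func(string)) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for msg := range outCh {
			publish(msg)
		}
	}()
	return done
}

// run wires the pipeline and feeds it from a slice that stands in for the
// consumer loop. It returns the number of published messages.
func run(messages []string) int {
	inCh := make(chan string, 2*runtime.NumCPU()) // In-buffer-size default
	outCh := make(chan string, runtime.NumCPU())  // Out-buffer-size default
	published := 0
	startProcessor(inCh, outCh, strings.ToUpper)
	done := startProducer(outCh, func(string) { published++ })
	for _, m := range messages {
		inCh <- m // "Write message to inCh"
	}
	close(inCh)
	<-done
	return published
}

func main() {
	fmt.Println("published:", run([]string{"a", "b", "c"}))
}
```

Closing inCh and waiting on the producer gives an orderly drain, which is one possible way to handle shutdown; the design above does not prescribe it.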

Configurations

brokers

List of ip:port


Consumer-group

Name - same for all instances


In-topic

Name


Out-topic

Name


In-buffer-size

For inChannel

default value: 2 * #CPUs

Out-buffer-size

For outChannel

default value: 1 * #CPUs

Optional future configurations (for next versions, if more tuning needed)

Consumer-heartbeat-interval-ms

The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances

see https://docs.confluent.io/current/installation/configuration/consumer-configs.html

Consumer-session-timeout-ms

The timeout used to detect consumer failures when using Kafka's group management facility. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this consumer from the group and initiate a rebalance. Note that the value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.

see https://docs.confluent.io/current/installation/configuration/consumer-configs.html

Processor-pool-size

default value: #CPUs

We start with one goroutine per message. This setting will be used only if a goroutine pool turns out to be needed.

Producer-pool-size

default value: #CPUs * FACTOR

We start with one goroutine per message. This setting will be used only if a goroutine pool turns out to be needed.
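If a goroutine pool is ever needed, the pool-size settings could drive a fixed number of workers instead of one goroutine per message. The following is a minimal sketch of that idea; runPool and sumSquares are hypothetical names, and integers stand in for messages.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// runPool starts size workers that read from inCh, apply handle and write the
// result to outCh. A non-positive size falls back to #CPUs. outCh is closed
// after inCh is closed and every worker has finished.
func runPool(size int, inCh <-chan int, outCh chan<- int, handle func(int) int) {
	if size <= 0 {
		size = runtime.NumCPU()
	}
	var wg sync.WaitGroup
	for i := 0; i < size; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range inCh {
				outCh <- handle(v)
			}
		}()
	}
	go func() {
		wg.Wait()
		close(outCh)
	}()
}

// sumSquares pushes 1..n through the pool and sums the squared results.
func sumSquares(n, poolSize int) int {
	inCh := make(chan int, n)
	outCh := make(chan int, n)
	runPool(poolSize, inCh, outCh, func(v int) int { return v * v })
	for i := 1; i <= n; i++ {
		inCh <- i
	}
	close(inCh)
	total := 0
	for v := range outCh {
		total += v
	}
	return total
}

func main() {
	fmt.Println(sumSquares(4, 2))
}
```

Compared with a goroutine per message, a pool bounds the number of concurrent conversions, at the cost of one more tuning parameter.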

Concurrency, Synchronicity & Scalability

  • Single Consumer per partition (multiple partitions are served by multiple instances)

  • Multiple Processors (each message is processed by a goroutine)

  • Multiple Producers (each message is produced by a goroutine)

  • Asynchronous Producer (errors are logged)

  • Multiple instances of the app will serve multiple partitions; this is managed externally by Kafka + K8s.

Dependencies

  • confluent-kafka-go

  • librdkafka

  • goxml2json

Error handling

Shut down on these errors (all other errors are logged and processing continues):

  • Subscription error

  • Polling error (Partition error, AllBrokersAreDown)


Suggested JSON message format


{

"header": {<header data>},

"body": {<E2AP as json>}

}


Suggested name

E2JStreamer

