The DMaaP Mediator Producer provides Information Coordinator jobs that push messages from DMaaP Message Router to data consumers.

The producer is configured with environment variables. The variables defining the host of the supervision and job info callbacks must be set before starting the producer. All other variables have default values.
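The split between required and defaulted variables could be handled along the following lines. This is only a sketch: the variable names `INFO_PRODUCER_HOST`, `INFO_PRODUCER_PORT` and `LOG_LEVEL`, the default values, and the `Config` struct are assumptions made for illustration; the authoritative list is in the producer's README.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// Config holds the producer settings. Field and variable names are
// hypothetical and chosen only for this sketch.
type Config struct {
	CallbackHost string // required, has no default
	CallbackPort string // optional
	LogLevel     string // optional
}

// lookupFunc has the same shape as os.LookupEnv so tests can inject values.
type lookupFunc func(key string) (string, bool)

// getOrDefault returns the value of key, or def if it is unset or empty.
func getOrDefault(lookup lookupFunc, key, def string) string {
	if v, ok := lookup(key); ok && v != "" {
		return v
	}
	return def
}

// loadConfig builds a Config and fails if the callback host is missing.
func loadConfig(lookup lookupFunc) (Config, error) {
	cfg := Config{
		CallbackHost: getOrDefault(lookup, "INFO_PRODUCER_HOST", ""),
		CallbackPort: getOrDefault(lookup, "INFO_PRODUCER_PORT", "8085"),
		LogLevel:     getOrDefault(lookup, "LOG_LEVEL", "Info"),
	}
	if cfg.CallbackHost == "" {
		return Config{}, errors.New("INFO_PRODUCER_HOST must be set")
	}
	return cfg, nil
}

func main() {
	cfg, err := loadConfig(os.LookupEnv)
	if err != nil {
		// A real producer would log the cause and exit here.
		fmt.Println("configuration error:", err)
		return
	}
	fmt.Printf("callbacks served at %s:%s\n", cfg.CallbackHost, cfg.CallbackPort)
}
```

Passing the lookup function as a parameter keeps the required-variable check testable without touching the process environment.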

DMaaP Message Router (MR) requires the user to poll a specific topic, for example faults or metrics. It was therefore decided that a mediator job provides messages for one specific topic. The first job to be provided will push messages from the fault topic.

There was a discussion about whether a job should make it possible to filter the pushed messages. Although it would be a nice feature, it was decided not to provide it in the initial version. A number of filtering solutions, more or less generic, can be considered and introduced later if wanted.

The first job will have the following schema:

    {
      "$schema": "https://json-schema.org/draft/2019-09/schema",
      "title": "STD_Fault_Messages",
      "description": "Schema for Job delivering fault messages from DMaaP Message Router",
      "type": "object",
      "properties": {}
    }

It was also decided that the implementation shall be done in the Go language and that the code should reside in the "nonrtric" repo.

The producer has a folder called "configs" that contains one file per job type. The file name is the job type name, the extension is ".json", and the content is the job definition schema. The producer reads all these files and registers a type in the Information Coordinator for each of them. Should it, for any reason, not be possible to register all types, the producer will log the cause of the failure and exit.
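Reading the type files could look roughly like the sketch below. The `JobType` struct and the function names are hypothetical, and the actual registration call towards the Information Coordinator is left out; only the mapping from file name to type name and from file content to schema is shown.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

// JobType pairs a type name with its job definition schema (hypothetical type).
type JobType struct {
	Name   string
	Schema string
}

// typeNameFromFile derives the type name by dropping the directory part
// and the ".json" extension.
func typeNameFromFile(fileName string) string {
	return strings.TrimSuffix(filepath.Base(fileName), ".json")
}

// loadJobTypes reads every ".json" file in dir. Any read error aborts the
// whole load, mirroring the all-or-nothing behaviour described above.
func loadJobTypes(dir string) ([]JobType, error) {
	entries, err := os.ReadDir(dir)
	if err != nil {
		return nil, err
	}
	var types []JobType
	for _, e := range entries {
		if e.IsDir() || filepath.Ext(e.Name()) != ".json" {
			continue
		}
		content, err := os.ReadFile(filepath.Join(dir, e.Name()))
		if err != nil {
			return nil, err
		}
		types = append(types, JobType{Name: typeNameFromFile(e.Name()), Schema: string(content)})
	}
	return types, nil
}

func main() {
	types, err := loadJobTypes("configs")
	if err != nil {
		// A real producer would log the cause and exit here.
		fmt.Println("unable to read job types:", err)
		return
	}
	for _, t := range types {
		fmt.Println("would register type", t.Name)
	}
}
```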

The producer builds up a slice of all supported types during the type registration. This information, together with the callback URLs provided as environment variables at startup, is then used to register the producer in the Information Coordinator. Should the registration fail for any reason, the producer will log the cause of the failure and exit. It will also exit if the callback URLs are not provided as environment variables at startup.

The server handler for status supervision only replies to the "GET" method at the "empty" path. All other calls return "404 NotFound".
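A handler with this behaviour can be sketched with the standard `net/http` package. The sketch assumes that the "empty" path corresponds to `/`, which is what `net/http` reports for a request without a path; the helper `statusFor` exists only to exercise the handler without opening a port.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// statusHandler answers 200 to GET on "/" and 404 to everything else.
func statusHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet || r.URL.Path != "/" {
		http.NotFound(w, r)
		return
	}
	w.WriteHeader(http.StatusOK)
}

// statusFor sends one in-memory request to the handler and returns the
// HTTP status code of the response.
func statusFor(method, path string) int {
	rec := httptest.NewRecorder()
	statusHandler(rec, httptest.NewRequest(method, path, nil))
	return rec.Code
}

func main() {
	fmt.Println("GET /      ->", statusFor(http.MethodGet, "/"))
	fmt.Println("POST /     ->", statusFor(http.MethodPost, "/"))
	fmt.Println("GET /other ->", statusFor(http.MethodGet, "/other"))
}
```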

Final solution

The Go version of the producer takes environment variables for the configuration of the application itself; see the README.md file in the nonrtric repo. It also reads a file for the configuration of job types. The file has the following format:

    {
      "types": [
        {
          "id": The ID of the job type, e.g. "STD_Fault_Messages",
          "dmaapTopicUrl": The topic URL to poll from DMaaP Message Router, e.g. "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD_Fault_Messages"
        }
      ]
    }
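A file in this format could be parsed as sketched below. The struct and function names are hypothetical, and rejecting entries with a missing "id" or "dmaapTopicUrl" is an assumption of this sketch rather than documented behaviour.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// TypeDefinition mirrors one entry of the "types" array.
type TypeDefinition struct {
	ID            string `json:"id"`
	DMaaPTopicURL string `json:"dmaapTopicUrl"`
}

// typeDefinitions mirrors the top-level object of the file.
type typeDefinitions struct {
	Types []TypeDefinition `json:"types"`
}

// parseTypes decodes the configuration and checks that both fields are set.
func parseTypes(data []byte) ([]TypeDefinition, error) {
	var defs typeDefinitions
	if err := json.Unmarshal(data, &defs); err != nil {
		return nil, err
	}
	for _, t := range defs.Types {
		if t.ID == "" || t.DMaaPTopicURL == "" {
			return nil, fmt.Errorf("type definition %+v is missing id or dmaapTopicUrl", t)
		}
	}
	return defs.Types, nil
}

// exampleConfig uses the example values given in the format description.
const exampleConfig = `{
  "types": [
    {
      "id": "STD_Fault_Messages",
      "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD_Fault_Messages"
    }
  ]
}`

func main() {
	types, err := parseTypes([]byte(exampleConfig))
	if err != nil {
		fmt.Println("invalid type configuration:", err)
		return
	}
	for _, t := range types {
		fmt.Printf("type %s polls %s\n", t.ID, t.DMaaPTopicURL)
	}
}
```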

The configuration file is read once at startup, so to change the configuration of job types the producer must be restarted. When the types are read, the producer registers them and itself in the Information Coordinator Service (ICS). In case ICS is unavailable, the producer will wait indefinitely for it to become available.
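Waiting indefinitely for a service can be expressed as a retry loop without an upper bound on attempts. The sketch below is a generic illustration, not the producer's actual code: `retryUntilSuccess`, the fixed delay, and the simulated registration function are all assumptions.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errUnavailable simulates a service that cannot be reached yet.
var errUnavailable = errors.New("service unavailable")

// retryUntilSuccess calls op until it returns nil, sleeping for delay
// between attempts. It never gives up and returns the number of attempts.
func retryUntilSuccess(op func() error, delay time.Duration) int {
	attempts := 0
	for {
		attempts++
		if err := op(); err == nil {
			return attempts
		}
		time.Sleep(delay)
	}
}

func main() {
	calls := 0
	// Stand-in for the registration call: fails twice, then succeeds.
	register := func() error {
		calls++
		if calls < 3 {
			return errUnavailable
		}
		return nil
	}
	attempts := retryUntilSuccess(register, 10*time.Millisecond)
	fmt.Println("registered after", attempts, "attempts")
}
```

The same loop shape would apply to polling a Message Router that is temporarily unreachable.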

When the registration is done the producer starts serving the producer callback API defined in ICS, and for each job type it will start polling the DMaaP Message Router (MR). If MR is unavailable, the producer will wait indefinitely until it is available again.

If no jobs exist for a type, the polled messages are discarded. When jobs are created, the producer distributes polled messages to the consumers owning the jobs. If a consumer is unavailable or fails to reply in time, the messages for that consumer are discarded and a new attempt is made when new messages are polled. If a consumer is too slow to consume incoming messages, a buffer builds up. If the buffer becomes full, it is emptied and the buffered messages are discarded. After that, messages are distributed as before.
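The buffer behaviour can be illustrated with a buffered Go channel per job. This is a sketch under assumptions: the `job` type and its methods are hypothetical, and it assumes that the message that finds the buffer full is dropped together with the buffered ones, which the description above does not specify.

```go
package main

import "fmt"

// job buffers messages for one consumer (hypothetical type).
type job struct {
	messages chan []byte
}

func newJob(capacity int) *job {
	return &job{messages: make(chan []byte, capacity)}
}

// drain discards everything currently buffered and returns the count.
func (j *job) drain() int {
	n := 0
	for {
		select {
		case <-j.messages:
			n++
		default:
			return n
		}
	}
}

// enqueue buffers msg without blocking. If the buffer is full, the buffer is
// emptied and msg is dropped as well; the return value tells whether msg
// was buffered.
func (j *job) enqueue(msg []byte) bool {
	select {
	case j.messages <- msg:
		return true
	default:
		j.drain()
		return false
	}
}

func main() {
	j := newJob(2)
	for _, m := range []string{"a", "b", "c", "d"} {
		accepted := j.enqueue([]byte(m))
		buffered := len(j.messages)
		fmt.Printf("%s accepted=%v buffered=%d\n", m, accepted, buffered)
	}
}
```

With a capacity of two, the third message hits a full buffer, so the buffer is emptied, and the fourth message is accepted again.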

The producer also provides a REST API to set the log level dynamically at runtime.
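One way such an endpoint might work is sketched below. Everything specific here is an assumption for illustration: the path `/admin/log`, the `level` query parameter, the use of PUT, the level names, and the response codes. The actual API is described in the producer's own documentation.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
	"sync/atomic"
)

// levels maps level names to severities. Names and numbering are illustrative.
var levels = map[string]int32{"debug": 0, "info": 1, "warn": 2, "error": 3}

// currentLevel would be read by the logger on every log call; it starts at "info".
var currentLevel int32 = 1

// level returns the active level in a goroutine-safe way.
func level() int32 { return atomic.LoadInt32(&currentLevel) }

// logLevelHandler changes the level on PUT requests carrying ?level=<name>.
func logLevelHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPut {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}
	l, ok := levels[strings.ToLower(r.URL.Query().Get("level"))]
	if !ok {
		http.Error(w, "unknown log level", http.StatusBadRequest)
		return
	}
	atomic.StoreInt32(&currentLevel, l)
	w.WriteHeader(http.StatusOK)
}

// call sends one in-memory request to the handler and returns the status code.
func call(method, target string) int {
	rec := httptest.NewRecorder()
	logLevelHandler(rec, httptest.NewRequest(method, target, nil))
	return rec.Code
}

func main() {
	status := call(http.MethodPut, "/admin/log?level=debug")
	fmt.Println("status", status, "level", level())
	status = call(http.MethodPut, "/admin/log?level=verbose")
	fmt.Println("status", status, "level", level())
}
```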

The Docker image of the Go version is about 27 MB, compared to 298 MB for the Java version. When running, the Go version consumes about 25 MB and the Java version about 400 MB. In heavy performance tests, the Go version has been slightly more efficient than the Java version.
