Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagetext
titlepms transform1
            CREATE STREAM IF NOT EXISTS pms_stream_transform1 AS 
            select event->commonEventHeader->domain as domain, 
            event->commonEventHeader->eventName as eventName, 
            event->commonEventHeader->sourceName as sourceName, 
            event->commonEventHeader->startEpochMicrosec as startEpochMicrosec, 
event->commonEventHeader->lastEpochMicrosec as           event->commonEventHeader->lastEpochMicrosec as lastEpochMicrosec, 
            eventlastEpochMicrosec, 
event->perf3gppFields->perf3gppFieldsVersion as perf3gppFieldsVersion, 
            event->perf3gppFields->measDataCollection->granularityPeriod as granularityPeriod, 
            event->perf3gppFields->measDataCollection->measuredEntityUserName as measuredEntityUserName,
            event->perf3gppFields->measDataCollection->measuredEntityDn as measuredEntityDn,
            EXPLODEEXPLODE(event->perf3gppFields->measDataCollection->measInfoList)->measTypes->sMeasTypesList as sMeasTypesList,
            EXPLODE(event->perf3gppFields->measDataCollection->measInfoList)->measValuesList as measValuesList 
            from pms_stream EMIT CHANGES;

...

Code Block
languagetext
titlepms transform2
            CREATE STREAM IF NOT EXISTS pms_stream_transform2 AS 
            select measuredEntityDn, measuredEntityUserName, sMeasTypesList, 
explode(measValuesList)->measObjInstId  as          measObjInstId, 
explode(measValuesList)->measObjInstId >measResults as measObjInstId,measResults 
from pms_stream_transform1 EMIT CHANGES;


Code Block
languagetext
titlepms transform3
CREATE STREAM IF NOT EXISTS pms_stream_transform3 AS 
select measuredEntityDn, measuredEntityUserName, explode(measValuesListsMeasTypesList)->measResults as measResultssMeasType, 
measObjInstId, explode(measResults)->sValue as sValue from pms_stream_transform2;


Code Block
languagetext
titlepms transform4
CREATE STREAM IF NOT EXISTS pms_stream_transform4 AS 
select measuredEntityDn + measObjInstId + sMeasType AS MY_COMPOSITE_KEY, 
measuredEntityDn, measuredEntityUserName, sMeasType, measObjInstId, sValue 
from pms_stream_transform3;


Code Block
languagetext
titlepms transform5
CREATE STREAM IF NOT EXISTS pms_stream_transform5 AS 
select MY_COMPOSITE_KEY, measuredEntityDn, measuredEntityUserName, sMeasType, measObjInstId, sValue 
from pms_stream_transform4 partition by MY_COMPOSITE_KEY;


Tables

Code Block
languagetext
titlepms table
CREATE TABLE IF NOT EXISTS PMS_TABLE (ROWKEY VARCHAR PRIMARY KEY, 
                measuredEntityDn VARCHAR, 
                measuredEntityUserName VARCHAR, 
                sMeasType VARCHAR, 
                measObjInstId VARCHAR, 
                sValue VARCHAR 
                )             WITH (KAFKA_TOPIC='PMS_STREAM_TRANSFORM5', VALUE_FORMAT='JSON', PARTITIONS  from pms_stream_transform1= 1);


Code Block
languagetext
titlepms view
CREATE OR REPLACE table pms_view as select * from PMS_TABLE EMIT CHANGES;


Queries

Image Added


Image Added

Links

ksqldb

How to create a user-defined function

...