...
Code Block |
---|
language | text |
---|
title | pms transform1 |
---|
|
-- Flattens each record of pms_stream: lifts the commonEventHeader and
-- perf3gppFields attributes to top-level columns and explodes measInfoList.
-- Both EXPLODE calls take the same measInfoList array, so sMeasTypesList and
-- measValuesList on one output row come from the same list entry.
CREATE STREAM IF NOT EXISTS pms_stream_transform1 AS
select
    event->commonEventHeader->domain as domain,
    event->commonEventHeader->eventName as eventName,
    event->commonEventHeader->sourceName as sourceName,
    event->commonEventHeader->startEpochMicrosec as startEpochMicrosec,
    event->commonEventHeader->lastEpochMicrosec as lastEpochMicrosec,
    event->perf3gppFields->perf3gppFieldsVersion as perf3gppFieldsVersion,
    event->perf3gppFields->measDataCollection->granularityPeriod as granularityPeriod,
    event->perf3gppFields->measDataCollection->measuredEntityUserName as measuredEntityUserName,
    event->perf3gppFields->measDataCollection->measuredEntityDn as measuredEntityDn,
    EXPLODE(event->perf3gppFields->measDataCollection->measInfoList)->measTypes->sMeasTypesList as sMeasTypesList,
    EXPLODE(event->perf3gppFields->measDataCollection->measInfoList)->measValuesList as measValuesList
from pms_stream EMIT CHANGES; |
...
Code Block |
---|
language | text |
---|
title | pms transform2 |
---|
|
-- Explodes measValuesList from pms_stream_transform1 so that each output row
-- carries one measured object instance id together with its measResults.
-- sMeasTypesList is passed through unchanged for the next transform.
CREATE STREAM IF NOT EXISTS pms_stream_transform2 AS
select
    measuredEntityDn,
    measuredEntityUserName,
    sMeasTypesList,
    explode(measValuesList)->measObjInstId as measObjInstId,
    explode(measValuesList)->measResults as measResults
from pms_stream_transform1 EMIT CHANGES; |
Code Block |
---|
language | text |
---|
title | pms transform3 |
---|
|
-- Explodes sMeasTypesList and measResults from pms_stream_transform2 side by
-- side, producing one row per (sMeasType, sValue) pair.
-- NOTE(review): this relies on the i-th measurement type corresponding to the
-- i-th measResults entry - confirm against the incoming event format.
CREATE STREAM IF NOT EXISTS pms_stream_transform3 AS
select
    measuredEntityDn,
    measuredEntityUserName,
    explode(sMeasTypesList) as sMeasType,
    measObjInstId,
    explode(measResults)->sValue as sValue
from pms_stream_transform2 EMIT CHANGES; |
Code Block |
---|
language | text |
---|
title | pms transform4 |
---|
|
-- Adds MY_COMPOSITE_KEY to every row of pms_stream_transform3 by concatenating
-- measuredEntityDn, measObjInstId and sMeasType; all other columns pass through.
CREATE STREAM IF NOT EXISTS pms_stream_transform4 AS
    SELECT
        measuredEntityDn + measObjInstId + sMeasType AS MY_COMPOSITE_KEY,
        measuredEntityDn,
        measuredEntityUserName,
        sMeasType,
        measObjInstId,
        sValue
    FROM pms_stream_transform3; |
Code Block |
---|
language | text |
---|
title | pms transform5 |
---|
|
-- Repartitions pms_stream_transform4 by MY_COMPOSITE_KEY; the selected
-- columns are otherwise unchanged.
CREATE STREAM IF NOT EXISTS pms_stream_transform5 AS
    SELECT
        MY_COMPOSITE_KEY,
        measuredEntityDn,
        measuredEntityUserName,
        sMeasType,
        measObjInstId,
        sValue
    FROM pms_stream_transform4
    PARTITION BY MY_COMPOSITE_KEY; |
Tables
Code Block |
---|
language | text |
---|
title | pms table |
---|
|
-- Declares a table over the Kafka topic PMS_STREAM_TRANSFORM5 (JSON values),
-- keyed by the record key, which is exposed here as ROWKEY.
-- NOTE(review): presumably this is the topic backing pms_stream_transform5,
-- whose key is MY_COMPOSITE_KEY - confirm the topic name on the target cluster.
CREATE TABLE IF NOT EXISTS PMS_TABLE (
    ROWKEY VARCHAR PRIMARY KEY,
    measuredEntityDn VARCHAR,
    measuredEntityUserName VARCHAR,
    sMeasType VARCHAR,
    measObjInstId VARCHAR,
    sValue VARCHAR
) WITH (KAFKA_TOPIC='PMS_STREAM_TRANSFORM5', VALUE_FORMAT='JSON', PARTITIONS = 1); |
Code Block |
---|
language | text |
---|
title | pms view |
---|
|
-- Defines (or replaces) pms_view as a table continuously derived from every
-- column of PMS_TABLE.
CREATE OR REPLACE TABLE pms_view AS
    SELECT *
    FROM PMS_TABLE
    EMIT CHANGES; |
Queries
Image Added
Image Added
Links
ksqldb
How to create a user-defined function
...