diff --git a/.ksql/ALL_01_POSTAL_SNAPSHOT_OSLO_STREAM_V2.ksql b/.ksql/ALL_01_POSTAL_SNAPSHOT_OSLO_STREAM_V2.ksql new file mode 100644 index 00000000..dbd80ff2 --- /dev/null +++ b/.ksql/ALL_01_POSTAL_SNAPSHOT_OSLO_STREAM_V2.ksql @@ -0,0 +1,10 @@ +CREATE OR REPLACE STREAM IF NOT EXISTS postal_snapshot_oslo_stream_v2 ( + messagekey varchar KEY, + headers ARRAY> HEADERS, + `@context` varchar, + `@type` varchar, + identificator STRUCT, + gemeente STRUCT>>, + postnamen Array>>, + postInfoStatus varchar) +WITH (KAFKA_TOPIC='postal.snapshot.oslo', VALUE_FORMAT='JSON'); \ No newline at end of file diff --git a/.ksql/INTEGRATIONDB/INTEGRATIONDB_POSTAL_SNAPSHOT_OSLO_STREAM.ksql b/.ksql/INTEGRATIONDB/INTEGRATIONDB_POSTAL_SNAPSHOT_OSLO_STREAM.ksql index 01a79868..b2783880 100644 --- a/.ksql/INTEGRATIONDB/INTEGRATIONDB_POSTAL_SNAPSHOT_OSLO_STREAM.ksql +++ b/.ksql/INTEGRATIONDB/INTEGRATIONDB_POSTAL_SNAPSHOT_OSLO_STREAM.ksql @@ -14,7 +14,8 @@ CREATE OR REPLACE STREAM IF NOT EXISTS postal_snapshot_oslo_stream_flatten_integ IDENTIFICATOR->ID as "PuriId", IDENTIFICATOR->NAAMRUIMTE as "Namespace", IDENTIFICATOR->VERSIEID as "VersionString", - PARSE_TIMESTAMP(IDENTIFICATOR->VERSIEID, 'yyyy-MM-dd''T''HH:mm:ssXXX', 'UTC') as "VersionTimestamp" + PARSE_TIMESTAMP(IDENTIFICATOR->VERSIEID, 'yyyy-MM-dd''T''HH:mm:ssXXX', 'UTC') as "VersionTimestamp", + CAST(FROM_BYTES(FILTER(headers, (x) => (x->key = 'IdempotenceKey'))[1]->VALUE, 'utf8') AS BIGINT) as "IdempotenceKey" -FROM POSTAL_SNAPSHOT_OSLO_STREAM +FROM POSTAL_SNAPSHOT_OSLO_STREAM_V2 PARTITION BY REDUCE(SPLIT(URL_EXTRACT_PATH(MESSAGEKEY), '/'), '', (S, X) => X); \ No newline at end of file