Skip to content

Commit

Permalink
feat: add idempotencekey ksql
Browse files Browse the repository at this point in the history
  • Loading branch information
emalfroy authored and ArneD committed Dec 8, 2023
1 parent 3dfd59b commit 927420d
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 2 deletions.
10 changes: 10 additions & 0 deletions .ksql/ALL_01_POSTAL_SNAPSHOT_OSLO_STREAM_V2.ksql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
CREATE OR REPLACE STREAM IF NOT EXISTS postal_snapshot_oslo_stream_v2 (
messagekey varchar KEY,
headers ARRAY<STRUCT<key STRING, value BYTES>> HEADERS,
`@context` varchar,
`@type` varchar,
identificator STRUCT<id varchar, naamruimte varchar, objectId varchar, versieId varchar>,
gemeente STRUCT<objectId varchar, detail varchar, gemeentenaam STRUCT<geografischenaam STRUCT<spelling varchar, taal varchar>>>,
postnamen Array<STRUCT<geografischeNaam STRUCT<spelling varchar, taal varchar>>>,
postInfoStatus varchar)
WITH (KAFKA_TOPIC='postal.snapshot.oslo', VALUE_FORMAT='JSON');
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ CREATE OR REPLACE STREAM IF NOT EXISTS postal_snapshot_oslo_stream_flatten_integ
IDENTIFICATOR->ID as "PuriId",
IDENTIFICATOR->NAAMRUIMTE as "Namespace",
IDENTIFICATOR->VERSIEID as "VersionString",
PARSE_TIMESTAMP(IDENTIFICATOR->VERSIEID, 'yyyy-MM-dd''T''HH:mm:ssXXX', 'UTC') as "VersionTimestamp"
PARSE_TIMESTAMP(IDENTIFICATOR->VERSIEID, 'yyyy-MM-dd''T''HH:mm:ssXXX', 'UTC') as "VersionTimestamp",
CAST(FROM_BYTES(FILTER(headers, (x) => (x->key = 'IdempotenceKey'))[1]->VALUE, 'utf8') AS BIGINT) as "IdempotenceKey"

FROM POSTAL_SNAPSHOT_OSLO_STREAM
FROM POSTAL_SNAPSHOT_OSLO_STREAM_V2
PARTITION BY REDUCE(SPLIT(URL_EXTRACT_PATH(MESSAGEKEY), '/'), '', (S, X) => X);

0 comments on commit 927420d

Please sign in to comment.