Skip to content

Commit

Permalink
feat: add integrationdb ksql scripts
Browse files Browse the repository at this point in the history
  • Loading branch information
pgallik committed Nov 30, 2023
1 parent 1236a06 commit 7bfb48b
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 0 deletions.
28 changes: 28 additions & 0 deletions .ksql/INTEGRATIONDB/INTEGRATIONDB_POSTAL_OSLO_CONNECTOR.ksql
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
CREATE SINK CONNECTOR `PostalIntegrationDbConnector` with (
"topics"= 'postal.snapshot.oslo.flatten.integrationdb',
"input.data.format"= 'JSON_SR',
"input.key.format"= 'JSON_SR',
"delete.enabled"= false,
"connector.class"= 'PostgresSink',
"name"= 'PostalIntegrationDbConnector',
"kafka.auth.mode"= 'KAFKA_API_KEY',
"kafka.api.key"= '***',
"kafka.api.secret"= '***',
"connection.host"= '***',
"connection.port"= '5432',
"connection.user"= 'basisregisters',
"connection.password"= '***',
"db.name"= 'postgres',
"ssl.mode"= 'require',
"insert.mode"= 'UPSERT',
"table.name.format"= 'Integration.PostInfo',
"table.types"= 'TABLE',
"db.timezone"= 'UTC',
"pk.mode"= 'record_key',
"pk.fields"= 'PostalCode',
"auto.create"= false,
"auto.evolve"= false,
"quote.sql.identifiers"= 'ALWAYS',
"batch.sizes"= 3000,
"tasks.max"= 1
);
20 changes: 20 additions & 0 deletions .ksql/INTEGRATIONDB/INTEGRATIONDB_POSTAL_SNAPSHOT_OSLO_STREAM.ksql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
CREATE OR REPLACE STREAM IF NOT EXISTS postal_snapshot_oslo_stream_flatten_integrationdb
WITH (KAFKA_TOPIC='postal.snapshot.oslo.flatten.integrationdb', PARTITIONS=1, VALUE_FORMAT='JSON_SR', KEY_FORMAT='JSON_SR')
AS SELECT
REDUCE(SPLIT(URL_EXTRACT_PATH(MESSAGEKEY), '/'), '', (S, X) => X) PostalCode,

CAST(gemeente->objectId AS INT) AS "NisCode",
postInfoStatus AS "Status",

FILTER(POSTNAMEN, (X) => (X->GEOGRAFISCHENAAM->TAAL = 'nl'))[1]->GEOGRAFISCHENAAM->SPELLING as "PostalNameDutch",
FILTER(POSTNAMEN, (X) => (X->GEOGRAFISCHENAAM->TAAL = 'fr'))[1]->GEOGRAFISCHENAAM->SPELLING as "PostalNameFrench",
FILTER(POSTNAMEN, (X) => (X->GEOGRAFISCHENAAM->TAAL = 'de'))[1]->GEOGRAFISCHENAAM->SPELLING as "PostalNameGerman",
FILTER(POSTNAMEN, (X) => (X->GEOGRAFISCHENAAM->TAAL = 'en'))[1]->GEOGRAFISCHENAAM->SPELLING as "PostalNameEnglish",

IDENTIFICATOR->ID as "PuriId",
IDENTIFICATOR->NAAMRUIMTE as "Namespace",
IDENTIFICATOR->VERSIEID as "VersionString",
PARSE_TIMESTAMP(IDENTIFICATOR->VERSIEID, 'yyyy-MM-dd''T''HH:mm:ssXXX', 'UTC') as "VersionTimestamp"

FROM POSTAL_SNAPSHOT_OSLO_STREAM
PARTITION BY REDUCE(SPLIT(URL_EXTRACT_PATH(MESSAGEKEY), '/'), '', (S, X) => X);

0 comments on commit 7bfb48b

Please sign in to comment.