diff --git a/.ksql/INTEGRATIONDB/INTEGRATIONDB_POSTAL_OSLO_CONNECTOR.ksql b/.ksql/INTEGRATIONDB/INTEGRATIONDB_POSTAL_OSLO_CONNECTOR.ksql new file mode 100644 index 00000000..2b55865e --- /dev/null +++ b/.ksql/INTEGRATIONDB/INTEGRATIONDB_POSTAL_OSLO_CONNECTOR.ksql @@ -0,0 +1,28 @@ +CREATE SINK CONNECTOR `PostalIntegrationDbConnector` with ( + "topics"= 'postal.snapshot.oslo.flatten.integrationdb', + "input.data.format"= 'JSON_SR', + "input.key.format"= 'JSON_SR', + "delete.enabled"= false, + "connector.class"= 'PostgresSink', + "name"= 'PostalIntegrationDbConnector', + "kafka.auth.mode"= 'KAFKA_API_KEY', + "kafka.api.key"= '***', + "kafka.api.secret"= '***', + "connection.host"= '***', + "connection.port"= '5432', + "connection.user"= 'basisregisters', + "connection.password"= '***', + "db.name"= 'postgres', + "ssl.mode"= 'require', + "insert.mode"= 'UPSERT', + "table.name.format"= 'Integration.PostInfo', + "table.types"= 'TABLE', + "db.timezone"= 'UTC', + "pk.mode"= 'record_key', + "pk.fields"= 'PostalCode', + "auto.create"= false, + "auto.evolve"= false, + "quote.sql.identifiers"= 'ALWAYS', + "batch.sizes"= 3000, + "tasks.max"= 1 + ); \ No newline at end of file diff --git a/.ksql/INTEGRATIONDB/INTEGRATIONDB_POSTAL_SNAPSHOT_OSLO_STREAM.ksql b/.ksql/INTEGRATIONDB/INTEGRATIONDB_POSTAL_SNAPSHOT_OSLO_STREAM.ksql new file mode 100644 index 00000000..01a79868 --- /dev/null +++ b/.ksql/INTEGRATIONDB/INTEGRATIONDB_POSTAL_SNAPSHOT_OSLO_STREAM.ksql @@ -0,0 +1,20 @@ +CREATE OR REPLACE STREAM IF NOT EXISTS postal_snapshot_oslo_stream_flatten_integrationdb + WITH (KAFKA_TOPIC='postal.snapshot.oslo.flatten.integrationdb', PARTITIONS=1, VALUE_FORMAT='JSON_SR', KEY_FORMAT='JSON_SR') + AS SELECT + REDUCE(SPLIT(URL_EXTRACT_PATH(MESSAGEKEY), '/'), '', (S, X) => X) PostalCode, + + CAST(gemeente->objectId AS INT) AS "NisCode", + postInfoStatus AS "Status", + + FILTER(POSTNAMEN, (X) => (X->GEOGRAFISCHENAAM->TAAL = 'nl'))[1]->GEOGRAFISCHENAAM->SPELLING as "PostalNameDutch", + FILTER(POSTNAMEN, (X) => (X->GEOGRAFISCHENAAM->TAAL = 'fr'))[1]->GEOGRAFISCHENAAM->SPELLING as "PostalNameFrench", + FILTER(POSTNAMEN, (X) => (X->GEOGRAFISCHENAAM->TAAL = 'de'))[1]->GEOGRAFISCHENAAM->SPELLING as "PostalNameGerman", + FILTER(POSTNAMEN, (X) => (X->GEOGRAFISCHENAAM->TAAL = 'en'))[1]->GEOGRAFISCHENAAM->SPELLING as "PostalNameEnglish", + + IDENTIFICATOR->ID as "PuriId", + IDENTIFICATOR->NAAMRUIMTE as "Namespace", + IDENTIFICATOR->VERSIEID as "VersionString", + PARSE_TIMESTAMP(IDENTIFICATOR->VERSIEID, 'yyyy-MM-dd''T''HH:mm:ssXXX', 'UTC') as "VersionTimestamp" + +FROM POSTAL_SNAPSHOT_OSLO_STREAM +PARTITION BY REDUCE(SPLIT(URL_EXTRACT_PATH(MESSAGEKEY), '/'), '', (S, X) => X); \ No newline at end of file