From 9ffab3a16039140039120f404e1b2e02948d2cbb Mon Sep 17 00:00:00 2001 From: mozzy11 Date: Wed, 29 May 2024 12:56:08 +0300 Subject: [PATCH 01/16] add ability to switch off/on creation of parquet dwh --- cloudbuild.yaml | 38 ++++++++---- docker/config/application.yaml | 4 +- docker/config_fhir_sink/application.yaml | 59 +++++++++++++++++++ docker/config_fhir_sink/flink-conf.yaml | 31 ++++++++++ .../hapi-postgres-config_local.json | 9 +++ e2e-tests/controller-spark/Dockerfile | 3 +- .../controller_spark_sql_validation.sh | 54 ++++++++++++----- .../fhir/analytics/ConvertResourceFn.java | 2 +- .../fhir/analytics/FetchSearchPageFn.java | 6 +- .../google/fhir/analytics/FhirEtlOptions.java | 6 ++ pipelines/controller/config/application.yaml | 3 + .../google/fhir/analytics/DataProperties.java | 9 +++ 12 files changed, 194 insertions(+), 30 deletions(-) create mode 100644 docker/config_fhir_sink/application.yaml create mode 100644 docker/config_fhir_sink/flink-conf.yaml create mode 100644 docker/config_fhir_sink/hapi-postgres-config_local.json diff --git a/cloudbuild.yaml b/cloudbuild.yaml index 67c2e5079..5448cc896 100644 --- a/cloudbuild.yaml +++ b/cloudbuild.yaml @@ -118,15 +118,6 @@ steps: args: [ '-U', 'admin', '-d', 'postgres', '-h', 'hapi-fhir-db', '-p', '5432', '-c', 'CREATE DATABASE views;'] -# Resetting FHIR sink server -- name: 'docker/compose' - id: 'Turn down FHIR Sink Server' - args: [ '-f', './docker/sink-compose.yml', 'down' ,'-v'] - -- name: 'docker/compose' - id: 'Launch HAPI FHIR Sink Server' - args: [ '-f', './docker/sink-compose.yml', 'up','--force-recreate', '-d' ] - - name: 'docker/compose' id: 'Bring up controller and Spark containers' env: @@ -148,9 +139,36 @@ steps: # - -c # - docker logs pipeline-controller +- name: 'docker/compose' + id: 'Bring down controller and Spark containers for FHIR server to FHIR server sync' + args: [ '-f', './docker/compose-controller-spark-sql-single.yaml', 'down' ,'-v'] + +# Resetting Sink FHIR server +- name: 'docker/compose' + id: 'Turn down HAPI Sink FHIR Server for FHIR server to FHIR server sync' + args: [ '-f', './docker/sink-compose.yml', 'down' ,'-v'] + +- name: 'docker/compose' + id: 'Launch Sink FHIR Server for FHIR server to FHIR server sync' + args: ['-f', './docker/sink-compose.yml', 'up', '--force-recreate', '-d'] + +# Spinning up only the pipeline controller for FHIR server to FHIR server sync +- name: 'docker/compose' + id: 'Bring up only the pipeline controller for FHIR server to FHIR server sync' + env: + - PIPELINE_CONFIG=/workspace/docker/config_fhir_sink + - DWH_ROOT=/workspace/e2e-tests/controller-spark/dwh + args: [ '-f', './docker/compose-controller-spark-sql-single.yaml', 'up', + '--force-recreate', '--no-deps' , '-d' ,'pipeline-controller' ] + +- name: '${_REPOSITORY}/e2e-tests/controller-spark:${_TAG}' + id: 'Run E2E Test for Dockerized Controller for FHIR server to FHIR server sync' + env: + - DWH_TYPE="FHIR" + - name: 'docker/compose' id: 'Bring down controller and Spark containers' - args: [ '-f', './docker/compose-controller-spark-sql-single.yaml', 'down' ] + args: [ '-f', './docker/compose-controller-spark-sql-single.yaml', 'down' ,'-v'] - name: 'docker/compose' id: 'Turn down HAPI Source and Sink Servers' diff --git a/docker/config/application.yaml b/docker/config/application.yaml index 83295dc7c..ee261e582 100644 --- a/docker/config/application.yaml +++ b/docker/config/application.yaml @@ -25,6 +25,8 @@ fhirdata: # fhirServerUrl: "http://hapi-server:8080/fhir" dbConfig: 
"config/hapi-postgres-config_local.json" dwhRootPrefix: "/dwh/controller_DWH" + #Whether to create a Parquet DWH or not + createParquetDwh: true incrementalSchedule: "0 0 * * * *" purgeSchedule: "0 30 * * * *" numOfDwhSnapshotsToRetain: 2 @@ -43,7 +45,7 @@ fhirdata: rowGroupSizeForParquetFiles: 33554432 # 32mb viewDefinitionsDir: "config/views" sinkDbConfigPath: "config/hapi-postgres-config_local_views.json" - sinkFhirServerUrl: "http://sink-server:8080/fhir" + #sinkFhirServerUrl: "http://sink-server:8080/fhir" #sinkUserName: "hapi" #sinkPassword: "hapi123" recursiveDepth: 1 diff --git a/docker/config_fhir_sink/application.yaml b/docker/config_fhir_sink/application.yaml new file mode 100644 index 000000000..942a6c325 --- /dev/null +++ b/docker/config_fhir_sink/application.yaml @@ -0,0 +1,59 @@ +# +# Copyright 2020-2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# See `pipelines/controller/config/application.yaml` for full documentation +# of these options. +# This config is meant to be used by `compose-controller-spark-sql.yaml`. +fhirdata: + # 172.17.0.1 is an example docker network interface ip address; + # `hapi-server` is another docker example where a container with that name is + # running on the same docker network. + # fhirServerUrl: "http://172.17.0.1:8091/fhir" + # fhirServerUrl: "http://hapi-server:8080/fhir" + dbConfig: "config/hapi-postgres-config_local.json" + dwhRootPrefix: "/dwh/controller_DWH" + #Whether to create a Parquet DWH or not + createParquetDwh: false + incrementalSchedule: "0 0 * * * *" + purgeSchedule: "0 30 * * * *" + numOfDwhSnapshotsToRetain: 2 + # There is no Questionnaire in our test FHIR server, but it is added to + # prevent regression of https://github.com/google/fhir-data-pipes/issues/785. + # TODO: add resource table creation to e2e tests. 
+ resourceList: "Patient,Encounter,Observation,Questionnaire,Condition,Practitioner,Location,Organization,DiagnosticReport,Immunization,MedicationRequest,PractitionerRole,Procedure" + numThreads: 1 + autoGenerateFlinkConfiguration: true + createHiveResourceTables: false + #thriftserverHiveConfig: "config/thriftserver-hive-config_local.json" + #hiveResourceViewsDir: "config/views" + # structureDefinitionsPath: "config/profile-definitions" + structureDefinitionsPath: "classpath:/r4-us-core-definitions" + fhirVersion: "R4" + rowGroupSizeForParquetFiles: 33554432 # 32mb + #viewDefinitionsDir: "config/views" + #sinkDbConfigPath: "config/hapi-postgres-config_local_views.json" + sinkFhirServerUrl: "http://sink-server:8080/fhir" + #sinkUserName: "hapi" + #sinkPassword: "hapi123" + recursiveDepth: 1 + +# Enable spring boot actuator end points, use "*" to expose all endpoints, or a comma-separated +# list to expose selected ones +management: + endpoints: + web: + exposure: + include: health,info,metrics,prometheus,pipeline-metrics diff --git a/docker/config_fhir_sink/flink-conf.yaml b/docker/config_fhir_sink/flink-conf.yaml new file mode 100644 index 000000000..109de5192 --- /dev/null +++ b/docker/config_fhir_sink/flink-conf.yaml @@ -0,0 +1,31 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# To use this config, FLINK_CONF_DIR env. var should be set to the parent dir. + +# This is needed to prevent an "Insufficient number of network buffers" +# exceptions when running the merger on large input with many workers. +taskmanager.memory.network.max: 256mb + +# This is needed to be able to process large resources, otherwise in JDBC +# mode we may get the following exception: +# "The record exceeds the maximum size of a sort buffer ..." +taskmanager.memory.managed.size: 256mb + +# This is to make pipeline.run() non-blocking with FlinkRunner; unfortunately +# this is overwritten in `local` mode: https://stackoverflow.com/a/74416240 +execution.attached: false + +# This is required to track the pipeline metrics when FlinkRunner is used. 
+execution.job-listeners: com.google.fhir.analytics.metrics.FlinkJobListener diff --git a/docker/config_fhir_sink/hapi-postgres-config_local.json b/docker/config_fhir_sink/hapi-postgres-config_local.json new file mode 100644 index 000000000..743efc6bb --- /dev/null +++ b/docker/config_fhir_sink/hapi-postgres-config_local.json @@ -0,0 +1,9 @@ +{ + "jdbcDriverClass": "org.postgresql.Driver", + "databaseService" : "postgresql", + "databaseHostName" : "hapi-fhir-db", + "databasePort" : "5432", + "databaseUser" : "admin", + "databasePassword" : "admin", + "databaseName" : "hapi" +} diff --git a/e2e-tests/controller-spark/Dockerfile b/e2e-tests/controller-spark/Dockerfile index 2d55aa88f..aada6bd39 100644 --- a/e2e-tests/controller-spark/Dockerfile +++ b/e2e-tests/controller-spark/Dockerfile @@ -23,5 +23,6 @@ COPY parquet-tools-1.11.1.jar parquet-tools-1.11.1.jar ENV PARQUET_SUBDIR="dwh" ENV DOCKER_NETWORK="--use_docker_network" ENV HOME_DIR="/workspace/e2e-tests/controller-spark" +ENV DWH_TYPE="PARQUET" -ENTRYPOINT cd ${HOME_DIR}; ./controller_spark_sql_validation.sh ${HOME_DIR} ${PARQUET_SUBDIR} ${DOCKER_NETWORK} +ENTRYPOINT cd ${HOME_DIR}; ./controller_spark_sql_validation.sh ${HOME_DIR} ${PARQUET_SUBDIR} ${DOCKER_NETWORK} ${DWH_TYPE} diff --git a/e2e-tests/controller-spark/controller_spark_sql_validation.sh b/e2e-tests/controller-spark/controller_spark_sql_validation.sh index 4ed3f86dd..c351d773f 100755 --- a/e2e-tests/controller-spark/controller_spark_sql_validation.sh +++ b/e2e-tests/controller-spark/controller_spark_sql_validation.sh @@ -65,7 +65,13 @@ function validate_args() { # anything that needs printing ################################################# function print_message() { - local print_prefix="E2E TEST FOR CONTROLLER SPARK DEPLOYMENT:" + local print_prefix="" + if [[ "${DWH_TYPE}" == "PARQUET" ]] + then + print_prefix="E2E TEST FOR CONTROLLER SPARK DEPLOYMENT:" + else + print_prefix="E2E TEST FOR CONTROLLER FHIR SERVER TO FHIR SERVER SYNC:" + fi echo "${print_prefix} $*" } @@ -88,6 +94,7 @@ function print_message() { function setup() { HOME_PATH=$1 PARQUET_SUBDIR=$2 + DWH_TYPE=$4 SOURCE_FHIR_SERVER_URL='http://localhost:8091' SINK_FHIR_SERVER_URL='http://localhost:8098' PIPELINE_CONTROLLER_URL='http://localhost:8090' @@ -187,7 +194,7 @@ function run_pipeline() { ####################################################################### function check_parquet() { local isIncremental=$1 - local runtime="15 minute" + local runtime="5 minute" local end_time=$(date -ud "$runtime" +%s) local output="${HOME_PATH}/${PARQUET_SUBDIR}" local timeout=true @@ -224,7 +231,7 @@ function check_parquet() { timeout=false break else - sleep 20 + sleep 10 fi fi done @@ -412,8 +419,14 @@ setup "$@" fhir_source_query sleep 50 run_pipeline "FULL" -check_parquet false -test_fhir_sink "FULL" +if [[ "${DWH_TYPE}" == "PARQUET" ]] +then + check_parquet false +else + # Provide enough Buffer time for FULL pipeline to completely run before testing the sink FHIR server + sleep 900 + test_fhir_sink "FULL" +fi clear @@ -425,16 +438,27 @@ update_resource sleep 60 # Incremental run. 
run_pipeline "INCREMENTAL" -check_parquet true -fhir_source_query -test_fhir_sink "INCREMENTAL" - -validate_resource_tables -validate_resource_tables_data -validate_updated_resource +if [[ "${DWH_TYPE}" == "PARQUET" ]] +then + check_parquet true +else + fhir_source_query + # Provide enough Buffer time for FULL pipeline to completely run before testing the sink FHIR server + sleep 300 + test_fhir_sink "INCREMENTAL" +fi + +if [[ "${DWH_TYPE}" == "PARQUET" ]] +then + validate_resource_tables + validate_resource_tables_data + validate_updated_resource + + # View recreation run + # TODO add validation for the views as well + run_pipeline "VIEWS" + +fi -# View recreation run -# TODO add validation for the views as well -run_pipeline "VIEWS" print_message "END!!" \ No newline at end of file diff --git a/pipelines/batch/src/main/java/com/google/fhir/analytics/ConvertResourceFn.java b/pipelines/batch/src/main/java/com/google/fhir/analytics/ConvertResourceFn.java index 1a29ef425..5820a50b3 100644 --- a/pipelines/batch/src/main/java/com/google/fhir/analytics/ConvertResourceFn.java +++ b/pipelines/batch/src/main/java/com/google/fhir/analytics/ConvertResourceFn.java @@ -142,7 +142,7 @@ public void writeResource(HapiRowDescriptor element) numFetchedResourcesMap.get(resourceType).inc(1); - if (!parquetFile.isEmpty()) { + if (parquetUtil != null) { startTime = System.currentTimeMillis(); parquetUtil.write(resource); totalGenerateTimeMillisMap.get(resourceType).inc(System.currentTimeMillis() - startTime); diff --git a/pipelines/batch/src/main/java/com/google/fhir/analytics/FetchSearchPageFn.java b/pipelines/batch/src/main/java/com/google/fhir/analytics/FetchSearchPageFn.java index 5ba19f11f..5019052f0 100644 --- a/pipelines/batch/src/main/java/com/google/fhir/analytics/FetchSearchPageFn.java +++ b/pipelines/batch/src/main/java/com/google/fhir/analytics/FetchSearchPageFn.java @@ -21,7 +21,6 @@ import ca.uhn.fhir.parser.IParser; import com.cerner.bunsen.exception.ProfileException; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Strings; import com.google.fhir.analytics.JdbcConnectionPools.DataSourceConfig; import com.google.fhir.analytics.model.DatabaseConfiguration; import com.google.fhir.analytics.view.ViewApplicationException; @@ -88,6 +87,8 @@ abstract class FetchSearchPageFn extends DoFn> { protected final String parquetFile; + protected final Boolean createParquetDwh; + private final int secondsToFlush; private final int rowGroupSize; @@ -130,6 +131,7 @@ abstract class FetchSearchPageFn extends DoFn> { this.oAuthClientSecret = options.getFhirServerOAuthClientSecret(); this.stageIdentifier = stageIdentifier; this.parquetFile = options.getOutputParquetPath(); + this.createParquetDwh = options.isCreateParquetDwh(); this.secondsToFlush = options.getSecondsToFlushParquetFiles(); this.rowGroupSize = options.getRowGroupSizeForParquetFiles(); this.viewDefinitionsDir = options.getViewDefinitionsDir(); @@ -200,7 +202,7 @@ public void setup() throws SQLException, ProfileException { oAuthClientSecret, fhirContext); fhirSearchUtil = new FhirSearchUtil(fetchUtil); - if (!Strings.isNullOrEmpty(parquetFile)) { + if (createParquetDwh) { parquetUtil = new ParquetUtil( fhirContext.getVersion().getVersion(), diff --git a/pipelines/batch/src/main/java/com/google/fhir/analytics/FhirEtlOptions.java b/pipelines/batch/src/main/java/com/google/fhir/analytics/FhirEtlOptions.java index c28e05902..8ff02bc10 100644 --- 
a/pipelines/batch/src/main/java/com/google/fhir/analytics/FhirEtlOptions.java +++ b/pipelines/batch/src/main/java/com/google/fhir/analytics/FhirEtlOptions.java @@ -260,4 +260,10 @@ public interface FhirEtlOptions extends BasePipelineOptions { String getSourceNDJsonFilePattern(); void setSourceNDJsonFilePattern(String value); + + @Description("Flag to switch off/on creation of a parquet DWH") + @Default.Boolean(true) + Boolean isCreateParquetDwh(); + + void setCreateParquetDwh(Boolean value); } diff --git a/pipelines/controller/config/application.yaml b/pipelines/controller/config/application.yaml index 81c63eae9..15129f56a 100644 --- a/pipelines/controller/config/application.yaml +++ b/pipelines/controller/config/application.yaml @@ -72,6 +72,9 @@ fhirdata: # that directory too, such that files created by the pipelines are readable by # the Thrift Server, e.g., `setfacl -d -m o::rx dwh/`. dwhRootPrefix: "dwh/controller_DEV_DWH" + #Whether to create a Parquet DWH or not.In case of syncying between a FHIR server to FHIR server , + # generation of parquet DWH could be switched off/on + createParquetDwh: true # The schedule for automatic incremental pipeline runs. # Uses the Spring CronExpression format, i.e., diff --git a/pipelines/controller/src/main/java/com/google/fhir/analytics/DataProperties.java b/pipelines/controller/src/main/java/com/google/fhir/analytics/DataProperties.java index ca68f910f..58c20ddee 100644 --- a/pipelines/controller/src/main/java/com/google/fhir/analytics/DataProperties.java +++ b/pipelines/controller/src/main/java/com/google/fhir/analytics/DataProperties.java @@ -118,6 +118,8 @@ public class DataProperties { private int recursiveDepth; + private boolean createParquetDwh; + @PostConstruct void validateProperties() { CronExpression.parse(incrementalSchedule); @@ -127,6 +129,9 @@ void validateProperties() { "At least one of fhirServerUrl or dbConfig should be set!"); Preconditions.checkState(fhirVersion != null, "FhirVersion cannot be empty"); + Preconditions.checkArgument( + !Strings.isNullOrEmpty(dwhRootPrefix), "dwhRootPrefix is required!"); + if (!Strings.isNullOrEmpty(dbConfig)) { if (!Strings.isNullOrEmpty(fhirServerUrl)) { logger.warn("Both fhirServerUrl and dbConfig are set; ignoring fhirServerUrl!"); @@ -138,6 +143,7 @@ void validateProperties() { logger.info("Using FHIR-search mode since dbConfig is not set."); } Preconditions.checkState(!createHiveResourceTables || !thriftserverHiveConfig.isEmpty()); + Preconditions.checkState(!createHiveResourceTables || createParquetDwh); } private PipelineConfig.PipelineConfigBuilder addFlinkOptions(FhirEtlOptions options) { @@ -213,6 +219,8 @@ PipelineConfig createBatchOptions() { Instant.now().toString().replace(":", "-").replace("-", "_").replace(".", "_"); options.setOutputParquetPath(dwhRootPrefix + TIMESTAMP_PREFIX + timestampSuffix); + options.setCreateParquetDwh(createParquetDwh); + PipelineConfig.PipelineConfigBuilder pipelineConfigBuilder = addFlinkOptions(options); // Get hold of thrift server parquet directory from dwhRootPrefix config. 
@@ -230,6 +238,7 @@ List getConfigParams() { return List.of( new ConfigFields("fhirdata.fhirServerUrl", fhirServerUrl, "", ""), new ConfigFields("fhirdata.dwhRootPrefix", dwhRootPrefix, "", ""), + new ConfigFields("fhirdata.createParquetDwh", String.valueOf(createParquetDwh), "", ""), new ConfigFields("fhirdata.incrementalSchedule", incrementalSchedule, "", ""), new ConfigFields("fhirdata.purgeSchedule", purgeSchedule, "", ""), new ConfigFields( From f37bbebd989226d3519c525a9d48b5c744c2f9dc Mon Sep 17 00:00:00 2001 From: mozzy11 Date: Fri, 31 May 2024 07:03:27 +0300 Subject: [PATCH 02/16] run e2e tests for parquet and fhir sink independently --- cloudbuild.yaml | 1 - docker/compose-controller-spark-sql-single.yaml | 2 -- 2 files changed, 3 deletions(-) diff --git a/cloudbuild.yaml b/cloudbuild.yaml index 692370ae4..5448cc896 100644 --- a/cloudbuild.yaml +++ b/cloudbuild.yaml @@ -123,7 +123,6 @@ steps: env: - PIPELINE_CONFIG=/workspace/docker/config - DWH_ROOT=/workspace/e2e-tests/controller-spark/dwh - - FHIRDATA_SINKFHIRSERVERURL=http://sink-server:8080/fhir args: [ '-f', './docker/compose-controller-spark-sql-single.yaml', 'up', '--force-recreate', '-d' ] diff --git a/docker/compose-controller-spark-sql-single.yaml b/docker/compose-controller-spark-sql-single.yaml index 503468e7e..83896375e 100644 --- a/docker/compose-controller-spark-sql-single.yaml +++ b/docker/compose-controller-spark-sql-single.yaml @@ -62,8 +62,6 @@ services: - ${DWH_ROOT}:/dwh environment: - JAVA_OPTS=$JAVA_OPTS - # This is to turn this on in e2e but leave it off in the default config. - - FHIRDATA_SINKFHIRSERVERURL=$FHIRDATA_SINKFHIRSERVERURL ports: - '8090:8080' networks: From effc19dab92e059cd30007d8f99f4dd789ce3adf Mon Sep 17 00:00:00 2001 From: Mutesasira Moses Date: Thu, 2 Jan 2025 19:14:34 +0300 Subject: [PATCH 03/16] Update pipelines/controller/config/application.yaml Co-authored-by: Bashir Sadjad --- pipelines/controller/config/application.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/controller/config/application.yaml b/pipelines/controller/config/application.yaml index b78b00034..ab8140489 100644 --- a/pipelines/controller/config/application.yaml +++ b/pipelines/controller/config/application.yaml @@ -73,7 +73,7 @@ fhirdata: # that directory too, such that files created by the pipelines are readable by # the Thrift Server, e.g., `setfacl -d -m o::rx dwh/`. dwhRootPrefix: "dwh/controller_DEV_DWH" - #Whether to create a Parquet DWH or not.In case of syncying between a FHIR server to FHIR server , + # Whether to create a Parquet DWH or not. In case of syncing from a FHIR server to another, if Parquet files are not needed, their generation can be switched off by this flag. 
# generation of parquet DWH could be switched off/on createParquetDwh: true From 162c89badd879b28371130abf31d3cbef1ac3fc6 Mon Sep 17 00:00:00 2001 From: Mutesasira Moses Date: Thu, 2 Jan 2025 19:14:46 +0300 Subject: [PATCH 04/16] Update pipelines/batch/src/main/java/com/google/fhir/analytics/FhirEtlOptions.java Co-authored-by: Bashir Sadjad --- .../src/main/java/com/google/fhir/analytics/FhirEtlOptions.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/batch/src/main/java/com/google/fhir/analytics/FhirEtlOptions.java b/pipelines/batch/src/main/java/com/google/fhir/analytics/FhirEtlOptions.java index 8ff02bc10..9ab30b210 100644 --- a/pipelines/batch/src/main/java/com/google/fhir/analytics/FhirEtlOptions.java +++ b/pipelines/batch/src/main/java/com/google/fhir/analytics/FhirEtlOptions.java @@ -261,7 +261,7 @@ public interface FhirEtlOptions extends BasePipelineOptions { void setSourceNDJsonFilePattern(String value); - @Description("Flag to switch off/on creation of a parquet DWH") + @Description("Flag to switch off/on creation of parquet files; can be turned off when syncing from a FHIR server to another.") @Default.Boolean(true) Boolean isCreateParquetDwh(); From 345d3217593659ca3c646ebd748b2b68b00ea54d Mon Sep 17 00:00:00 2001 From: mozzy11 Date: Sun, 5 Jan 2025 11:37:57 +0300 Subject: [PATCH 05/16] Update PR and adress review comments --- cloudbuild.yaml | 47 +++++----- .../compose-controller-spark-sql-single.yaml | 6 ++ docker/config/application.yaml | 3 +- docker/config_fhir_sink/application.yaml | 59 ------------ docker/config_fhir_sink/flink-conf.yaml | 31 ------- .../hapi-postgres-config_local.json | 9 -- .../controller_spark_sql_validation.sh | 91 ++++++------------- .../fhir/analytics/ConvertResourceFn.java | 2 +- .../fhir/analytics/FetchSearchPageFn.java | 9 +- .../google/fhir/analytics/FhirEtlOptions.java | 10 +- pipelines/controller/config/application.yaml | 6 +- .../google/fhir/analytics/DataProperties.java | 23 ++--- 12 files changed, 80 insertions(+), 216 deletions(-) delete mode 100644 docker/config_fhir_sink/application.yaml delete mode 100644 docker/config_fhir_sink/flink-conf.yaml delete mode 100644 docker/config_fhir_sink/hapi-postgres-config_local.json diff --git a/cloudbuild.yaml b/cloudbuild.yaml index b89899fcb..4b75baa0f 100644 --- a/cloudbuild.yaml +++ b/cloudbuild.yaml @@ -189,23 +189,19 @@ steps: '-c', 'CREATE DATABASE views;'] waitFor: ['Turn down FHIR Sink Server Search'] -- name: 'docker/compose' - id: 'Launch HAPI FHIR Sink Server Controller' - args: [ '-f', './docker/sink-compose.yml', '-p', 'sink-server-controller', 'up','--force-recreate', '-d' ] - env: - - SINK_SERVER_NAME=sink-server-controller - - SINK_SERVER_PORT=9001 - waitFor: ['Create views database'] - - name: 'docker/compose' id: 'Bring up controller and Spark containers' env: - PIPELINE_CONFIG=/workspace/docker/config - DWH_ROOT=/workspace/e2e-tests/controller-spark/dwh - - FHIRDATA_SINKFHIRSERVERURL=http://sink-server-controller:8080/fhir + - FHIRDATA_SINKFHIRSERVERURL= + - FHIRDATA_GENERATEPARQUETFILES=true + - FHIRDATA_CREATEHIVERESOURCETABLES=true + - FHIRDATA_CREATEPARQUETVIEWS=true + - FHIRDATA_SINKDBCONFIGPATH=config/hapi-postgres-config_local_views.json args: [ '-f', './docker/compose-controller-spark-sql-single.yaml', 'up', '--force-recreate', '-d' ] - waitFor: ['Launch HAPI FHIR Sink Server Controller'] + waitFor: ['Create views database'] - name: '${_REPOSITORY}/e2e-tests/controller-spark:${_TAG}' id: 'Run E2E Test for Dockerized Controller and Spark 
Thriftserver' @@ -224,39 +220,46 @@ steps: - name: 'docker/compose' id: 'Bring down controller and Spark containers for FHIR server to FHIR server sync' args: [ '-f', './docker/compose-controller-spark-sql-single.yaml', 'down' ,'-v'] - -# Resetting Sink FHIR server -- name: 'docker/compose' - id: 'Turn down HAPI Sink FHIR Server for FHIR server to FHIR server sync' - args: [ '-f', './docker/sink-compose.yml', 'down' ,'-v'] + waitFor: ['Run E2E Test for Dockerized Controller and Spark Thriftserver'] - name: 'docker/compose' - id: 'Launch Sink FHIR Server for FHIR server to FHIR server sync' - args: ['-f', './docker/sink-compose.yml', 'up', '--force-recreate', '-d'] + id: 'Launch HAPI FHIR Sink Server Controller' + args: [ '-f', './docker/sink-compose.yml', '-p', 'sink-server-controller', 'up','--force-recreate', '-d' ] + env: + - SINK_SERVER_NAME=sink-server-controller + - SINK_SERVER_PORT=9001 + waitFor: ['Bring down controller and Spark containers for FHIR server to FHIR server sync'] # Spinning up only the pipeline controller for FHIR server to FHIR server sync - name: 'docker/compose' - id: 'Bring up only the pipeline controller for FHIR server to FHIR server sync' + id: 'Bring up the pipeline controller' env: - - PIPELINE_CONFIG=/workspace/docker/config_fhir_sink + - PIPELINE_CONFIG=/workspace/docker/config - DWH_ROOT=/workspace/e2e-tests/controller-spark/dwh + - FHIRDATA_SINKFHIRSERVERURL=http://sink-server-controller:8080/fhir + - FHIRDATA_GENERATEPARQUETFILES=false + - FHIRDATA_CREATEHIVERESOURCETABLES=false + - FHIRDATA_CREATEPARQUETVIEWS=false + - FHIRDATA_SINKDBCONFIGPATH= args: [ '-f', './docker/compose-controller-spark-sql-single.yaml', 'up', '--force-recreate', '--no-deps' , '-d' ,'pipeline-controller' ] + waitFor: ['Launch HAPI FHIR Sink Server Controller'] - name: '${_REPOSITORY}/e2e-tests/controller-spark:${_TAG}' id: 'Run E2E Test for Dockerized Controller for FHIR server to FHIR server sync' + waitFor: ['Bring up the pipeline controller'] env: - DWH_TYPE="FHIR" - name: 'docker/compose' - id: 'Bring down controller and Spark containers' + id: 'Bring down controller' args: [ '-f', './docker/compose-controller-spark-sql-single.yaml', 'down' ,'-v'] - waitFor: ['Run E2E Test for Dockerized Controller and Spark Thriftserver'] + waitFor: ['Run E2E Test for Dockerized Controller for FHIR server to FHIR server sync'] - name: 'docker/compose' id: 'Turn down HAPI Source Server' args: [ '-f', './docker/hapi-compose.yml', 'down' ] - waitFor: ['Bring down controller and Spark containers'] + waitFor: ['Bring down controller'] - name: 'docker/compose' id: 'Turn down FHIR Sink Server Controller for e2e tests' diff --git a/docker/compose-controller-spark-sql-single.yaml b/docker/compose-controller-spark-sql-single.yaml index 83896375e..c761eae01 100644 --- a/docker/compose-controller-spark-sql-single.yaml +++ b/docker/compose-controller-spark-sql-single.yaml @@ -62,6 +62,12 @@ services: - ${DWH_ROOT}:/dwh environment: - JAVA_OPTS=$JAVA_OPTS + # This is to overide the values in the default config. 
+ - FHIRDATA_SINKFHIRSERVERURL=$FHIRDATA_SINKFHIRSERVERURL + - FHIRDATA_GENERATEPARQUETFILES=$FHIRDATA_GENERATEPARQUETFILES + - FHIRDATA_CREATEHIVERESOURCETABLES=$FHIRDATA_CREATEHIVERESOURCETABLES + - FHIRDATA_CREATEPARQUETVIEWS=$FHIRDATA_CREATEPARQUETVIEWS + - FHIRDATA_SINKDBCONFIGPATH=$FHIRDATA_SINKDBCONFIGPATH ports: - '8090:8080' networks: diff --git a/docker/config/application.yaml b/docker/config/application.yaml index 77eab9270..23e5e0f53 100644 --- a/docker/config/application.yaml +++ b/docker/config/application.yaml @@ -26,8 +26,7 @@ fhirdata: # fhirServerUrl: "http://hapi-server:8080/fhir" dbConfig: "config/hapi-postgres-config_local.json" dwhRootPrefix: "/dwh/controller_DWH" - #Whether to create a Parquet DWH or not - createParquetDwh: true + generateParquetFiles: true incrementalSchedule: "0 0 * * * *" purgeSchedule: "0 30 * * * *" numOfDwhSnapshotsToRetain: 2 diff --git a/docker/config_fhir_sink/application.yaml b/docker/config_fhir_sink/application.yaml deleted file mode 100644 index 942a6c325..000000000 --- a/docker/config_fhir_sink/application.yaml +++ /dev/null @@ -1,59 +0,0 @@ -# -# Copyright 2020-2022 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# See `pipelines/controller/config/application.yaml` for full documentation -# of these options. -# This config is meant to be used by `compose-controller-spark-sql.yaml`. -fhirdata: - # 172.17.0.1 is an example docker network interface ip address; - # `hapi-server` is another docker example where a container with that name is - # running on the same docker network. - # fhirServerUrl: "http://172.17.0.1:8091/fhir" - # fhirServerUrl: "http://hapi-server:8080/fhir" - dbConfig: "config/hapi-postgres-config_local.json" - dwhRootPrefix: "/dwh/controller_DWH" - #Whether to create a Parquet DWH or not - createParquetDwh: false - incrementalSchedule: "0 0 * * * *" - purgeSchedule: "0 30 * * * *" - numOfDwhSnapshotsToRetain: 2 - # There is no Questionnaire in our test FHIR server, but it is added to - # prevent regression of https://github.com/google/fhir-data-pipes/issues/785. - # TODO: add resource table creation to e2e tests. 
- resourceList: "Patient,Encounter,Observation,Questionnaire,Condition,Practitioner,Location,Organization,DiagnosticReport,Immunization,MedicationRequest,PractitionerRole,Procedure" - numThreads: 1 - autoGenerateFlinkConfiguration: true - createHiveResourceTables: false - #thriftserverHiveConfig: "config/thriftserver-hive-config_local.json" - #hiveResourceViewsDir: "config/views" - # structureDefinitionsPath: "config/profile-definitions" - structureDefinitionsPath: "classpath:/r4-us-core-definitions" - fhirVersion: "R4" - rowGroupSizeForParquetFiles: 33554432 # 32mb - #viewDefinitionsDir: "config/views" - #sinkDbConfigPath: "config/hapi-postgres-config_local_views.json" - sinkFhirServerUrl: "http://sink-server:8080/fhir" - #sinkUserName: "hapi" - #sinkPassword: "hapi123" - recursiveDepth: 1 - -# Enable spring boot actuator end points, use "*" to expose all endpoints, or a comma-separated -# list to expose selected ones -management: - endpoints: - web: - exposure: - include: health,info,metrics,prometheus,pipeline-metrics diff --git a/docker/config_fhir_sink/flink-conf.yaml b/docker/config_fhir_sink/flink-conf.yaml deleted file mode 100644 index 109de5192..000000000 --- a/docker/config_fhir_sink/flink-conf.yaml +++ /dev/null @@ -1,31 +0,0 @@ -# Copyright 2023 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# To use this config, FLINK_CONF_DIR env. var should be set to the parent dir. - -# This is needed to prevent an "Insufficient number of network buffers" -# exceptions when running the merger on large input with many workers. -taskmanager.memory.network.max: 256mb - -# This is needed to be able to process large resources, otherwise in JDBC -# mode we may get the following exception: -# "The record exceeds the maximum size of a sort buffer ..." -taskmanager.memory.managed.size: 256mb - -# This is to make pipeline.run() non-blocking with FlinkRunner; unfortunately -# this is overwritten in `local` mode: https://stackoverflow.com/a/74416240 -execution.attached: false - -# This is required to track the pipeline metrics when FlinkRunner is used. 
-execution.job-listeners: com.google.fhir.analytics.metrics.FlinkJobListener diff --git a/docker/config_fhir_sink/hapi-postgres-config_local.json b/docker/config_fhir_sink/hapi-postgres-config_local.json deleted file mode 100644 index 743efc6bb..000000000 --- a/docker/config_fhir_sink/hapi-postgres-config_local.json +++ /dev/null @@ -1,9 +0,0 @@ -{ - "jdbcDriverClass": "org.postgresql.Driver", - "databaseService" : "postgresql", - "databaseHostName" : "hapi-fhir-db", - "databasePort" : "5432", - "databaseUser" : "admin", - "databasePassword" : "admin", - "databaseName" : "hapi" -} diff --git a/e2e-tests/controller-spark/controller_spark_sql_validation.sh b/e2e-tests/controller-spark/controller_spark_sql_validation.sh index 52780c782..f773f6785 100755 --- a/e2e-tests/controller-spark/controller_spark_sql_validation.sh +++ b/e2e-tests/controller-spark/controller_spark_sql_validation.sh @@ -222,8 +222,6 @@ function wait_for_completion() { ####################################################################### function check_parquet() { local isIncremental=$1 - local runtime="5 minute" - local end_time=$(date -ud "$runtime" +%s) local output="${HOME_PATH}/${PARQUET_SUBDIR}" TOTAL_VIEW_PATIENTS=106 @@ -237,40 +235,8 @@ function check_parquet() { TOTAL_TEST_OBS=$((2*TOTAL_TEST_OBS)) fi - - while [[ $(date -u +%s) -le $end_time ]] - do - # check whether output directory has started receiving parquet files. - if [[ "$(ls -A $output)" ]] - then - local total_patients=$(java -Xms16g -Xmx16g -jar ./parquet-tools-1.11.1.jar rowcount \ - "${output}/*/Patient/" | awk '{print $3}') - local total_encounters=$(java -Xms16g -Xmx16g -jar ./parquet-tools-1.11.1.jar rowcount \ - "${output}/*/Encounter/" | awk '{print $3}') - local total_observations=$(java -Xms16g -Xmx16g -jar ./parquet-tools-1.11.1.jar rowcount \ - "${output}/*/Observation/" | awk '{print $3}') - - print_message "Total patients: $total_patients" - print_message "Total encounters: $total_encounters" - print_message "Total observations: $total_observations" - - if [[ "${total_patients}" == "${TOTAL_TEST_PATIENTS}" && "${total_encounters}" \ - == "${TOTAL_TEST_ENCOUNTERS}" && "${total_observations}" == "${TOTAL_TEST_OBS}" ]] \ - ; then - print_message "Pipeline transformation successfully completed." - timeout=false - break - else - sleep 10 - fi - fi - done - - if [[ "${timeout}" == "true" ]] - # check whether output directory has received parquet files. if [[ "$(ls -A $output)" ]] - then local total_patients=$(java -Xms16g -Xmx16g -jar ./parquet-tools-1.11.1.jar rowcount \ "${output}/*/Patient/" | awk '{print $3}') @@ -451,6 +417,26 @@ function validate_updated_resource() { } +function validate_updated_resource_in_fhir_sink() { + local fhir_username="hapi" + local fhir_password="hapi" + local fhir_url_extension="/fhir" + + # Fetch the patient resource using the Patient ID + local updated_family_name=$(curl -X GET -H "Content-Type: application/json; charset=utf-8" -u $fhir_username:$fhir_password \ + --connect-timeout 5 --max-time 20 "${SINK_FHIR_SERVER_URL}${fhir_url_extension}/Patient/${PATIENT_ID}" \ + | jq -r '.name[0].family') + + if [[ "${updated_family_name}" == "Anderson" ]] + then + print_message "Updated Patient data for ${PATIENT_ID} in FHIR sink verified successfully." + else + print_message "Updated Patient data verification for ${PATIENT_ID} in FHIR sink failed." 
+ exit 6 + fi +} + + ################################################# # Function that counts resources in FHIR server and compares output to what is # in the source FHIR server @@ -493,59 +479,34 @@ setup "$@" fhir_source_query sleep 30 run_pipeline "FULL" - +wait_for_completion if [[ "${DWH_TYPE}" == "PARQUET" ]] then check_parquet false else - # Provide enough Buffer time for FULL pipeline to completely run before testing the sink FHIR server - sleep 900 test_fhir_sink "FULL" fi -wait_for_completion -check_parquet false -test_fhir_sink "FULL" - - clear add_resource update_resource # Incremental run. run_pipeline "INCREMENTAL" - +wait_for_completion if [[ "${DWH_TYPE}" == "PARQUET" ]] then check_parquet true -else - fhir_source_query - # Provide enough Buffer time for FULL pipeline to completely run before testing the sink FHIR server - sleep 300 - test_fhir_sink "INCREMENTAL" -fi - -if [[ "${DWH_TYPE}" == "PARQUET" ]] -then validate_resource_tables validate_resource_tables_data validate_updated_resource - # View recreation run # TODO add validation for the views as well run_pipeline "VIEWS" - +else + fhir_source_query + test_fhir_sink "INCREMENTAL" + validate_updated_resource_in_fhir_sink fi -wait_for_completion -check_parquet true -fhir_source_query -test_fhir_sink "INCREMENTAL" - -validate_resource_tables -validate_resource_tables_data -validate_updated_resource - - - print_message "END!!" \ No newline at end of file diff --git a/pipelines/batch/src/main/java/com/google/fhir/analytics/ConvertResourceFn.java b/pipelines/batch/src/main/java/com/google/fhir/analytics/ConvertResourceFn.java index a416c6ecc..553a8b363 100644 --- a/pipelines/batch/src/main/java/com/google/fhir/analytics/ConvertResourceFn.java +++ b/pipelines/batch/src/main/java/com/google/fhir/analytics/ConvertResourceFn.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2024 Google LLC + * Copyright 2020-2025 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/pipelines/batch/src/main/java/com/google/fhir/analytics/FetchSearchPageFn.java b/pipelines/batch/src/main/java/com/google/fhir/analytics/FetchSearchPageFn.java index 2dde5f08e..1423194cb 100644 --- a/pipelines/batch/src/main/java/com/google/fhir/analytics/FetchSearchPageFn.java +++ b/pipelines/batch/src/main/java/com/google/fhir/analytics/FetchSearchPageFn.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2024 Google LLC + * Copyright 2020-2025 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -21,6 +21,7 @@ import ca.uhn.fhir.parser.IParser; import com.cerner.bunsen.exception.ProfileException; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; import com.google.fhir.analytics.JdbcConnectionPools.DataSourceConfig; import com.google.fhir.analytics.model.DatabaseConfiguration; import com.google.fhir.analytics.view.ViewApplicationException; @@ -87,7 +88,7 @@ abstract class FetchSearchPageFn extends DoFn> { protected final String parquetFile; - protected final Boolean createParquetDwh; + protected final Boolean generateParquetFiles; private final int secondsToFlush; @@ -136,7 +137,7 @@ abstract class FetchSearchPageFn extends DoFn> { this.oAuthClientSecret = options.getFhirServerOAuthClientSecret(); this.stageIdentifier = stageIdentifier; this.parquetFile = options.getOutputParquetPath(); - this.createParquetDwh = options.isCreateParquetDwh(); + this.generateParquetFiles = options.isGenerateParquetFiles(); this.secondsToFlush = options.getSecondsToFlushParquetFiles(); this.rowGroupSize = options.getRowGroupSizeForParquetFiles(); if (DATAFLOW_RUNNER.equals(options.getRunner().getSimpleName())) { @@ -212,7 +213,7 @@ public void setup() throws SQLException, ProfileException { oAuthClientSecret, fhirContext); fhirSearchUtil = new FhirSearchUtil(fetchUtil); - if (createParquetDwh) { + if (generateParquetFiles && !Strings.isNullOrEmpty(parquetFile)) { parquetUtil = new ParquetUtil( fhirContext.getVersion().getVersion(), diff --git a/pipelines/batch/src/main/java/com/google/fhir/analytics/FhirEtlOptions.java b/pipelines/batch/src/main/java/com/google/fhir/analytics/FhirEtlOptions.java index 8b1a386cc..b1f6a306f 100644 --- a/pipelines/batch/src/main/java/com/google/fhir/analytics/FhirEtlOptions.java +++ b/pipelines/batch/src/main/java/com/google/fhir/analytics/FhirEtlOptions.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2024 Google LLC + * Copyright 2020-2025 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -264,11 +264,13 @@ public interface FhirEtlOptions extends BasePipelineOptions { @Default.String("") String getSourceNdjsonFilePatternList(); - @Description("Flag to switch off/on creation of parquet files; can be turned off when syncing from a FHIR server to another.") + @Description( + "Flag to switch off/on generation of parquet files; can be turned off when syncing from a" + + " FHIR server to another.") @Default.Boolean(true) - Boolean isCreateParquetDwh(); + Boolean isGenerateParquetFiles(); - void setCreateParquetDwh(Boolean value); + void setGenerateParquetFiles(Boolean value); void setSourceNdjsonFilePatternList(String value); } diff --git a/pipelines/controller/config/application.yaml b/pipelines/controller/config/application.yaml index 52cb5bb53..edfd55471 100644 --- a/pipelines/controller/config/application.yaml +++ b/pipelines/controller/config/application.yaml @@ -77,9 +77,9 @@ fhirdata: # that directory too, such that files created by the pipelines are readable by # the Thrift Server, e.g., `setfacl -d -m o::rx dwh/`. dwhRootPrefix: "dwh/controller_DEV_DWH" - # Whether to create a Parquet DWH or not. In case of syncing from a FHIR server to another, if Parquet files are not needed, their generation can be switched off by this flag. - # generation of parquet DWH could be switched off/on - createParquetDwh: true + # Whether to generate Parquet Files or not. 
In case of syncing from a FHIR server to another, + # if Parquet files are not needed, their generation can be switched off by this flag. + generateParquetFiles: true # The schedule for automatic incremental pipeline runs. # Uses the Spring CronExpression format, i.e., diff --git a/pipelines/controller/src/main/java/com/google/fhir/analytics/DataProperties.java b/pipelines/controller/src/main/java/com/google/fhir/analytics/DataProperties.java index 5102cdcb1..1a24f33b0 100644 --- a/pipelines/controller/src/main/java/com/google/fhir/analytics/DataProperties.java +++ b/pipelines/controller/src/main/java/com/google/fhir/analytics/DataProperties.java @@ -1,5 +1,5 @@ /* - * Copyright 2020-2024 Google LLC + * Copyright 2020-2025 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -119,7 +119,7 @@ public class DataProperties { private int recursiveDepth; - private boolean createParquetDwh; + private boolean generateParquetFiles; @PostConstruct void validateProperties() { @@ -137,20 +137,10 @@ void validateProperties() { Preconditions.checkArgument( !Strings.isNullOrEmpty(dwhRootPrefix), "dwhRootPrefix is required!"); - - if (!Strings.isNullOrEmpty(dbConfig)) { - if (!Strings.isNullOrEmpty(fhirServerUrl)) { - logger.warn("Both fhirServerUrl and dbConfig are set; ignoring fhirServerUrl!"); - } - logger.info("Using JDBC mode since dbConfig is set."); - } else { - // This should always be true because of the first Precondition. - Preconditions.checkArgument(!Strings.isNullOrEmpty(fhirServerUrl)); - logger.info("Using FHIR-search mode since dbConfig is not set."); - } Preconditions.checkState(fhirVersion != null, "FhirVersion cannot be empty"); Preconditions.checkState(!createHiveResourceTables || !thriftserverHiveConfig.isEmpty()); - Preconditions.checkState(!createHiveResourceTables || createParquetDwh); + Preconditions.checkState(!createHiveResourceTables || generateParquetFiles); + Preconditions.checkState(!createParquetViews || generateParquetFiles); } private PipelineConfig.PipelineConfigBuilder addFlinkOptions(FhirEtlOptions options) { @@ -228,7 +218,7 @@ PipelineConfig createBatchOptions() { String timestampSuffix = DwhFiles.safeTimestampSuffix(); options.setOutputParquetPath(dwhRootPrefix + DwhFiles.TIMESTAMP_PREFIX + timestampSuffix); - options.setCreateParquetDwh(createParquetDwh); + options.setGenerateParquetFiles(generateParquetFiles); PipelineConfig.PipelineConfigBuilder pipelineConfigBuilder = addFlinkOptions(options); @@ -249,7 +239,8 @@ List getConfigParams() { "fhirdata.fhirFetchMode", fhirFetchMode != null ? 
fhirFetchMode.name() : "", "", ""), new ConfigFields("fhirdata.fhirServerUrl", fhirServerUrl, "", ""), new ConfigFields("fhirdata.dwhRootPrefix", dwhRootPrefix, "", ""), - new ConfigFields("fhirdata.createParquetDwh", String.valueOf(createParquetDwh), "", ""), + new ConfigFields( + "fhirdata.generateParquetFiles", String.valueOf(generateParquetFiles), "", ""), new ConfigFields("fhirdata.incrementalSchedule", incrementalSchedule, "", ""), new ConfigFields("fhirdata.purgeSchedule", purgeSchedule, "", ""), new ConfigFields( From a3068a121a0e7b0ec14a6276240f11c115069bfc Mon Sep 17 00:00:00 2001 From: mozzy11 Date: Mon, 6 Jan 2025 13:03:02 +0300 Subject: [PATCH 06/16] re-triger --- e2e-tests/controller-spark/controller_spark_sql_validation.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/e2e-tests/controller-spark/controller_spark_sql_validation.sh b/e2e-tests/controller-spark/controller_spark_sql_validation.sh index f773f6785..e4b003ce5 100755 --- a/e2e-tests/controller-spark/controller_spark_sql_validation.sh +++ b/e2e-tests/controller-spark/controller_spark_sql_validation.sh @@ -478,6 +478,7 @@ validate_args "$@" setup "$@" fhir_source_query sleep 30 +# Full run. run_pipeline "FULL" wait_for_completion if [[ "${DWH_TYPE}" == "PARQUET" ]] From 154ec7968610dc85a546b6d88140247204889b90 Mon Sep 17 00:00:00 2001 From: mozzy11 Date: Mon, 13 Jan 2025 21:52:14 +0300 Subject: [PATCH 07/16] fix typo --- docker/compose-controller-spark-sql-single.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/compose-controller-spark-sql-single.yaml b/docker/compose-controller-spark-sql-single.yaml index c761eae01..19bbdd1c2 100644 --- a/docker/compose-controller-spark-sql-single.yaml +++ b/docker/compose-controller-spark-sql-single.yaml @@ -62,7 +62,7 @@ services: - ${DWH_ROOT}:/dwh environment: - JAVA_OPTS=$JAVA_OPTS - # This is to overide the values in the default config. + # This is to override the values in the default config. 
- FHIRDATA_SINKFHIRSERVERURL=$FHIRDATA_SINKFHIRSERVERURL - FHIRDATA_GENERATEPARQUETFILES=$FHIRDATA_GENERATEPARQUETFILES - FHIRDATA_CREATEHIVERESOURCETABLES=$FHIRDATA_CREATEHIVERESOURCETABLES From e93f8e56ae10d1ba2bea9029ef9889679d7dd7ec Mon Sep 17 00:00:00 2001 From: Mutesasira Moses Date: Mon, 13 Jan 2025 22:19:46 +0300 Subject: [PATCH 08/16] Update e2e-tests/controller-spark/controller_spark_sql_validation.sh Co-authored-by: Bashir Sadjad --- e2e-tests/controller-spark/controller_spark_sql_validation.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e-tests/controller-spark/controller_spark_sql_validation.sh b/e2e-tests/controller-spark/controller_spark_sql_validation.sh index e4b003ce5..34cd15af1 100755 --- a/e2e-tests/controller-spark/controller_spark_sql_validation.sh +++ b/e2e-tests/controller-spark/controller_spark_sql_validation.sh @@ -68,7 +68,7 @@ function print_message() { local print_prefix="" if [[ "${DWH_TYPE}" == "PARQUET" ]] then - print_prefix="E2E TEST FOR CONTROLLER SPARK DEPLOYMENT:" + print_prefix="E2E TEST FOR CONTROLLER PARQUET BASED DEPLOYMENT:" else print_prefix="E2E TEST FOR CONTROLLER FHIR SERVER TO FHIR SERVER SYNC:" fi From e73241205c311b4fb1724b87237c2e7d1178c9e5 Mon Sep 17 00:00:00 2001 From: Mutesasira Moses Date: Mon, 13 Jan 2025 22:20:35 +0300 Subject: [PATCH 09/16] Update docker/compose-controller-spark-sql-single.yaml Co-authored-by: Bashir Sadjad --- docker/compose-controller-spark-sql-single.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/compose-controller-spark-sql-single.yaml b/docker/compose-controller-spark-sql-single.yaml index 19bbdd1c2..1bc36f402 100644 --- a/docker/compose-controller-spark-sql-single.yaml +++ b/docker/compose-controller-spark-sql-single.yaml @@ -64,7 +64,7 @@ services: - JAVA_OPTS=$JAVA_OPTS # This is to override the values in the default config. 
- FHIRDATA_SINKFHIRSERVERURL=$FHIRDATA_SINKFHIRSERVERURL - - FHIRDATA_GENERATEPARQUETFILES=$FHIRDATA_GENERATEPARQUETFILES + - FHIRDATA_GENERATEPARQUETFILES=${FHIRDATA_GENERATEPARQUETFILES:-true} - FHIRDATA_CREATEHIVERESOURCETABLES=$FHIRDATA_CREATEHIVERESOURCETABLES - FHIRDATA_CREATEPARQUETVIEWS=$FHIRDATA_CREATEPARQUETVIEWS - FHIRDATA_SINKDBCONFIGPATH=$FHIRDATA_SINKDBCONFIGPATH From b7fdb9a8ef665e3f27eeafc471479d3eb3ed8382 Mon Sep 17 00:00:00 2001 From: Mutesasira Moses Date: Mon, 13 Jan 2025 22:21:10 +0300 Subject: [PATCH 10/16] Update cloudbuild.yaml Co-authored-by: Bashir Sadjad --- cloudbuild.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cloudbuild.yaml b/cloudbuild.yaml index 4b75baa0f..162a04048 100644 --- a/cloudbuild.yaml +++ b/cloudbuild.yaml @@ -223,7 +223,7 @@ steps: waitFor: ['Run E2E Test for Dockerized Controller and Spark Thriftserver'] - name: 'docker/compose' - id: 'Launch HAPI FHIR Sink Server Controller' + id: 'Launch HAPI FHIR Sink Server' args: [ '-f', './docker/sink-compose.yml', '-p', 'sink-server-controller', 'up','--force-recreate', '-d' ] env: - SINK_SERVER_NAME=sink-server-controller From 11ae8d17b3364b20e3f7b9ac3957f95f2ac8b063 Mon Sep 17 00:00:00 2001 From: Mutesasira Moses Date: Mon, 13 Jan 2025 22:21:39 +0300 Subject: [PATCH 11/16] Update cloudbuild.yaml Co-authored-by: Bashir Sadjad --- cloudbuild.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cloudbuild.yaml b/cloudbuild.yaml index 162a04048..396ae725a 100644 --- a/cloudbuild.yaml +++ b/cloudbuild.yaml @@ -224,7 +224,7 @@ steps: - name: 'docker/compose' id: 'Launch HAPI FHIR Sink Server' - args: [ '-f', './docker/sink-compose.yml', '-p', 'sink-server-controller', 'up','--force-recreate', '-d' ] + args: [ '-f', './docker/sink-compose.yml', '-p', 'sink-server', 'up','--force-recreate', '-d' ] env: - SINK_SERVER_NAME=sink-server-controller - SINK_SERVER_PORT=9001 From a9511179a91debca63a8e82effd91e7380aad47b Mon Sep 17 00:00:00 2001 From: Mutesasira Moses Date: Mon, 13 Jan 2025 22:22:07 +0300 Subject: [PATCH 12/16] Update cloudbuild.yaml Co-authored-by: Bashir Sadjad --- cloudbuild.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cloudbuild.yaml b/cloudbuild.yaml index 396ae725a..7eb43f1d2 100644 --- a/cloudbuild.yaml +++ b/cloudbuild.yaml @@ -218,7 +218,7 @@ steps: # - docker logs pipeline-controller - name: 'docker/compose' - id: 'Bring down controller and Spark containers for FHIR server to FHIR server sync' + id: 'Bring down controller and Spark containers' args: [ '-f', './docker/compose-controller-spark-sql-single.yaml', 'down' ,'-v'] waitFor: ['Run E2E Test for Dockerized Controller and Spark Thriftserver'] From 84893785a2e31aeffde023c9ec133e39a206d134 Mon Sep 17 00:00:00 2001 From: Mutesasira Moses Date: Mon, 13 Jan 2025 22:22:43 +0300 Subject: [PATCH 13/16] Update cloudbuild.yaml Co-authored-by: Bashir Sadjad --- cloudbuild.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cloudbuild.yaml b/cloudbuild.yaml index 7eb43f1d2..d37498148 100644 --- a/cloudbuild.yaml +++ b/cloudbuild.yaml @@ -246,7 +246,7 @@ steps: waitFor: ['Launch HAPI FHIR Sink Server Controller'] - name: '${_REPOSITORY}/e2e-tests/controller-spark:${_TAG}' - id: 'Run E2E Test for Dockerized Controller for FHIR server to FHIR server sync' + id: 'Run E2E Test for Dockerized Controller in FHIR server to FHIR server sync mode' waitFor: ['Bring up the pipeline controller'] env: - DWH_TYPE="FHIR" From 7cea4aea9697264b2fdea2ab98287a8f23b1bc64 Mon Sep 17 
00:00:00 2001 From: mozzy11 Date: Mon, 13 Jan 2025 22:51:30 +0300 Subject: [PATCH 14/16] addres comments --- cloudbuild.yaml | 21 +++++++------------ .../compose-controller-spark-sql-single.yaml | 8 +++---- 2 files changed, 12 insertions(+), 17 deletions(-) diff --git a/cloudbuild.yaml b/cloudbuild.yaml index d37498148..c897e5a76 100644 --- a/cloudbuild.yaml +++ b/cloudbuild.yaml @@ -194,11 +194,6 @@ steps: env: - PIPELINE_CONFIG=/workspace/docker/config - DWH_ROOT=/workspace/e2e-tests/controller-spark/dwh - - FHIRDATA_SINKFHIRSERVERURL= - - FHIRDATA_GENERATEPARQUETFILES=true - - FHIRDATA_CREATEHIVERESOURCETABLES=true - - FHIRDATA_CREATEPARQUETVIEWS=true - - FHIRDATA_SINKDBCONFIGPATH=config/hapi-postgres-config_local_views.json args: [ '-f', './docker/compose-controller-spark-sql-single.yaml', 'up', '--force-recreate', '-d' ] waitFor: ['Create views database'] @@ -228,11 +223,11 @@ steps: env: - SINK_SERVER_NAME=sink-server-controller - SINK_SERVER_PORT=9001 - waitFor: ['Bring down controller and Spark containers for FHIR server to FHIR server sync'] + waitFor: ['Bring down controller and Spark containers'] # Spinning up only the pipeline controller for FHIR server to FHIR server sync - name: 'docker/compose' - id: 'Bring up the pipeline controller' + id: 'Bring up the pipeline controller for for FHIR server to FHIR server sync' env: - PIPELINE_CONFIG=/workspace/docker/config - DWH_ROOT=/workspace/e2e-tests/controller-spark/dwh @@ -243,27 +238,27 @@ steps: - FHIRDATA_SINKDBCONFIGPATH= args: [ '-f', './docker/compose-controller-spark-sql-single.yaml', 'up', '--force-recreate', '--no-deps' , '-d' ,'pipeline-controller' ] - waitFor: ['Launch HAPI FHIR Sink Server Controller'] + waitFor: ['Launch HAPI FHIR Sink Server'] - name: '${_REPOSITORY}/e2e-tests/controller-spark:${_TAG}' id: 'Run E2E Test for Dockerized Controller in FHIR server to FHIR server sync mode' - waitFor: ['Bring up the pipeline controller'] + waitFor: ['Bring up the pipeline controller for for FHIR server to FHIR server sync'] env: - DWH_TYPE="FHIR" - name: 'docker/compose' - id: 'Bring down controller' + id: 'Bring down the pipeline controller' args: [ '-f', './docker/compose-controller-spark-sql-single.yaml', 'down' ,'-v'] - waitFor: ['Run E2E Test for Dockerized Controller for FHIR server to FHIR server sync'] + waitFor: ['Run E2E Test for Dockerized Controller in FHIR server to FHIR server sync mode'] - name: 'docker/compose' id: 'Turn down HAPI Source Server' args: [ '-f', './docker/hapi-compose.yml', 'down' ] - waitFor: ['Bring down controller'] + waitFor: ['Bring down the pipeline controller'] - name: 'docker/compose' id: 'Turn down FHIR Sink Server Controller for e2e tests' - args: [ '-f', './docker/sink-compose.yml', '-p', 'sink-server-controller', 'down' ,'-v'] + args: [ '-f', './docker/sink-compose.yml', '-p', 'sink-server', 'down' ,'-v'] env: - SINK_SERVER_NAME=sink-server-controller - SINK_SERVER_PORT=9001 diff --git a/docker/compose-controller-spark-sql-single.yaml b/docker/compose-controller-spark-sql-single.yaml index 1bc36f402..df0d43ffa 100644 --- a/docker/compose-controller-spark-sql-single.yaml +++ b/docker/compose-controller-spark-sql-single.yaml @@ -63,11 +63,11 @@ services: environment: - JAVA_OPTS=$JAVA_OPTS # This is to override the values in the default config. 
- - FHIRDATA_SINKFHIRSERVERURL=$FHIRDATA_SINKFHIRSERVERURL + - FHIRDATA_SINKFHIRSERVERURL=$(FHIRDATA_SINKFHIRSERVERURL:-} - FHIRDATA_GENERATEPARQUETFILES=${FHIRDATA_GENERATEPARQUETFILES:-true} - - FHIRDATA_CREATEHIVERESOURCETABLES=$FHIRDATA_CREATEHIVERESOURCETABLES - - FHIRDATA_CREATEPARQUETVIEWS=$FHIRDATA_CREATEPARQUETVIEWS - - FHIRDATA_SINKDBCONFIGPATH=$FHIRDATA_SINKDBCONFIGPATH + - FHIRDATA_CREATEHIVERESOURCETABLES=${FHIRDATA_CREATEHIVERESOURCETABLES:-true} + - FHIRDATA_CREATEPARQUETVIEWS=${FHIRDATA_CREATEPARQUETVIEWS:-true} + - FHIRDATA_SINKDBCONFIGPATH=${FHIRDATA_SINKDBCONFIGPATH:-config/hapi-postgres-config_local_views.json} ports: - '8090:8080' networks: From 47c81d81fea817c8500a75a12e8cf926c368abce Mon Sep 17 00:00:00 2001 From: mozzy11 Date: Mon, 13 Jan 2025 22:53:51 +0300 Subject: [PATCH 15/16] fix typo --- cloudbuild.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cloudbuild.yaml b/cloudbuild.yaml index c897e5a76..9e0578e8b 100644 --- a/cloudbuild.yaml +++ b/cloudbuild.yaml @@ -227,7 +227,7 @@ steps: # Spinning up only the pipeline controller for FHIR server to FHIR server sync - name: 'docker/compose' - id: 'Bring up the pipeline controller for for FHIR server to FHIR server sync' + id: 'Bring up the pipeline controller for FHIR server to FHIR server sync' env: - PIPELINE_CONFIG=/workspace/docker/config - DWH_ROOT=/workspace/e2e-tests/controller-spark/dwh @@ -242,7 +242,7 @@ steps: - name: '${_REPOSITORY}/e2e-tests/controller-spark:${_TAG}' id: 'Run E2E Test for Dockerized Controller in FHIR server to FHIR server sync mode' - waitFor: ['Bring up the pipeline controller for for FHIR server to FHIR server sync'] + waitFor: ['Bring up the pipeline controller for FHIR server to FHIR server sync'] env: - DWH_TYPE="FHIR" From e9933e4600b87d9913cd9780fd160b1e04c63fc1 Mon Sep 17 00:00:00 2001 From: mozzy11 Date: Mon, 13 Jan 2025 23:06:39 +0300 Subject: [PATCH 16/16] minor update --- docker/compose-controller-spark-sql-single.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/compose-controller-spark-sql-single.yaml b/docker/compose-controller-spark-sql-single.yaml index df0d43ffa..ac0d543dd 100644 --- a/docker/compose-controller-spark-sql-single.yaml +++ b/docker/compose-controller-spark-sql-single.yaml @@ -63,7 +63,7 @@ services: environment: - JAVA_OPTS=$JAVA_OPTS # This is to override the values in the default config. - - FHIRDATA_SINKFHIRSERVERURL=$(FHIRDATA_SINKFHIRSERVERURL:-} + - FHIRDATA_SINKFHIRSERVERURL=${FHIRDATA_SINKFHIRSERVERURL:-} - FHIRDATA_GENERATEPARQUETFILES=${FHIRDATA_GENERATEPARQUETFILES:-true} - FHIRDATA_CREATEHIVERESOURCETABLES=${FHIRDATA_CREATEHIVERESOURCETABLES:-true} - FHIRDATA_CREATEPARQUETVIEWS=${FHIRDATA_CREATEPARQUETVIEWS:-true}
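
Illustrative usage sketch (not part of the patches above, values taken from the cloudbuild.yaml and compose changes in this series): with the env overrides these patches add to docker/compose-controller-spark-sql-single.yaml, the FHIR-server-to-FHIR-server sync mode can be exercised locally with Parquet generation switched off, assuming the source HAPI server (hapi-compose.yml) and the sink server (sink-compose.yml, container sink-server-controller on port 9001) are already up, as the cloudbuild steps do.

# Sketch only: mirrors the 'Bring up the pipeline controller for FHIR server to
# FHIR server sync' step in cloudbuild.yaml; adjust paths to your checkout and
# use docker-compose or 'docker compose' depending on your setup.
export PIPELINE_CONFIG="$PWD/docker/config"
export DWH_ROOT="$PWD/e2e-tests/controller-spark/dwh"
export FHIRDATA_SINKFHIRSERVERURL="http://sink-server-controller:8080/fhir"
export FHIRDATA_GENERATEPARQUETFILES=false
export FHIRDATA_CREATEHIVERESOURCETABLES=false
export FHIRDATA_CREATEPARQUETVIEWS=false
export FHIRDATA_SINKDBCONFIGPATH=
docker compose -f ./docker/compose-controller-spark-sql-single.yaml up \
  --force-recreate --no-deps -d pipeline-controller

The e2e test image can then be run with DWH_TYPE="FHIR", matching the new ENV DWH_TYPE added to e2e-tests/controller-spark/Dockerfile, so that controller_spark_sql_validation.sh validates the sink FHIR server instead of the Parquet DWH.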