add ability to switch off/on generation of Parquet Files #1074

Merged: 29 commits, merged Jan 16, 2025
Changes from 8 commits

Commits:
9ffab3a  add ability to switch off/on creation of parquet dwh (mozzy11, May 29, 2024)
ef22c53  Merge branch 'master' into isolate_fhir_sync (mozzy11, May 31, 2024)
f37bbeb  run e2e tests for parquet and fhir sink independently (mozzy11, May 31, 2024)
fbcf38c  Merge branch 'master' into isolate_fhir_sync (mozzy11, Jun 3, 2024)
564d68a  Merge branch 'master' into isolate_fhir_sync (mozzy11, Jun 3, 2024)
f2da0ea  Merge branch 'master' into isolate_fhir_sync (mozzy11, Jun 6, 2024)
66ad736  Merge branch 'master' into isolate_fhir_sync (mozzy11, Jul 8, 2024)
e296d6c  Merge branch 'master' into isolate_fhir_sync (mozzy11, Jul 9, 2024)
effc19d  Update pipelines/controller/config/application.yaml (mozzy11, Jan 2, 2025)
162c89b  Update pipelines/batch/src/main/java/com/google/fhir/analytics/FhirEt… (mozzy11, Jan 2, 2025)
104990a  Merge branch 'master' into isolate_fhir_sync (mozzy11, Jan 2, 2025)
345d321  Update PR and adress review comments (mozzy11, Jan 5, 2025)
a3068a1  re-triger (mozzy11, Jan 6, 2025)
a543f25  Merge branch 'master' into isolate_fhir_sync (mozzy11, Jan 6, 2025)
9cc8d65  Merge branch 'master' into isolate_fhir_sync (mozzy11, Jan 7, 2025)
0ee09d6  Merge branch 'master' into isolate_fhir_sync (mozzy11, Jan 7, 2025)
1f0acf3  Merge branch 'master' into isolate_fhir_sync (mozzy11, Jan 8, 2025)
d0a8d88  Merge branch 'master' into isolate_fhir_sync (mozzy11, Jan 13, 2025)
154ec79  fix typo (mozzy11, Jan 13, 2025)
e93f8e5  Update e2e-tests/controller-spark/controller_spark_sql_validation.sh (mozzy11, Jan 13, 2025)
e732412  Update docker/compose-controller-spark-sql-single.yaml (mozzy11, Jan 13, 2025)
b7fdb9a  Update cloudbuild.yaml (mozzy11, Jan 13, 2025)
11ae8d1  Update cloudbuild.yaml (mozzy11, Jan 13, 2025)
a951117  Update cloudbuild.yaml (mozzy11, Jan 13, 2025)
8489378  Update cloudbuild.yaml (mozzy11, Jan 13, 2025)
8750385  Merge branch 'master' into isolate_fhir_sync (mozzy11, Jan 13, 2025)
7cea4ae  addres comments (mozzy11, Jan 13, 2025)
47c81d8  fix typo (mozzy11, Jan 13, 2025)
e9933e4  minor update (mozzy11, Jan 13, 2025)
39 changes: 28 additions & 11 deletions cloudbuild.yaml
@@ -118,21 +118,11 @@ steps:
args: [ '-U', 'admin', '-d', 'postgres', '-h', 'hapi-fhir-db', '-p', '5432',
'-c', 'CREATE DATABASE views;']

# Resetting FHIR sink server
- name: 'docker/compose'
id: 'Turn down FHIR Sink Server'
args: [ '-f', './docker/sink-compose.yml', 'down' ,'-v']

- name: 'docker/compose'
id: 'Launch HAPI FHIR Sink Server'
args: [ '-f', './docker/sink-compose.yml', 'up','--force-recreate', '-d' ]

- name: 'docker/compose'
id: 'Bring up controller and Spark containers'
env:
- PIPELINE_CONFIG=/workspace/docker/config
- DWH_ROOT=/workspace/e2e-tests/controller-spark/dwh
- FHIRDATA_SINKFHIRSERVERURL=http://sink-server:8080/fhir
args: [ '-f', './docker/compose-controller-spark-sql-single.yaml', 'up',
'--force-recreate', '-d' ]

@@ -149,9 +139,36 @@ steps:
# - -c
# - docker logs pipeline-controller

- name: 'docker/compose'
id: 'Bring down controller and Spark containers for FHIR server to FHIR server sync'
args: [ '-f', './docker/compose-controller-spark-sql-single.yaml', 'down' ,'-v']

# Resetting Sink FHIR server
- name: 'docker/compose'
id: 'Turn down HAPI Sink FHIR Server for FHIR server to FHIR server sync'
args: [ '-f', './docker/sink-compose.yml', 'down' ,'-v']

- name: 'docker/compose'
id: 'Launch Sink FHIR Server for FHIR server to FHIR server sync'
args: ['-f', './docker/sink-compose.yml', 'up', '--force-recreate', '-d']

# Spinning up only the pipeline controller for FHIR server to FHIR server sync
- name: 'docker/compose'
id: 'Bring up only the pipeline controller for FHIR server to FHIR server sync'
env:
- PIPELINE_CONFIG=/workspace/docker/config_fhir_sink
- DWH_ROOT=/workspace/e2e-tests/controller-spark/dwh
args: [ '-f', './docker/compose-controller-spark-sql-single.yaml', 'up',
'--force-recreate', '--no-deps' , '-d' ,'pipeline-controller' ]

- name: '${_REPOSITORY}/e2e-tests/controller-spark:${_TAG}'
id: 'Run E2E Test for Dockerized Controller for FHIR server to FHIR server sync'
env:
- DWH_TYPE="FHIR"

- name: 'docker/compose'
id: 'Bring down controller and Spark containers'
args: [ '-f', './docker/compose-controller-spark-sql-single.yaml', 'down' ]
args: [ '-f', './docker/compose-controller-spark-sql-single.yaml', 'down' ,'-v']

- name: 'docker/compose'
id: 'Turn down HAPI Source and Sink Servers'
2 changes: 0 additions & 2 deletions docker/compose-controller-spark-sql-single.yaml
@@ -62,8 +62,6 @@ services:
- ${DWH_ROOT}:/dwh
environment:
- JAVA_OPTS=$JAVA_OPTS
# This is to turn this on in e2e but leave it off in the default config.
- FHIRDATA_SINKFHIRSERVERURL=$FHIRDATA_SINKFHIRSERVERURL
ports:
- '8090:8080'
networks:
2 changes: 2 additions & 0 deletions docker/config/application.yaml
@@ -25,6 +25,8 @@ fhirdata:
# fhirServerUrl: "http://hapi-server:8080/fhir"
dbConfig: "config/hapi-postgres-config_local.json"
dwhRootPrefix: "/dwh/controller_DWH"
# Whether to create a Parquet DWH or not.
createParquetDwh: true
incrementalSchedule: "0 0 * * * *"
purgeSchedule: "0 30 * * * *"
numOfDwhSnapshotsToRetain: 2
59 changes: 59 additions & 0 deletions docker/config_fhir_sink/application.yaml
@@ -0,0 +1,59 @@
#
# Copyright 2020-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# See `pipelines/controller/config/application.yaml` for full documentation
# of these options.
# This config is meant to be used by `compose-controller-spark-sql.yaml`.
fhirdata:
# 172.17.0.1 is an example docker network interface ip address;
# `hapi-server` is another docker example where a container with that name is
# running on the same docker network.
# fhirServerUrl: "http://172.17.0.1:8091/fhir"
# fhirServerUrl: "http://hapi-server:8080/fhir"
dbConfig: "config/hapi-postgres-config_local.json"
dwhRootPrefix: "/dwh/controller_DWH"
# Whether to create a Parquet DWH or not.
createParquetDwh: false
incrementalSchedule: "0 0 * * * *"
purgeSchedule: "0 30 * * * *"
numOfDwhSnapshotsToRetain: 2
# There is no Questionnaire in our test FHIR server, but it is added to
# prevent regression of https://github.com/google/fhir-data-pipes/issues/785.
# TODO: add resource table creation to e2e tests.
resourceList: "Patient,Encounter,Observation,Questionnaire,Condition,Practitioner,Location,Organization,DiagnosticReport,Immunization,MedicationRequest,PractitionerRole,Procedure"
numThreads: 1
autoGenerateFlinkConfiguration: true
createHiveResourceTables: false
#thriftserverHiveConfig: "config/thriftserver-hive-config_local.json"
#hiveResourceViewsDir: "config/views"
# structureDefinitionsPath: "config/profile-definitions"
structureDefinitionsPath: "classpath:/r4-us-core-definitions"
fhirVersion: "R4"
rowGroupSizeForParquetFiles: 33554432 # 32mb
#viewDefinitionsDir: "config/views"
#sinkDbConfigPath: "config/hapi-postgres-config_local_views.json"
sinkFhirServerUrl: "http://sink-server:8080/fhir"
#sinkUserName: "hapi"
#sinkPassword: "hapi123"
recursiveDepth: 1

# Enable spring boot actuator end points, use "*" to expose all endpoints, or a comma-separated
# list to expose selected ones
management:
endpoints:
web:
exposure:
include: health,info,metrics,prometheus,pipeline-metrics
31 changes: 31 additions & 0 deletions docker/config_fhir_sink/flink-conf.yaml
@@ -0,0 +1,31 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# To use this config, FLINK_CONF_DIR env. var should be set to the parent dir.

# This is needed to prevent an "Insufficient number of network buffers"
# exceptions when running the merger on large input with many workers.
taskmanager.memory.network.max: 256mb

# This is needed to be able to process large resources, otherwise in JDBC
# mode we may get the following exception:
# "The record exceeds the maximum size of a sort buffer ..."
taskmanager.memory.managed.size: 256mb

# This is to make pipeline.run() non-blocking with FlinkRunner; unfortunately
# this is overwritten in `local` mode: https://stackoverflow.com/a/74416240
execution.attached: false

# This is required to track the pipeline metrics when FlinkRunner is used.
execution.job-listeners: com.google.fhir.analytics.metrics.FlinkJobListener
9 changes: 9 additions & 0 deletions docker/config_fhir_sink/hapi-postgres-config_local.json
@@ -0,0 +1,9 @@
{
"jdbcDriverClass": "org.postgresql.Driver",
"databaseService" : "postgresql",
"databaseHostName" : "hapi-fhir-db",
"databasePort" : "5432",
"databaseUser" : "admin",
"databasePassword" : "admin",
"databaseName" : "hapi"
}
3 changes: 2 additions & 1 deletion e2e-tests/controller-spark/Dockerfile
@@ -23,5 +23,6 @@ COPY parquet-tools-1.11.1.jar parquet-tools-1.11.1.jar
ENV PARQUET_SUBDIR="dwh"
ENV DOCKER_NETWORK="--use_docker_network"
ENV HOME_DIR="/workspace/e2e-tests/controller-spark"
ENV DWH_TYPE="PARQUET"

ENTRYPOINT cd ${HOME_DIR}; ./controller_spark_sql_validation.sh ${HOME_DIR} ${PARQUET_SUBDIR} ${DOCKER_NETWORK}
ENTRYPOINT cd ${HOME_DIR}; ./controller_spark_sql_validation.sh ${HOME_DIR} ${PARQUET_SUBDIR} ${DOCKER_NETWORK} ${DWH_TYPE}
54 changes: 39 additions & 15 deletions e2e-tests/controller-spark/controller_spark_sql_validation.sh
@@ -65,7 +65,13 @@ function validate_args() {
# anything that needs printing
#################################################
function print_message() {
local print_prefix="E2E TEST FOR CONTROLLER SPARK DEPLOYMENT:"
local print_prefix=""
if [[ "${DWH_TYPE}" == "PARQUET" ]]
then
print_prefix="E2E TEST FOR CONTROLLER SPARK DEPLOYMENT:"
else
print_prefix="E2E TEST FOR CONTROLLER FHIR SERVER TO FHIR SERVER SYNC:"
fi
echo "${print_prefix} $*"
}

@@ -88,6 +94,7 @@ function print_message() {
function setup() {
HOME_PATH=$1
PARQUET_SUBDIR=$2
DWH_TYPE=$4
SOURCE_FHIR_SERVER_URL='http://localhost:8091'
SINK_FHIR_SERVER_URL='http://localhost:8098'
PIPELINE_CONTROLLER_URL='http://localhost:8090'
@@ -187,7 +194,7 @@ function run_pipeline() {
#######################################################################
function check_parquet() {
local isIncremental=$1
local runtime="15 minute"
local runtime="5 minute"
local end_time=$(date -ud "$runtime" +%s)
local output="${HOME_PATH}/${PARQUET_SUBDIR}"
local timeout=true
@@ -224,7 +231,7 @@ function check_parquet() {
timeout=false
break
else
sleep 20
sleep 10
fi
fi
done
@@ -412,8 +419,14 @@ setup "$@"
fhir_source_query
sleep 50
run_pipeline "FULL"
check_parquet false
test_fhir_sink "FULL"
if [[ "${DWH_TYPE}" == "PARQUET" ]]
then
check_parquet false
else
  # Provide enough buffer time for the FULL pipeline run to complete before testing the sink FHIR server
sleep 900
test_fhir_sink "FULL"
fi

clear

@@ -425,16 +438,27 @@ update_resource
sleep 60
# Incremental run.
run_pipeline "INCREMENTAL"
check_parquet true
fhir_source_query
test_fhir_sink "INCREMENTAL"

validate_resource_tables
validate_resource_tables_data
validate_updated_resource
if [[ "${DWH_TYPE}" == "PARQUET" ]]
then
check_parquet true
else
fhir_source_query
  # Provide enough buffer time for the INCREMENTAL pipeline run to complete before testing the sink FHIR server
sleep 300
test_fhir_sink "INCREMENTAL"
fi

if [[ "${DWH_TYPE}" == "PARQUET" ]]
then
validate_resource_tables
validate_resource_tables_data
validate_updated_resource

# View recreation run
# TODO add validation for the views as well
run_pipeline "VIEWS"

fi

# View recreation run
# TODO add validation for the views as well
run_pipeline "VIEWS"

print_message "END!!"
@@ -142,7 +142,7 @@ public void writeResource(HapiRowDescriptor element)

numFetchedResourcesMap.get(resourceType).inc(1);

if (!parquetFile.isEmpty()) {
if (parquetUtil != null) {
startTime = System.currentTimeMillis();
parquetUtil.write(resource);
totalGenerateTimeMillisMap.get(resourceType).inc(System.currentTimeMillis() - startTime);
FetchSearchPageFn.java
@@ -21,7 +21,6 @@
import ca.uhn.fhir.parser.IParser;
import com.cerner.bunsen.exception.ProfileException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.fhir.analytics.JdbcConnectionPools.DataSourceConfig;
import com.google.fhir.analytics.model.DatabaseConfiguration;
import com.google.fhir.analytics.view.ViewApplicationException;
@@ -88,6 +87,8 @@ abstract class FetchSearchPageFn<T> extends DoFn<T, KV<String, Integer>> {

protected final String parquetFile;

protected final Boolean createParquetDwh;

private final int secondsToFlush;

private final int rowGroupSize;
@@ -130,6 +131,7 @@ abstract class FetchSearchPageFn<T> extends DoFn<T, KV<String, Integer>> {
this.oAuthClientSecret = options.getFhirServerOAuthClientSecret();
this.stageIdentifier = stageIdentifier;
this.parquetFile = options.getOutputParquetPath();
this.createParquetDwh = options.isCreateParquetDwh();
this.secondsToFlush = options.getSecondsToFlushParquetFiles();
this.rowGroupSize = options.getRowGroupSizeForParquetFiles();
this.viewDefinitionsDir = options.getViewDefinitionsDir();
@@ -200,7 +202,7 @@ public void setup() throws SQLException, ProfileException {
oAuthClientSecret,
fhirContext);
fhirSearchUtil = new FhirSearchUtil(fetchUtil);
if (!Strings.isNullOrEmpty(parquetFile)) {
if (createParquetDwh) {
parquetUtil =
new ParquetUtil(
fhirContext.getVersion().getVersion(),
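
Read together, the two Java hunks above move the Parquet gating from "is an output path set" to the new flag: ParquetUtil is only constructed in setup() when createParquetDwh is true, and each write in writeResource() is guarded by a null check. A minimal, self-contained sketch of that pattern, using illustrative names rather than the actual pipeline classes:

```java
// Illustrative sketch of the createParquetDwh guard pattern from this PR; the class and
// field below are stand-ins, not the real ParquetUtil / FetchSearchPageFn code.
class ParquetGuardSketch {
  private final Object parquetWriter; // stands in for ParquetUtil; null when Parquet is disabled

  ParquetGuardSketch(boolean createParquetDwh) {
    // setup(): construct the writer only when the Parquet DWH is enabled.
    this.parquetWriter = createParquetDwh ? new Object() : null;
  }

  void writeResource(String resource) {
    // writeResource(): in FHIR-to-FHIR sync mode the writer is null, so nothing is written.
    if (parquetWriter != null) {
      System.out.println("writing " + resource + " to Parquet");
    }
  }

  public static void main(String[] args) {
    new ParquetGuardSketch(false).writeResource("Patient/1"); // skipped
    new ParquetGuardSketch(true).writeResource("Patient/1");  // written
  }
}
```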
FhirEtlOptions.java
@@ -260,4 +260,10 @@ public interface FhirEtlOptions extends BasePipelineOptions {
String getSourceNDJsonFilePattern();

void setSourceNDJsonFilePattern(String value);

@Description("Flag to switch off/on creation of a parquet DWH")
@Default.Boolean(true)
Boolean isCreateParquetDwh();

void setCreateParquetDwh(Boolean value);
}
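
Since FhirEtlOptions is a Beam PipelineOptions interface, the new property should also be settable as a pipeline argument. A hypothetical usage sketch (not part of this PR), assuming Beam derives the flag name `--createParquetDwh` from the `isCreateParquetDwh()` getter and that FhirEtlOptions lives in the `com.google.fhir.analytics` package:

```java
// Hypothetical sketch of passing the new flag when launching the batch pipeline directly.
// Only FhirEtlOptions and isCreateParquetDwh() come from this PR; the flag spelling and
// this standalone runner are assumptions.
import com.google.fhir.analytics.FhirEtlOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class CreateParquetDwhFlagExample {
  public static void main(String[] args) {
    // Parse a command line that disables Parquet output.
    FhirEtlOptions options =
        PipelineOptionsFactory.fromArgs("--createParquetDwh=false").as(FhirEtlOptions.class);

    // With the flag off, FetchSearchPageFn.setup() skips building ParquetUtil, so only the
    // configured FHIR sink (sinkFhirServerUrl) receives resources.
    System.out.println("createParquetDwh = " + options.isCreateParquetDwh());
  }
}
```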
3 changes: 3 additions & 0 deletions pipelines/controller/config/application.yaml
@@ -73,6 +73,9 @@ fhirdata:
# that directory too, such that files created by the pipelines are readable by
# the Thrift Server, e.g., `setfacl -d -m o::rx dwh/`.
dwhRootPrefix: "dwh/controller_DEV_DWH"
# Whether to create a Parquet DWH or not. When syncing from one FHIR server to another,
# generation of the Parquet DWH can be switched off.
createParquetDwh: true

# The schedule for automatic incremental pipeline runs.
# Uses the Spring CronExpression format, i.e.,
DataProperties.java
@@ -118,6 +118,8 @@ public class DataProperties {

private int recursiveDepth;

private boolean createParquetDwh;

@PostConstruct
void validateProperties() {
CronExpression.parse(incrementalSchedule);
@@ -127,6 +129,9 @@
"At least one of fhirServerUrl or dbConfig should be set!");
Preconditions.checkState(fhirVersion != null, "FhirVersion cannot be empty");

Preconditions.checkArgument(
!Strings.isNullOrEmpty(dwhRootPrefix), "dwhRootPrefix is required!");

if (!Strings.isNullOrEmpty(dbConfig)) {
if (!Strings.isNullOrEmpty(fhirServerUrl)) {
logger.warn("Both fhirServerUrl and dbConfig are set; ignoring fhirServerUrl!");
@@ -138,6 +143,7 @@ void validateProperties() {
logger.info("Using FHIR-search mode since dbConfig is not set.");
}
Preconditions.checkState(!createHiveResourceTables || !thriftserverHiveConfig.isEmpty());
Preconditions.checkState(!createHiveResourceTables || createParquetDwh);
}

private PipelineConfig.PipelineConfigBuilder addFlinkOptions(FhirEtlOptions options) {
Expand Down Expand Up @@ -213,6 +219,8 @@ PipelineConfig createBatchOptions() {
Instant.now().toString().replace(":", "-").replace("-", "_").replace(".", "_");
options.setOutputParquetPath(dwhRootPrefix + TIMESTAMP_PREFIX + timestampSuffix);

options.setCreateParquetDwh(createParquetDwh);

PipelineConfig.PipelineConfigBuilder pipelineConfigBuilder = addFlinkOptions(options);

// Get hold of thrift server parquet directory from dwhRootPrefix config.
Expand All @@ -230,6 +238,7 @@ List<ConfigFields> getConfigParams() {
return List.of(
new ConfigFields("fhirdata.fhirServerUrl", fhirServerUrl, "", ""),
new ConfigFields("fhirdata.dwhRootPrefix", dwhRootPrefix, "", ""),
new ConfigFields("fhirdata.createParquetDwh", String.valueOf(createParquetDwh), "", ""),
new ConfigFields("fhirdata.incrementalSchedule", incrementalSchedule, "", ""),
new ConfigFields("fhirdata.purgeSchedule", purgeSchedule, "", ""),
new ConfigFields(