Skip to content

Commit

Permalink
Add new resource for creating Whistle Mapping, Reconciliation and Bac…
Browse files Browse the repository at this point in the history
…kfill Pipeline Jobs for Healthcare Data Engine (GoogleCloudPlatform#11812)

Co-authored-by: Cameron Thornton <[email protected]>
  • Loading branch information
2 people authored and karolgorc committed Oct 11, 2024
1 parent ad8e6cb commit ee0b5d1
Show file tree
Hide file tree
Showing 5 changed files with 476 additions and 0 deletions.
257 changes: 257 additions & 0 deletions mmv1/products/healthcare/PipelineJob.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
# Copyright 2024 Google Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

---
name: 'PipelineJob'
kind: 'healthcare#pipelineJob'
description: |
PipelineJobs are Long Running Operations on Healthcare API to Map or Reconcile
incoming data into FHIR format
references:
guides:
'Creating a PipelineJob': 'https://cloud.google.com/healthcare-api/private/healthcare-data-engine/docs/reference/rest/v1/projects.locations.datasets.pipelineJobs#PipelineJob'
api: 'https://cloud.google.com/healthcare-api/healthcare-data-engine/docs/reference/rest/v1/projects.locations.datasets.pipelineJobs'
base_url: '{{dataset}}/pipelineJobs?pipelineJobId={{name}}'
self_link: '{{dataset}}/pipelineJobs/{{name}}'
delete_url: '{{dataset}}/pipelineJobs/{{name}}'
exclude_sweeper: true
update_verb: PATCH
update_mask: true
id_format: '{{dataset}}/pipelineJobs/{{name}}'
import_format: ['{{%dataset}}/pipelineJobs/{{name}}', '{{name}}', '{{dataset}}/pipelineJobs?pipelineJobId={{name}}']
examples:
- name: 'healthcare_pipeline_job_reconciliation'
primary_resource_id: 'example-pipeline'
vars:
pipeline_name: 'example_pipeline_job'
dataset_name: 'example_dataset'
fhir_store_name: 'fhir_store'
bucket_name: 'example_bucket_name'
- name: 'healthcare_pipeline_job_backfill'
primary_resource_id: 'example-pipeline'
vars:
backfill_pipeline_name: 'example_backfill_pipeline'
dataset_name: 'example_dataset'
mapping_pipeline_name: 'example_mapping_pipeline'
- name: 'healthcare_pipeline_job_whistle_mapping'
primary_resource_id: 'example-mapping-pipeline'
vars:
pipeline_name: 'example_mapping_pipeline_job'
dataset_name: 'example_dataset'
source_fhirstore_name: 'source_fhir_store'
dest_fhirstore_name: 'dest_fhir_store'
bucket_name: 'example_bucket_name'
- name: 'healthcare_pipeline_job_mapping_recon_dest'
primary_resource_id: 'example-mapping-pipeline'
vars:
pipeline_name: 'example_mapping_pipeline_job'
recon_pipeline_name: 'example_recon_pipeline_job'
dataset_name: 'example_dataset'
source_fhirstore_name: 'source_fhir_store'
dest_fhirstore_name: 'dest_fhir_store'
bucket_name: 'example_bucket_name'
custom_code:
decoder: templates/terraform/decoders/long_name_to_self_link.go.tmpl
parameters:
- name: 'location'
type: String
required: true
immutable: true
url_param_only: true
description: |
Location where the Pipeline Job is to run
- name: 'dataset'
type: String
required: true
immutable: true
url_param_only: true
description: |
Healthcare Dataset under which the Pipeline Job is to run
properties:
- name: 'name'
type: String
description: |
Specifies the name of the pipeline job. This field is user-assigned.
required: true
- name: 'disableLineage'
type: Boolean
description: |
If true, disables writing lineage for the pipeline.
required: false
default_value: false
- name: 'labels'
required: false
type: KeyValueLabels
description: |
User-supplied key-value pairs used to organize Pipeline Jobs.
Label keys must be between 1 and 63 characters long, have a UTF-8 encoding of
maximum 128 bytes, and must conform to the following PCRE regular expression:
[\p{Ll}\p{Lo}][\p{Ll}\p{Lo}\p{N}_-]{0,62}
Label values are optional, must be between 1 and 63 characters long, have a
UTF-8 encoding of maximum 128 bytes, and must conform to the following PCRE
regular expression: [\p{Ll}\p{Lo}\p{N}_-]{0,63}
No more than 64 labels can be associated with a given pipeline.
An object containing a list of "key": value pairs.
Example: { "name": "wrench", "mass": "1.3kg", "count": "3" }.
- name: 'selfLink'
type: String
description: |
The fully qualified name of this dataset
output: true
ignore_read: true
- name: mappingPipelineJob
type: NestedObject
conflicts:
- reconciliationPipelineJob
- backfillPipelineJob
description: |
Specifies mapping configuration.
required: false
properties:
- name: mappingConfig
type: NestedObject
description: |
The location of the mapping configuration.
required: true
properties:
- name: description
type: String
description: |
Describes the mapping configuration.
required: false
- name: whistleConfigSource
type: NestedObject
description: |
Specifies the path to the mapping configuration for harmonization pipeline.
required: false
properties:
- name: uri
type: String
description: |
Main configuration file which has the entrypoint or the root function.
Example: gs://{bucket-id}/{path/to/import-root/dir}/entrypoint-file-name.wstl.
required: true
- name: importUriPrefix
type: String
description: |
Directory path where all the Whistle files are located.
Example: gs://{bucket-id}/{path/to/import-root/dir}
required: true
- name: fhirStreamingSource
description: |
A streaming FHIR data source.
required: false
type: NestedObject
properties:
- name: fhirStore
type: String
description: |
The path to the FHIR store in the format projects/{projectId}/locations/{locationId}/datasets/{datasetId}/fhirStores/{fhirStoreId}.
required: true
- name: description
type: String
description: |
Describes the streaming FHIR data source.
required: false
- name: fhirStoreDestination
type: String
conflicts:
- reconciliationDestination
description: |
If set, the mapping pipeline will write snapshots to this
FHIR store without assigning stable IDs. You must
grant your pipeline project's Cloud Healthcare Service
Agent serviceaccount healthcare.fhirResources.executeBundle
and healthcare.fhirResources.create permissions on the
destination store. The destination store must set
[disableReferentialIntegrity][FhirStore.disable_referential_integrity]
to true. The destination store must use FHIR version R4.
Format: project/{projectID}/locations/{locationID}/datasets/{datasetName}/fhirStores/{fhirStoreID}.
required: false
- name: reconciliationDestination
type: Boolean
conflicts:
- fhirStoreDestination
description: |
If set to true, a mapping pipeline will send output snapshots
to the reconciliation pipeline in its dataset. A reconciliation
pipeline must exist in this dataset before a mapping pipeline
with a reconciliation destination can be created.
required: false
- name: reconciliationPipelineJob
conflicts:
- mappingPipelineJob
- backfillPipelineJob
description: |
Specifies reconciliation configuration.
required: false
type: NestedObject
properties:
- name: mergeConfig
description: |
Specifies the location of the reconciliation configuration.
required: true
type: NestedObject
properties:
- name: description
type: String
description: |
Describes the mapping configuration.
required: false
- name: whistleConfigSource
description: |
Specifies the path to the mapping configuration for harmonization pipeline.
required: true
type: NestedObject
properties:
- name: uri
type: String
description: |
Main configuration file which has the entrypoint or the root function.
Example: gs://{bucket-id}/{path/to/import-root/dir}/entrypoint-file-name.wstl.
required: true
- name: importUriPrefix
type: String
description: |
Directory path where all the Whistle files are located.
Example: gs://{bucket-id}/{path/to/import-root/dir}
required: true
- name: matchingUriPrefix
type: String
description: |
Specifies the top level directory of the matching configs used
in all mapping pipelines, which extract properties for resources
to be matched on.
Example: gs://{bucket-id}/{path/to/matching/configs}
required: true
- name: fhirStoreDestination
type: String
description: |
The harmonized FHIR store to write harmonized FHIR resources to,
in the format of: project/{projectID}/locations/{locationID}/datasets/{datasetName}/fhirStores/{id}
required: false
- name: backfillPipelineJob
conflicts:
- mappingPipelineJob
- reconciliationPipelineJob
description: |
Specifies the backfill configuration.
required: false
type: NestedObject
properties:
- name: mappingPipelineJob
type: String
description: |
Specifies the mapping pipeline job to backfill, the name format
should follow: projects/{projectId}/locations/{locationId}/datasets/{datasetId}/pipelineJobs/{pipelineJobId}.
required: false
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
resource "google_healthcare_pipeline_job" "{{$.PrimaryResourceId}}" {
name = "{{index $.Vars "backfill_pipeline_name"}}"
location = "us-central1"
dataset = google_healthcare_dataset.dataset.id
backfill_pipeline_job {
mapping_pipeline_job = "${google_healthcare_dataset.dataset.id}/pipelinejobs/{{index $.Vars "mapping_pipeline_name"}}"
}
}

resource "google_healthcare_dataset" "dataset" {
name = "{{index $.Vars "dataset_name"}}"
location = "us-central1"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
data "google_project" "project" {
}

resource "google_healthcare_pipeline_job" "recon" {
name = "{{index $.Vars "recon_pipeline_name"}}"
location = "us-central1"
dataset = google_healthcare_dataset.dataset.id
disable_lineage = true
reconciliation_pipeline_job {
merge_config {
description = "sample description for reconciliation rules"
whistle_config_source {
uri = "gs://${google_storage_bucket.bucket.name}/${google_storage_bucket_object.merge_file.name}"
import_uri_prefix = "gs://${google_storage_bucket.bucket.name}"
}
}
matching_uri_prefix = "gs://${google_storage_bucket.bucket.name}"
fhir_store_destination = "${google_healthcare_dataset.dataset.id}/fhirStores/${google_healthcare_fhir_store.dest_fhirstore.name}"
}
}

resource "google_healthcare_pipeline_job" "{{$.PrimaryResourceId}}" {
depends_on = [google_healthcare_pipeline_job.recon]
name = "{{index $.Vars "pipeline_name"}}"
location = "us-central1"
dataset = google_healthcare_dataset.dataset.id
disable_lineage = true
labels = {
example_label_key = "example_label_value"
}
mapping_pipeline_job {
mapping_config {
whistle_config_source {
uri = "gs://${google_storage_bucket.bucket.name}/${google_storage_bucket_object.mapping_file.name}"
import_uri_prefix = "gs://${google_storage_bucket.bucket.name}"
}
description = "example description for mapping configuration"
}
fhir_streaming_source {
fhir_store = "${google_healthcare_dataset.dataset.id}/fhirStores/${google_healthcare_fhir_store.source_fhirstore.name}"
description = "example description for streaming fhirstore"
}
reconciliation_destination = true
}
}

resource "google_healthcare_dataset" "dataset" {
name = "{{index $.Vars "dataset_name"}}"
location = "us-central1"
}

resource "google_healthcare_fhir_store" "source_fhirstore" {
name = "{{index $.Vars "source_fhirstore_name"}}"
dataset = google_healthcare_dataset.dataset.id
version = "R4"
enable_update_create = true
disable_referential_integrity = true
}

resource "google_healthcare_fhir_store" "dest_fhirstore" {
name = "{{index $.Vars "dest_fhirstore_name"}}"
dataset = google_healthcare_dataset.dataset.id
version = "R4"
enable_update_create = true
disable_referential_integrity = true
}

resource "google_storage_bucket" "bucket" {
name = "{{index $.Vars "bucket_name"}}"
location = "us-central1"
uniform_bucket_level_access = true
}

resource "google_storage_bucket_object" "mapping_file" {
name = "mapping.wstl"
content = " "
bucket = google_storage_bucket.bucket.name
}

resource "google_storage_bucket_object" "merge_file" {
name = "merge.wstl"
content = " "
bucket = google_storage_bucket.bucket.name
}

resource "google_storage_bucket_iam_member" "hsa" {
bucket = google_storage_bucket.bucket.name
role = "roles/storage.objectUser"
member = "serviceAccount:service-${data.google_project.project.number}@gcp-sa-healthcare.iam.gserviceaccount.com"
}
Loading

0 comments on commit ee0b5d1

Please sign in to comment.