Skip to content

Commit

Permalink
bugfix / simplify fhir import job
Browse files Browse the repository at this point in the history
  • Loading branch information
matthewpeterkort committed Nov 21, 2024
1 parent 7b87ed7 commit 483d426
Showing 1 changed file with 9 additions and 40 deletions.
49 changes: 9 additions & 40 deletions etl-job/fhir_import_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import subprocess
import sys
import traceback
import requests
import yaml
from datetime import datetime

Expand All @@ -19,7 +18,6 @@
from gen3.auth import Gen3Auth
from gen3.file import Gen3File
from gen3_tracker.config import Config
from gen3_tracker.git.snapshotter import push_snapshot
from gen3_tracker.meta.dataframer import LocalFHIRDatabase

logging.getLogger().addHandler(logging.StreamHandler(sys.stdout))
Expand Down Expand Up @@ -160,7 +158,7 @@ def _download_and_unzip(object_id: str,
full_download_path = (pathlib.Path('downloads') / file_name)
full_download_path_parent = full_download_path.parent
full_download_path_parent.mkdir(parents=True, exist_ok=True)
file_client.download_single(object_id, 'downloads' )
file_client.download_single(object_id, 'downloads')
except Exception as e:
output['logs'].append(f"An Exception Occurred: {str(e)}")
output['logs'].append(f"ERROR DOWNLOADING {object_id} {file_path}")
Expand All @@ -184,38 +182,17 @@ def _download_and_unzip(object_id: str,
return True


def _load_all(study: str,
project_id: str,
def _load_all(program: str,
project: str,
output: dict,
file_path: str,
schema: str,
work_path: str) -> bool:

if study is None or study == "":
output['logs'].append("Please provide a study name")
return False

if project_id is None or project_id == "":
output['logs'].append("Please provide a project_id (program-project)")
return False

logs = None
try:
program, project = project_id.split('-')
assert program, output['logs'].append("program is required")
assert project, output['logs'].append("project is required")

file_path = pathlib.Path(file_path)
extraction_path = file_path / 'extractions'
research_study = str(extraction_path / 'ResearchStudy.ndjson')

file_path = str(file_path)
extraction_path = str(extraction_path)
output['logs'].append(f"Simplifying study: {file_path}")

importable_files = [f for f in os.listdir(extraction_path) if f.endswith(".ndjson")]
importable_files = [f for f in os.listdir(file_path) if f.endswith(".ndjson")]
for file in importable_files:
# output capturing logs from this function
# output dictionary is capturing logs from this function
bulk_load_raw(_get_grip_service(), "CALIPER", f"{program}-{project}", file, output, _get_token())

assert pathlib.Path(work_path).exists(), f"Directory {work_path} does not exist."
Expand Down Expand Up @@ -276,7 +253,6 @@ def _empty_project(output: dict,
program: str,
project: str,
user: dict,
dictionary_path: str | None = None,
config_path: str | None = None):
"""Clear out graph and flat metadata for project """
# check permissions
Expand Down Expand Up @@ -314,20 +290,14 @@ def main():
_write_output_to_client(input_data)
program, project = _get_program_project(input_data)

schema = os.getenv('DICTIONARY_URL', None)

if schema is None:
schema = 'https://raw.githubusercontent.com/bmeg/iceberg/refs/heads/main/schemas/graph/graph-fhir.json'
output['logs'].append(f"DICTIONARY_URL not found in environment using {schema}")

method = input_data.get("method", None)
assert method, "input data must contain a `method`"

if method.lower() == 'put':
# read from bucket, write to fhir store
_put(input_data, output, program, project, user, schema)
_put(input_data, output, program, project, user)
elif method.lower() == 'delete':
_empty_project(output, program, project, user, dictionary_path=schema,
_empty_project(output, program, project, user,
config_path="config.yaml")
else:
raise Exception(f"unknown method {method}")
Expand All @@ -340,8 +310,7 @@ def _put(input_data: dict,
output: dict,
program: str,
project: str,
user: dict,
schema: str):
user: dict):
"""Import data from bucket to graph, flat and fhir store."""
# check permissions
can_create = _can_create(output, program, project, user)
Expand Down Expand Up @@ -370,7 +339,7 @@ def _put(input_data: dict,
output['files'].append(str(_))

# load the study into the database and elastic search
_load_all(project, f"{program}-{project}", output, file_path, schema, "work")
_load_all(program, project, output, file_path, "work")

shutil.rmtree(f"/root/studies/{project}")

Expand Down

0 comments on commit 483d426

Please sign in to comment.