diff --git a/etl-job/fhir_import_export.py b/etl-job/fhir_import_export.py
index dc0a36d..c987bf8 100644
--- a/etl-job/fhir_import_export.py
+++ b/etl-job/fhir_import_export.py
@@ -6,7 +6,6 @@
 import subprocess
 import sys
 import traceback
-import requests
 import yaml
 
 from datetime import datetime
@@ -19,7 +18,6 @@
 from gen3.auth import Gen3Auth
 from gen3.file import Gen3File
 from gen3_tracker.config import Config
-from gen3_tracker.git.snapshotter import push_snapshot
 from gen3_tracker.meta.dataframer import LocalFHIRDatabase
 
 logging.getLogger().addHandler(logging.StreamHandler(sys.stdout))
@@ -160,7 +158,7 @@ def _download_and_unzip(object_id: str,
         full_download_path = (pathlib.Path('downloads') / file_name)
         full_download_path_parent = full_download_path.parent
         full_download_path_parent.mkdir(parents=True, exist_ok=True)
-        file_client.download_single(object_id, 'downloads' )
+        file_client.download_single(object_id, 'downloads')
     except Exception as e:
         output['logs'].append(f"An Exception Occurred: {str(e)}")
         output['logs'].append(f"ERROR DOWNLOADING {object_id} {file_path}")
@@ -184,38 +182,17 @@ def _download_and_unzip(object_id: str,
     return True
 
 
-def _load_all(study: str,
-              project_id: str,
+def _load_all(program: str,
+              project: str,
               output: dict,
               file_path: str,
-              schema: str,
               work_path: str) -> bool:
-    if study is None or study == "":
-        output['logs'].append("Please provide a study name")
-        return False
-
-    if project_id is None or project_id == "":
-        output['logs'].append("Please provide a project_id (program-project)")
-        return False
-
     logs = None
     try:
-        program, project = project_id.split('-')
-        assert program, output['logs'].append("program is required")
-        assert project, output['logs'].append("project is required")
-
-        file_path = pathlib.Path(file_path)
-        extraction_path = file_path / 'extractions'
-        research_study = str(extraction_path / 'ResearchStudy.ndjson')
-
-        file_path = str(file_path)
-        extraction_path = str(extraction_path)
-        output['logs'].append(f"Simplifying study: {file_path}")
-
-        importable_files = [f for f in os.listdir(extraction_path) if f.endswith(".ndjson")]
+        importable_files = [f for f in os.listdir(file_path) if f.endswith(".ndjson")]
         for file in importable_files:
-            # output capturing logs from this function
+            # output dictionary is capturing logs from this function
             bulk_load_raw(_get_grip_service(), "CALIPER", f"{program}-{project}", file, output, _get_token())
 
         assert pathlib.Path(work_path).exists(), f"Directory {work_path} does not exist."
@@ -276,7 +253,6 @@ def _empty_project(output: dict,
                    program: str,
                    project: str,
                    user: dict,
-                   dictionary_path: str | None = None,
                    config_path: str | None = None):
     """Clear out graph and flat metadata for project """
     # check permissions
@@ -314,20 +290,14 @@ def main():
     _write_output_to_client(input_data)
     program, project = _get_program_project(input_data)
 
-    schema = os.getenv('DICTIONARY_URL', None)
-
-    if schema is None:
-        schema = 'https://raw.githubusercontent.com/bmeg/iceberg/refs/heads/main/schemas/graph/graph-fhir.json'
-        output['logs'].append(f"DICTIONARY_URL not found in environment using {schema}")
-
     method = input_data.get("method", None)
     assert method, "input data must contain a `method`"
     if method.lower() == 'put':
         # read from bucket, write to fhir store
-        _put(input_data, output, program, project, user, schema)
+        _put(input_data, output, program, project, user)
     elif method.lower() == 'delete':
-        _empty_project(output, program, project, user, dictionary_path=schema,
+        _empty_project(output, program, project, user,
                        config_path="config.yaml")
     else:
         raise Exception(f"unknown method {method}")
@@ -340,8 +310,7 @@ def _put(input_data: dict,
          output: dict,
          program: str,
          project: str,
-         user: dict,
-         schema: str):
+         user: dict):
     """Import data from bucket to graph, flat and fhir store."""
     # check permissions
     can_create = _can_create(output, program, project, user)
@@ -370,7 +339,7 @@ def _put(input_data: dict,
         output['files'].append(str(_))
 
     # load the study into the database and elastic search
-    _load_all(program, project, output, file_path, "work")
+    _load_all(program, project, output, file_path, "work")
     shutil.rmtree(f"/root/studies/{project}")