diff --git a/etl-job/Dockerfile b/etl-job/Dockerfile
index 984080a..d0af004 100644
--- a/etl-job/Dockerfile
+++ b/etl-job/Dockerfile
@@ -46,12 +46,12 @@ RUN source venv/bin/activate && pip install -r requirements.txt
 
 # move these out of requirements.txt, so that builds are quicker when we change
 RUN source venv/bin/activate && pip install "iceberg_tools>=0.0.8rc3"
-RUN source venv/bin/activate && pip install "gen3_tracker==0.0.5rc8"
+RUN source venv/bin/activate && pip install "gen3_tracker==0.0.5rc13"
 RUN source venv/bin/activate && pip install "aced-submission==0.0.9rc37"
 
 RUN git clone https://github.com/bmeg/iceberg.git && \
     cd iceberg && \
-    git checkout feature/FHIR-resource-type
+    git checkout feature/substance-and-group
 
 #Add jsonschemagraph exe to image
 RUN wget https://github.com/bmeg/jsonschemagraph/releases/download/v0.0.2/jsonschemagraph-linux.amd64 -P /usr/local/bin/
diff --git a/etl-job/fhir_import_export.py b/etl-job/fhir_import_export.py
index 328b229..5d2648c 100644
--- a/etl-job/fhir_import_export.py
+++ b/etl-job/fhir_import_export.py
@@ -10,7 +10,6 @@ import yaml
 
 from datetime import datetime
 
-from aced_submission.fhir_store import fhir_get, fhir_put, fhir_delete
 from aced_submission.meta_flat_load import DEFAULT_ELASTIC, load_flat
 from aced_submission.meta_flat_load import delete as meta_flat_delete
 from aced_submission.grip_load import bulk_load, get_project_data, \
@@ -165,6 +164,7 @@ def _download_and_unzip(object_id: str,
     except Exception as e:
         output['logs'].append(f"An Exception Occurred: {str(e)}")
         output['logs'].append(f"ERROR DOWNLOADING {object_id} {file_path}")
+        _write_output_to_client(output)
         raise e
     return False
 
@@ -213,7 +213,10 @@ def _load_all(study: str,
     extraction_path = str(extraction_path)
 
     output['logs'].append(f"Simplifying study: {file_path}")
-    subprocess.run(["jsonschemagraph", "gen-dir", "iceberg/schemas/graph", f"{file_path}", f"{extraction_path}","--project_id", f"{project_id}","--gzip_files"])
+    # call jsonschemagraph to create edges and vertices
+    graph_gen_cmd = ["jsonschemagraph", "gen-dir", "iceberg/schemas/graph", f"{file_path}", f"{extraction_path}", "--project_id", f"{project_id}", "--gzip_files"]
+    subprocess.run(graph_gen_cmd, check=True, capture_output=True, text=True)
+
     bulk_load(_get_grip_service(), "CALIPER",f"{program}-{project}", extraction_path, output, _get_token())
 
     assert pathlib.Path(work_path).exists(), f"Directory {work_path} does not exist."
@@ -240,10 +243,26 @@ def _load_all(study: str,
                   limit=None, elastic_url=DEFAULT_ELASTIC,
                   output_path=None)
 
-        logs = fhir_put(project_id, path=file_path,
-                        elastic_url=DEFAULT_ELASTIC)
-        yaml.dump(logs, sys.stdout, default_flow_style=False)
-
+    # raised when jsonschemagraph fails to generate the graph
+    except subprocess.CalledProcessError as exception:
+        # save and print any useful logs
+        tb = "".join(traceback.format_tb(exception.__traceback__))
+        for title, log in [("stdout", exception.stdout), ("traceback", tb), ("ERROR", exception.stderr)]:
+            if not log:
+                continue
+
+            message = f"{title.upper()}: {log}"
+            output['logs'].append(message)
+            _write_output_to_client(output)
+            print(message)
+
+        # print the final error before raising
+        final_error = f"ERROR: Unable to generate valid jsonschema graph from {file_path} to {extraction_path} for project ID {project_id}"
+        output['logs'].append(final_error)
+        _write_output_to_client(output)
+        raise
+
+    # raised when Elasticsearch operations fail
     except OpenSearchException as e:
         output['logs'].append(f"An ElasticSearch Exception occurred: {str(e)}")
         tb = traceback.format_exc()
@@ -252,8 +271,10 @@ def _load_all(study: str,
         output['logs'].append(tb)
         if logs is not None:
             output['logs'].extend(logs)
-        return False
+        _write_output_to_client(output)
+        raise
 
+    # all other exceptions
     except Exception as e:
         output['logs'].append(f"An Exception Occurred: {str(e)}")
         tb = traceback.format_exc()
@@ -262,7 +283,8 @@ def _load_all(study: str,
         output['logs'].append(tb)
         if logs is not None:
             output['logs'].extend(logs)
-        return False
+        _write_output_to_client(output)
+        raise
 
     output['logs'].append(f"Loaded {study}")
     if logs is not None:
@@ -270,46 +292,6 @@ def _load_all(study: str,
     return True
 
 
-def _get(output: dict,
-         program: str,
-         project: str,
-         user: dict,
-         auth: Gen3Auth) -> str | None:
-    """Export data from the fhir store to bucket, returns object_id."""
-    can_read = _can_read(output, program, project, user)
-    if not can_read:
-        output['logs'].append(f"No read permissions on {program}-{project}")
-        return None
-
-    study_path = f"studies/{project}"
-    project_id = f"{program}-{project}"
-
-    # ensure we wait for the index to be refreshed before we query it
-    elastic = Elasticsearch([DEFAULT_ELASTIC], request_timeout=120)
-    elastic.indices.refresh(index='fhir')
-
-    logs = fhir_get(f"{program}-{project}", study_path, DEFAULT_ELASTIC)
-    output['logs'].extend(logs)
-
-
-    # zip and upload the exported files to bucket
-    now = datetime.now().strftime("%Y%m%d-%H%M%S")
-    object_name = f'{project_id}_{now}_SNAPSHOT.zip'
-
-    config = Config()
-    cp_result = push_snapshot(
-        config=config,
-        auth=auth,
-        project_id=project_id,
-        from_=study_path,
-        object_name=object_name)
-
-    output['logs'].append(cp_result['msg'])
-    object_id = cp_result['object_id']
-
-    return object_id
-
-
 def _empty_project(output: dict,
                    program: str,
                    project: str,
@@ -327,13 +309,12 @@ def _empty_project(output: dict,
             meta_flat_delete(project_id=f"{program}-{project}", index=index)
             output['logs'].append(f"EMPTIED flat for {program}-{project}")
 
-        fhir_delete(f"{program}-{project}", DEFAULT_ELASTIC)
-        output['logs'].append(f"EMPTIED FHIR STORE for {program}-{project}")
-
     except Exception as e:
         output['logs'].append(f"An Exception Occurred emptying project {program}-{project}: {str(e)}")
         tb = traceback.format_exc()
         output['logs'].append(tb)
+        _write_output_to_client(output)
+        raise
 
 
 def main():
@@ -348,9 +329,9 @@ def main():
     # note, only the last output (a line in stdout with `[out]` prefix) is returned to the caller
     # output['env'] = {k: v for k, v in os.environ.items()}
-
+
     input_data = _input_data()
-    print(f"[out] {json.dumps(input_data, separators=(',', ':'))}")
+    _write_output_to_client(input_data)
 
     program, project = _get_program_project(input_data)
 
     schema = os.getenv('DICTIONARY_URL', None)
@@ -361,25 +342,18 @@ def main():
     method = input_data.get("method", None)
     assert method, "input data must contain a `method`"
+
     if method.lower() == 'put':
         # read from bucket, write to fhir store
         _put(input_data, output, program, project, user, schema)
-        # after pushing commits, create a snapshot file
-        object_id = _get(output, program, project, user, auth)
-        output['snapshot'] = {'object_id': object_id}
-    elif method.lower() == 'get':
-        # read fhir store, write to bucket
-        object_id = _get(output, program, project, user, auth)
-        output['object_id'] = object_id
     elif method.lower() == 'delete':
         _empty_project(output, program, project, user, dictionary_path=schema,
-                       config_path="config.yaml")
-
+                       config_path="config.yaml")
     else:
         raise Exception(f"unknown method {method}")
     # note, only the last output (a line in stdout with `[out]` prefix) is returned to the caller
-    print(f"[out] {json.dumps(output, separators=(',', ':'))}")
+    _write_output_to_client(output)
 
 
 def _put(input_data: dict,
@@ -392,7 +366,12 @@ def _put(input_data: dict,
     # check permissions
     can_create = _can_create(output, program, project, user)
     output['logs'].append(f"CAN CREATE: {can_create}")
-    assert can_create, f"No create permissions on {program}"
+    if not can_create:
+        error_log = f"ERROR 401: No permissions to create project {project} on program {program}.\nYou can view your project-level permissions with g3t ping"
+        output['logs'].append(error_log)
+        _write_output_to_client(output)
+        raise Exception(error_log)
+
     assert 'push' in input_data, "input data must contain a `push`"
     for commit in input_data['push']['commits']:
         assert 'object_id' in commit, "commit must contain an `object_id`"
@@ -412,9 +391,15 @@ def _put(input_data: dict,
     # load the study into the database and elastic search
     _load_all(project, f"{program}-{project}", output, file_path, schema, "work")
-
+
     shutil.rmtree(f"/root/studies/{project}")
 
 
+def _write_output_to_client(output):
+    '''
+    Format output as a single JSON line on stdout so it is passed back to the
+    client; most importantly, this surfaces relevant logs when the job errors out.
+    '''
+    print(f"[out] {json.dumps(output, separators=(',', ':'))}")
 if __name__ == '__main__':
     main()
 
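For reviewers: a minimal, self-contained sketch of the error-reporting pattern this diff converges on. A failing jsonschemagraph run (or any subprocess started with check=True and capture_output=True) raises CalledProcessError; its captured stdout/stderr are appended to output['logs'] and flushed to the client as the single `[out]`-prefixed JSON line that the job runner reads back. The command and paths below are illustrative stand-ins, not the job's exact inputs.

import json
import subprocess
import traceback


def _write_output_to_client(output):
    # only the last stdout line prefixed with `[out]` is returned to the caller
    print(f"[out] {json.dumps(output, separators=(',', ':'))}")


output = {'logs': []}
# illustrative command; any failing subprocess behaves the same way
graph_gen_cmd = ["jsonschemagraph", "gen-dir", "iceberg/schemas/graph", "meta/", "extracted/"]
try:
    subprocess.run(graph_gen_cmd, check=True, capture_output=True, text=True)
except subprocess.CalledProcessError as exception:
    # format_tb returns a list of strings (print_tb prints and returns None)
    tb = "".join(traceback.format_tb(exception.__traceback__))
    for title, log in [("stdout", exception.stdout), ("traceback", tb), ("ERROR", exception.stderr)]:
        if log:
            output['logs'].append(f"{title.upper()}: {log}")
    _write_output_to_client(output)
    raise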