Feature/docref additions (#41)
* bump g3t

* bump g3t

* bump g3t

* bump g3t

* bump g3t

* remove fhir store references

* first draft of error logs

* for etl pod testing

* improve jsonschemagraph erroring out

* manage jsonschemagraph exception

* update iceberg in dockerfile

* Revert "for etl pod testing"

This reverts commit b875077.

* revert testing change

* deprecate new creation of snapshots using push; keep existing getter for snapshots

* force error on jsonschema and return useful validation logs

* raise if jsonschemagraph fails

* remove unused getter making use of fhir index snapshots

* clearer error checking

* see if we get logs returned back or not from this

* remove

* try sower output formatting fix

* fix all exception to send logs back to client

* send permissions error back to client

---------

Co-authored-by: quinnwai <[email protected]>
matthewpeterkort and quinnwai authored Nov 21, 2024
1 parent 77756ee commit 434d141
Showing 2 changed files with 52 additions and 67 deletions.
4 changes: 2 additions & 2 deletions etl-job/Dockerfile
@@ -46,12 +46,12 @@ RUN source venv/bin/activate && pip install -r requirements.txt

# move these out of requirements.txt, so that builds are quicker when we change
RUN source venv/bin/activate && pip install "iceberg_tools>=0.0.8rc3"
RUN source venv/bin/activate && pip install "gen3_tracker==0.0.5rc8"
RUN source venv/bin/activate && pip install "gen3_tracker==0.0.5rc13"
RUN source venv/bin/activate && pip install "aced-submission==0.0.9rc37"

RUN git clone https://github.com/bmeg/iceberg.git && \
cd iceberg && \
git checkout feature/FHIR-resource-type
git checkout feature/substance-and-group

#Add jsonschemagraph exe to image
RUN wget https://github.com/bmeg/jsonschemagraph/releases/download/v0.0.2/jsonschemagraph-linux.amd64 -P /usr/local/bin/
115 changes: 50 additions & 65 deletions etl-job/fhir_import_export.py
@@ -10,7 +10,6 @@
import yaml
from datetime import datetime

from aced_submission.fhir_store import fhir_get, fhir_put, fhir_delete
from aced_submission.meta_flat_load import DEFAULT_ELASTIC, load_flat
from aced_submission.meta_flat_load import delete as meta_flat_delete
from aced_submission.grip_load import bulk_load, get_project_data, \
@@ -165,6 +164,7 @@ def _download_and_unzip(object_id: str,
except Exception as e:
output['logs'].append(f"An Exception Occurred: {str(e)}")
output['logs'].append(f"ERROR DOWNLOADING {object_id} {file_path}")
_write_output_to_client(output)
raise e
return False

@@ -213,7 +213,10 @@ def _load_all(study: str,
extraction_path = str(extraction_path)
output['logs'].append(f"Simplifying study: {file_path}")

subprocess.run(["jsonschemagraph", "gen-dir", "iceberg/schemas/graph", f"{file_path}", f"{extraction_path}","--project_id", f"{project_id}","--gzip_files"])
# call jsonschemagraph to create edges and vertices
graph_gen_cmd = ["jsonschemagraph", "gen-dir", "iceberg/schemas/graph", f"{file_path}", f"{extraction_path}","--project_id", f"{project_id}","--gzip_files"]
subprocess.run(graph_gen_cmd, check=True, capture_output=True)

bulk_load(_get_grip_service(), "CALIPER",f"{program}-{project}", extraction_path, output, _get_token())

assert pathlib.Path(work_path).exists(), f"Directory {work_path} does not exist."
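For reference, this is how the new invocation surfaces validator output: with `check=True` a non-zero exit from `jsonschemagraph` raises `subprocess.CalledProcessError`, and `capture_output=True` attaches the captured stdout/stderr (bytes, since `text=True` is not set) to that exception, which the handler in the next hunk turns into client-visible logs. A minimal, standalone sketch of that behavior, using a stand-in command rather than the real `jsonschemagraph` call:

```python
# Standalone sketch (not part of this job): check=True turns a non-zero exit
# into CalledProcessError, carrying the output captured by capture_output=True.
import subprocess

cmd = ["python3", "-c",
       "import sys; print('vertices written'); sys.stderr.write('bad schema\\n'); sys.exit(2)"]
try:
    subprocess.run(cmd, check=True, capture_output=True)
except subprocess.CalledProcessError as exc:
    print(exc.returncode)        # 2
    print(exc.stdout.decode())   # vertices written
    print(exc.stderr.decode())   # bad schema
```

Note that `traceback.print_tb` prints to stderr and returns `None`, so the detail the handler below forwards effectively comes from the captured stdout and stderr.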
@@ -240,10 +243,26 @@ def _load_all(study: str,
limit=None, elastic_url=DEFAULT_ELASTIC,
output_path=None)

logs = fhir_put(project_id, path=file_path,
elastic_url=DEFAULT_ELASTIC)
yaml.dump(logs, sys.stdout, default_flow_style=False)

# when generating graph with jsonschemagraph
except subprocess.CalledProcessError as exception:
# save and print any useful logs
tb = traceback.print_tb(exception.__traceback__)
for title, log in [("stdout", exception.stdout), ("traceback", tb), ("ERROR", exception.stderr)]:
if not log:
continue

message = f"{title.upper()}: {log}"
output['logs'].append(message)
_write_output_to_client(output)
print(message)

# print final error before raising
final_error = f"ERROR: Unable to generate valid jsonschema graph from {file_path} to {extraction_path} for project ID {project_id}"
output['logs'].append(final_error)
_write_output_to_client(output)
raise

# when making changes to Elasticsearch
except OpenSearchException as e:
output['logs'].append(f"An ElasticSearch Exception occurred: {str(e)}")
tb = traceback.format_exc()
@@ -252,8 +271,10 @@ def _empty_project(output: dict,
output['logs'].append(tb)
if logs is not None:
output['logs'].extend(logs)
return False
_write_output_to_client(output)
raise

# all other exceptions
except Exception as e:
output['logs'].append(f"An Exception Occurred: {str(e)}")
tb = traceback.format_exc()
@@ -262,54 +283,15 @@ def _load_all(study: str,
output['logs'].append(tb)
if logs is not None:
output['logs'].extend(logs)
return False
_write_output_to_client(output)
raise

output['logs'].append(f"Loaded {study}")
if logs is not None:
output['logs'].extend(logs)
return True
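The three handlers above share one pattern introduced by this commit: append the message and traceback to `output['logs']`, flush the whole `output` dict to stdout with `_write_output_to_client`, then re-raise (instead of the old `return False`) so the job still fails. A generic sketch of that pattern, with illustrative names that are not part of the repository:

```python
# Illustrative sketch of the log-then-raise pattern used by the handlers above.
import json
import traceback

def _flush(output: dict) -> None:
    # same convention as _write_output_to_client defined further below
    print(f"[out] {json.dumps(output, separators=(',', ':'))}")

def run_step(output: dict, step, *args):
    try:
        return step(*args)
    except Exception as e:
        output['logs'].append(f"An Exception Occurred: {e}")
        output['logs'].append(traceback.format_exc())
        _flush(output)  # logs reach the client even though the job fails
        raise           # re-raise so the failure is still visible to sower
```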


def _get(output: dict,
program: str,
project: str,
user: dict,
auth: Gen3Auth) -> str | None:
"""Export data from the fhir store to bucket, returns object_id."""
can_read = _can_read(output, program, project, user)
if not can_read:
output['logs'].append(f"No read permissions on {program}-{project}")
return None

study_path = f"studies/{project}"
project_id = f"{program}-{project}"

# ensure we wait for the index to be refreshed before we query it
elastic = Elasticsearch([DEFAULT_ELASTIC], request_timeout=120)
elastic.indices.refresh(index='fhir')

logs = fhir_get(f"{program}-{project}", study_path, DEFAULT_ELASTIC)
output['logs'].extend(logs)


# zip and upload the exported files to bucket
now = datetime.now().strftime("%Y%m%d-%H%M%S")
object_name = f'{project_id}_{now}_SNAPSHOT.zip'

config = Config()
cp_result = push_snapshot(
config=config,
auth=auth,
project_id=project_id,
from_=study_path,
object_name=object_name)

output['logs'].append(cp_result['msg'])
object_id = cp_result['object_id']

return object_id


def _empty_project(output: dict,
program: str,
project: str,
@@ -327,13 +309,12 @@ def _empty_project(output: dict,
meta_flat_delete(project_id=f"{program}-{project}", index=index)
output['logs'].append(f"EMPTIED flat for {program}-{project}")

fhir_delete(f"{program}-{project}", DEFAULT_ELASTIC)
output['logs'].append(f"EMPTIED FHIR STORE for {program}-{project}")

except Exception as e:
output['logs'].append(f"An Exception Occurred emptying project {program}-{project}: {str(e)}")
tb = traceback.format_exc()
output['logs'].append(tb)
_write_output_to_client(output)
raise


def main():
@@ -348,9 +329,9 @@ def main():
# note, only the last output (a line in stdout with `[out]` prefix) is returned to the caller

# output['env'] = {k: v for k, v in os.environ.items()}

input_data = _input_data()
print(f"[out] {json.dumps(input_data, separators=(',', ':'))}")
_write_output_to_client(input_data)
program, project = _get_program_project(input_data)

schema = os.getenv('DICTIONARY_URL', None)
@@ -361,25 +342,18 @@

method = input_data.get("method", None)
assert method, "input data must contain a `method`"

if method.lower() == 'put':
# read from bucket, write to fhir store
_put(input_data, output, program, project, user, schema)
# after pushing commits, create a snapshot file
object_id = _get(output, program, project, user, auth)
output['snapshot'] = {'object_id': object_id}
elif method.lower() == 'get':
# read fhir store, write to bucket
object_id = _get(output, program, project, user, auth)
output['object_id'] = object_id
elif method.lower() == 'delete':
_empty_project(output, program, project, user, dictionary_path=schema,
config_path="config.yaml")

config_path="config.yaml")
else:
raise Exception(f"unknown method {method}")

# note, only the last output (a line in stdout with `[out]` prefix) is returned to the caller
print(f"[out] {json.dumps(output, separators=(',', ':'))}")
_write_output_to_client(output)


def _put(input_data: dict,
@@ -392,7 +366,12 @@ def _put(input_data: dict,
# check permissions
can_create = _can_create(output, program, project, user)
output['logs'].append(f"CAN CREATE: {can_create}")
assert can_create, f"No create permissions on {program}"
if not can_create:
error_log = f"ERROR 401: No permissions to create project {project} on program {program}. \nYou can view your project-level permissions with g3t ping"
output["logs"].append(error_log)
_write_output_to_client(output)
raise Exception(error_log)

assert 'push' in input_data, "input data must contain a `push`"
for commit in input_data['push']['commits']:
assert 'object_id' in commit, "commit must contain an `object_id`"
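Read together, the `method` assertion in `main()` and the `push`/`object_id` assertions here imply a job input of roughly the following shape. This example is pieced together from those checks only; the values are placeholders, and other fields the job expects (such as program/project identifiers) are omitted:

```python
# Hypothetical minimal payload satisfying the assertions above; values are placeholders.
example_input = {
    "method": "put",
    "push": {
        "commits": [
            {"object_id": "00000000-0000-0000-0000-000000000000"}
        ]
    }
}
```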
@@ -412,9 +391,15 @@

# load the study into the database and elastic search
_load_all(project, f"{program}-{project}", output, file_path, schema, "work")

shutil.rmtree(f"/root/studies/{project}")

def _write_output_to_client(output):
'''
formats output as json to stdout so it is passed back to the client,
most importantly to display relevant logs from the job erroring out
'''
print(f"[out] {json.dumps(output, separators=(',', ':'))}")

if __name__ == '__main__':
main()
