diff --git a/cli/src/pixl_cli/_io.py b/cli/src/pixl_cli/_io.py new file mode 100644 index 000000000..5ba25f378 --- /dev/null +++ b/cli/src/pixl_cli/_io.py @@ -0,0 +1,145 @@ +# Copyright (c) University College London Hospitals NHS Foundation Trust +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Reading and writing files from PIXL CLI.""" +import json +from datetime import datetime +from pathlib import Path + +import pandas as pd +from core.omop import OmopExtract +from core.patient_queue.message import Message, deserialise + +from pixl_cli._logging import logger +from pixl_cli._utils import string_is_non_empty + +# instance of omop extract, can be overriden during testing +extract = OmopExtract() + + +def messages_from_state_file(filepath: Path) -> list[Message]: + """ + Return messages from a state file path + + :param filepath: Path for state file to be read + :return: A list of Message objects containing all the messages from the state file + """ + logger.info(f"Creating messages from {filepath}") + if not filepath.exists(): + raise FileNotFoundError + if filepath.suffix != ".state": + msg = f"Invalid file suffix for {filepath}. 
Expected .state" + raise ValueError(msg) + + return [ + deserialise(line) for line in Path.open(filepath).readlines() if string_is_non_empty(line) + ] + + +def copy_parquet_return_logfile_fields(parquet_path: Path) -> tuple[str, datetime]: + """Copy public parquet file to extracts directory, and return fields from logfile""" + log_file = parquet_path / "extract_summary.json" + + logs = json.load(log_file.open()) + project_name = logs["settings"]["cdm_source_name"] + omop_es_timestamp = datetime.fromisoformat(logs["datetime"]) + project_name_slug = extract.copy_to_exports(parquet_path, project_name, omop_es_timestamp) + return project_name_slug, omop_es_timestamp + + +def messages_from_parquet( + dir_path: Path, project_name: str, omop_es_timestamp: datetime +) -> list[Message]: + """ + Reads patient information from parquet files within directory structure + and transforms that into messages. + + :param dir_path: Path for parquet directory containing private and public + :param project_name: Name of the project, should be a slug, so it can match the export directory + :param omop_es_timestamp: Datetime that OMOP ES ran the extract + files + """ + public_dir = dir_path / "public" + private_dir = dir_path / "private" + + cohort_data = _check_and_parse_parquet(private_dir, public_dir) + + expected_col_names = [ + "PrimaryMrn", + "AccessionNumber", + "person_id", + "procedure_date", + "procedure_occurrence_id", + ] + _raise_if_column_names_not_found(cohort_data, expected_col_names) + + ( + mrn_col_name, + acc_num_col_name, + _, + dt_col_name, + procedure_occurrence_id, + ) = expected_col_names + + messages = [] + + for _, row in cohort_data.iterrows(): + message = Message( + mrn=row[mrn_col_name], + accession_number=row[acc_num_col_name], + study_date=row[dt_col_name], + procedure_occurrence_id=row[procedure_occurrence_id], + project_name=project_name, + omop_es_timestamp=omop_es_timestamp, + ) + messages.append(message) + + if len(messages) == 0: + msg = f"Failed to 
find any messages in {dir_path}" + raise ValueError(msg) + + logger.info(f"Created {len(messages)} messages from {dir_path}") + return messages + + +def _check_and_parse_parquet(private_dir: Path, public_dir: Path) -> pd.DataFrame: + for d in [public_dir, private_dir]: + if not d.is_dir(): + err_str = f"{d} must exist and be a directory" + raise NotADirectoryError(err_str) + + # MRN in people.PrimaryMrn: + people = pd.read_parquet(private_dir / "PERSON_LINKS.parquet") + # accession number in accessions.AccessionNumber + accessions = pd.read_parquet(private_dir / "PROCEDURE_OCCURRENCE_LINKS.parquet") + # study_date is in procedure.procedure_date + procedure = pd.read_parquet(public_dir / "PROCEDURE_OCCURRENCE.parquet") + # joining data together + people_procedures = people.merge(procedure, on="person_id") + return people_procedures.merge(accessions, on="procedure_occurrence_id") + + +def _raise_if_column_names_not_found( + cohort_data: pd.DataFrame, expected_col_names: list[str] +) -> None: + logger.debug( + f"Checking merged parquet files. Expecting columns to include {expected_col_names}" + ) + for col in expected_col_names: + if col not in list(cohort_data.columns): + msg = ( + f"parquet files are expected to have at least {expected_col_names} as " + f"column names" + ) + raise ValueError(msg) diff --git a/cli/src/pixl_cli/main.py b/cli/src/pixl_cli/main.py index 75763b8b2..5223e33b3 100644 --- a/cli/src/pixl_cli/main.py +++ b/cli/src/pixl_cli/main.py @@ -13,7 +13,6 @@ # limitations under the License. 
"""PIXL command line interface functionality""" -import datetime import json import os from operator import attrgetter @@ -21,15 +20,14 @@ from typing import Any, Optional import click -import pandas as pd import requests import yaml -from core.patient_queue.message import Message, deserialise from core.patient_queue.producer import PixlProducer from core.patient_queue.subscriber import PixlBlockingConsumer +from ._io import copy_parquet_return_logfile_fields, messages_from_parquet, messages_from_state_file from ._logging import logger, set_log_level -from ._utils import clear_file, remove_file_if_it_exists, string_is_non_empty +from ._utils import clear_file, remove_file_if_it_exists def _load_config(filename: str = "pixl_config.yml") -> dict: @@ -90,18 +88,20 @@ def populate(parquet_dir: Path, *, restart: bool, queues: str) -> None: └── extract_summary.json """ logger.info(f"Populating queue(s) {queues} from {parquet_dir}") + project_name, omop_es_datetime = copy_parquet_return_logfile_fields(parquet_dir) + messages = messages_from_parquet(parquet_dir, project_name, omop_es_datetime) + for queue in queues.split(","): - with PixlProducer(queue_name=queue, **config["rabbitmq"]) as producer: - state_filepath = state_filepath_for_queue(queue) - if state_filepath.exists() and restart: - logger.info(f"Extracting messages from state: {state_filepath}") - inform_user_that_queue_will_be_populated_from(state_filepath) - messages = messages_from_state_file(state_filepath) - elif parquet_dir is not None: - messages = messages_from_parquet(parquet_dir) + state_filepath = state_filepath_for_queue(queue) + if state_filepath.exists() and restart: + logger.info(f"Extracting messages from state: {state_filepath}") + inform_user_that_queue_will_be_populated_from(state_filepath) + messages = messages_from_state_file(state_filepath) - remove_file_if_it_exists(state_filepath) # will be stale - producer.publish(sorted(messages, key=attrgetter("study_datetime"))) + 
remove_file_if_it_exists(state_filepath) # will be stale + + with PixlProducer(queue_name=queue, **config["rabbitmq"]) as producer: + producer.publish(sorted(messages, key=attrgetter("study_date"))) @cli.command() @@ -285,110 +285,6 @@ def state_filepath_for_queue(queue_name: str) -> Path: return Path(f"{queue_name.replace('/', '_')}.state") -def messages_from_state_file(filepath: Path) -> list[Message]: - """ - Return messages from a state file path - - :param filepath: Path for state file to be read - :return: A list of Message objects containing all the messages from the state file - """ - logger.info(f"Creating messages from {filepath}") - if not filepath.exists(): - raise FileNotFoundError - if filepath.suffix != ".state": - msg = f"Invalid file suffix for {filepath}. Expected .state" - raise ValueError(msg) - - return [ - deserialise(line) for line in Path.open(filepath).readlines() if string_is_non_empty(line) - ] - - -def messages_from_parquet(dir_path: Path) -> list[Message]: - """ - Reads patient information from parquet files within directory structure - and transforms that into messages. 
- :param dir_path: Path for parquet directory containing private and public - files - """ - public_dir = dir_path / "public" - private_dir = dir_path / "private" - log_file = dir_path / "extract_summary.json" - - for d in [public_dir, private_dir]: - if not d.is_dir(): - err_str = f"{d} must exist and be a directory" - raise NotADirectoryError(err_str) - - if not log_file.is_file(): - err_str = f"{log_file} must exist and be a file" - raise FileNotFoundError(err_str) - - # MRN in people.PrimaryMrn: - people = pd.read_parquet(private_dir / "PERSON_LINKS.parquet") - # accession number in accessions.AccesionNumber - accessions = pd.read_parquet(private_dir / "PROCEDURE_OCCURRENCE_LINKS.parquet") - # study_date is in procedure.procdure_date - procedure = pd.read_parquet(public_dir / "PROCEDURE_OCCURRENCE.parquet") - # joining data together - people_procedures = people.merge(procedure, on="person_id") - cohort_data = people_procedures.merge(accessions, on="procedure_occurrence_id") - - expected_col_names = [ - "PrimaryMrn", - "AccessionNumber", - "person_id", - "procedure_date", - "procedure_occurrence_id", - ] - logger.debug( - f"Extracting messages from {dir_path}. 
Expecting columns to include " - f"{expected_col_names}" - ) - - for col in expected_col_names: - if col not in list(cohort_data.columns): - msg = ( - f"parquet files are expected to have at least {expected_col_names} as " - f"column names" - ) - raise ValueError(msg) - - ( - mrn_col_name, - acc_num_col_name, - _, - dt_col_name, - procedure_occurrence_id, - ) = expected_col_names - - # Get project name and OMOP ES timestamp from log file - logs = json.load(log_file.open()) - project_name = logs["settings"]["cdm_source_name"] - omop_es_timestamp = datetime.datetime.fromisoformat(logs["datetime"]) - - messages = [] - - for _, row in cohort_data.iterrows(): - # Create new dict to initialise message - message = Message( - mrn=row[mrn_col_name], - accession_number=row[acc_num_col_name], - study_datetime=row[dt_col_name], - procedure_occurrence_id=row[procedure_occurrence_id], - project_name=project_name, - omop_es_timestamp=omop_es_timestamp, - ) - messages.append(message) - - if len(messages) == 0: - msg = f"Failed to find any messages in {dir_path}" - raise ValueError(msg) - - logger.info(f"Created {len(messages)} messages from {dir_path}") - return messages - - def queue_is_up() -> Any: """Checks if the queue is up""" with PixlProducer(queue_name="") as producer: diff --git a/cli/tests/conftest.py b/cli/tests/conftest.py index 254115b38..9b39fff49 100644 --- a/cli/tests/conftest.py +++ b/cli/tests/conftest.py @@ -19,11 +19,17 @@ from core.omop import OmopExtract -@pytest.fixture() -def omop_files(tmp_path_factory: pytest.TempPathFactory) -> OmopExtract: - """Create an OmopExtract instance using a temporary directory""" +@pytest.fixture(autouse=True) +def omop_files(tmp_path_factory: pytest.TempPathFactory, monkeypatch) -> OmopExtract: + """ + Replace production extract instance with one writing to a tmpdir. + + :returns OmopExtract: For direct use when the fixture is explicitly called. 
+ """ export_dir = tmp_path_factory.mktemp("repo_base") - return OmopExtract(export_dir) + tmpdir_extract = OmopExtract(export_dir) + monkeypatch.setattr("pixl_cli._io.extract", tmpdir_extract) + return tmpdir_extract @pytest.fixture() diff --git a/cli/tests/test_messages_from_parquet.py b/cli/tests/test_messages_from_parquet.py index c092cc489..b944b84e0 100644 --- a/cli/tests/test_messages_from_parquet.py +++ b/cli/tests/test_messages_from_parquet.py @@ -17,49 +17,54 @@ from pathlib import Path from core.patient_queue.message import Message -from pixl_cli.main import messages_from_parquet +from pixl_cli._io import copy_parquet_return_logfile_fields, messages_from_parquet def test_messages_from_parquet(resources: Path) -> None: """ - Test that the messages are as expected, given the test parquet files. - The test data doesn't have any "difficult" cases in it, eg. people without procedures. + Given a valid OMOP ES extract directory that has had the logfile parsed + When the messages are generated from the directory and the output of logfile parsing + Then the messages should match expected values """ + # Arrange omop_parquet_dir = resources / "omop" - messages = messages_from_parquet(omop_parquet_dir) + project_name, omop_es_datetime = copy_parquet_return_logfile_fields(omop_parquet_dir) + # Act + messages = messages_from_parquet(omop_parquet_dir, project_name, omop_es_datetime) + # Assert assert all(isinstance(msg, Message) for msg in messages) expected_messages = [ Message( mrn="12345678", accession_number="12345678", - study_datetime=datetime.date.fromisoformat("2021-07-01"), + study_date=datetime.date.fromisoformat("2021-07-01"), procedure_occurrence_id=1, - project_name="Test Extract - UCLH OMOP CDM", + project_name="test-extract-uclh-omop-cdm", omop_es_timestamp=datetime.datetime.fromisoformat("2023-12-07T14:08:58"), ), Message( mrn="12345678", accession_number="ABC1234567", - study_datetime=datetime.date.fromisoformat("2021-07-01"), + 
study_date=datetime.date.fromisoformat("2021-07-01"), procedure_occurrence_id=2, - project_name="Test Extract - UCLH OMOP CDM", + project_name="test-extract-uclh-omop-cdm", omop_es_timestamp=datetime.datetime.fromisoformat("2023-12-07T14:08:58"), ), Message( mrn="987654321", accession_number="ABC1234560", - study_datetime=datetime.date.fromisoformat("2020-05-01"), + study_date=datetime.date.fromisoformat("2020-05-01"), procedure_occurrence_id=3, - project_name="Test Extract - UCLH OMOP CDM", + project_name="test-extract-uclh-omop-cdm", omop_es_timestamp=datetime.datetime.fromisoformat("2023-12-07T14:08:58"), ), Message( mrn="5020765", accession_number="MIG0234560", - study_datetime=datetime.date.fromisoformat("2015-05-01"), + study_date=datetime.date.fromisoformat("2015-05-01"), procedure_occurrence_id=4, - project_name="Test Extract - UCLH OMOP CDM", + project_name="test-extract-uclh-omop-cdm", omop_es_timestamp=datetime.datetime.fromisoformat("2023-12-07T14:08:58"), ), ] diff --git a/exports/.gitkeep b/exports/.gitkeep new file mode 100644 index 000000000..e69de29bb diff --git a/pixl_core/src/core/omop.py b/pixl_core/src/core/omop.py index 85d227b5e..e8230fc20 100644 --- a/pixl_core/src/core/omop.py +++ b/pixl_core/src/core/omop.py @@ -14,6 +14,7 @@ """Processing of OMOP parquet files.""" import datetime +import logging import pathlib import shutil @@ -21,6 +22,8 @@ root_from_install = pathlib.Path(__file__).parents[3] +logger = logging.getLogger(__file__) + class OmopExtract: """Processing Omop extracts on the filesystem.""" @@ -63,6 +66,7 @@ def copy_to_exports( public_output = OmopExtract._mkdir( export_base / "all_extracts" / "omop" / extract_time_slug / "public" ) + logger.info("Copying public parquet files from %s to %s", omop_dir, public_output) # Copy extract files, overwriting if it exists shutil.copytree(public_input, public_output, dirs_exist_ok=True) diff --git a/pixl_core/src/core/patient_queue/message.py b/pixl_core/src/core/patient_queue/message.py 
index 595be68c0..c98a5decc 100644 --- a/pixl_core/src/core/patient_queue/message.py +++ b/pixl_core/src/core/patient_queue/message.py @@ -16,7 +16,7 @@ import logging from dataclasses import dataclass -from datetime import datetime +from datetime import date, datetime from typing import Any from jsonpickle import decode, encode @@ -30,8 +30,8 @@ class Message: mrn: str accession_number: str - study_datetime: datetime - procedure_occurrence_id: str + study_date: date + procedure_occurrence_id: int project_name: str omop_es_timestamp: datetime @@ -53,7 +53,7 @@ def serialise(self, *, deserialisable: bool = True) -> bytes: " * project_name: %s\n * omop_es_timestamp: %s", self.mrn, self.accession_number, - self.study_datetime, + self.study_date, self.procedure_occurrence_id, self.project_name, self.omop_es_timestamp, diff --git a/pixl_core/tests/patient_queue/test_message.py b/pixl_core/tests/patient_queue/test_message.py index 8471cfdc4..0c9c9e65a 100644 --- a/pixl_core/tests/patient_queue/test_message.py +++ b/pixl_core/tests/patient_queue/test_message.py @@ -18,9 +18,7 @@ msg = Message( mrn="111", accession_number="123", - study_datetime=datetime.datetime.strptime("Nov 22 2022 1:33PM", "%b %d %Y %I:%M%p").replace( - tzinfo=datetime.timezone.utc - ), + study_date=datetime.date.fromisoformat("2022-11-22"), procedure_occurrence_id="234", project_name="test project", omop_es_timestamp=datetime.datetime.strptime("Dec 7 2023 2:08PM", "%b %d %Y %I:%M%p").replace( @@ -34,7 +32,7 @@ def test_serialise() -> None: msg_body = msg.serialise(deserialisable=False) assert ( msg_body == b'{"mrn": "111", "accession_number": "123", ' - b'"study_datetime": "2022-11-22T13:33:00+00:00", ' + b'"study_date": "2022-11-22", ' b'"procedure_occurrence_id": "234", ' b'"project_name": "test project", ' b'"omop_es_timestamp": "2023-12-07T14:08:00+00:00"}' diff --git a/pixl_core/tests/patient_queue/test_producer.py b/pixl_core/tests/patient_queue/test_producer.py index 1aca4cda4..4fbf01ed0 100644 
--- a/pixl_core/tests/patient_queue/test_producer.py +++ b/pixl_core/tests/patient_queue/test_producer.py @@ -19,7 +19,7 @@ TEST_MESSAGE = Message( mrn="111", accession_number="123", - study_datetime="2022-11-22T13:33:00+00:00", + study_date="2022-11-22T13:33:00+00:00", procedure_occurrence_id="234", project_name="test project", omop_es_timestamp="2023-12-07T14:08:00+00:00", diff --git a/pixl_core/tests/patient_queue/test_subscriber.py b/pixl_core/tests/patient_queue/test_subscriber.py index 32910dedb..13b3b0117 100644 --- a/pixl_core/tests/patient_queue/test_subscriber.py +++ b/pixl_core/tests/patient_queue/test_subscriber.py @@ -27,7 +27,7 @@ TEST_MESSAGE = Message( mrn="111", accession_number="123", - study_datetime="2022-11-22T13:33:00+00:00", + study_date="2022-11-22T13:33:00+00:00", procedure_occurrence_id="234", project_name="test project", omop_es_timestamp="2023-12-07T14:08:00+00:00", diff --git a/pixl_ehr/src/pixl_ehr/_processing.py b/pixl_ehr/src/pixl_ehr/_processing.py index 89a9380f7..32550be6a 100644 --- a/pixl_ehr/src/pixl_ehr/_processing.py +++ b/pixl_ehr/src/pixl_ehr/_processing.py @@ -87,7 +87,7 @@ def from_message(cls, message: Message) -> "PatientEHRData": self = PatientEHRData( mrn=message.mrn, accession_number=message.accession_number, - acquisition_datetime=message.study_datetime, + acquisition_datetime=message.study_date, ) logger.debug("Created %s from message data", self) diff --git a/pixl_ehr/tests/test_processing.py b/pixl_ehr/tests/test_processing.py index d0b933884..e5a2e8d78 100644 --- a/pixl_ehr/tests/test_processing.py +++ b/pixl_ehr/tests/test_processing.py @@ -32,7 +32,7 @@ mrn = "testmrn" accession_number = "testaccessionnumber" -study_datetime_str = "01/01/1234 01:23" +study_date_str = "1234-01-01" observation_datetime = datetime.datetime.fromisoformat( "1234-01-01" ) # within hours of imaging study @@ -58,9 +58,7 @@ message = Message( mrn=mrn, accession_number=accession_number, - 
study_datetime=datetime.datetime.strptime(study_datetime_str, "%d/%m/%Y %H:%M").replace( - tzinfo=datetime.timezone.utc - ), + study_date=datetime.date.fromisoformat(study_date_str), procedure_occurrence_id=procedure_occurrence_id, project_name=project_name, omop_es_timestamp=omop_es_timestamp, diff --git a/pixl_pacs/tests/test_processing.py b/pixl_pacs/tests/test_processing.py index 82084e44b..06597a957 100644 --- a/pixl_pacs/tests/test_processing.py +++ b/pixl_pacs/tests/test_processing.py @@ -33,7 +33,7 @@ message = Message( mrn=PATIENT_ID, accession_number=ACCESSION_NUMBER, - study_datetime=datetime.datetime.strptime("01/01/1234 01:23:45", "%d/%m/%Y %H:%M:%S").replace( + study_date=datetime.datetime.strptime("01/01/1234 01:23:45", "%d/%m/%Y %H:%M:%S").replace( tzinfo=datetime.timezone.utc ), procedure_occurrence_id="234", diff --git a/test/run-system-test.sh b/test/run-system-test.sh index c904a7517..94bea1d5f 100755 --- a/test/run-system-test.sh +++ b/test/run-system-test.sh @@ -38,3 +38,4 @@ sleep 65 # need to wait until the DICOM image is "stable" = 60s cd "${PACKAGE_DIR}" docker compose -f docker-compose.yml -f test/docker-compose.yml -p system-test down --volumes +rm -r exports/test-extract-uclh-omop-cdm/