From 5cf111153ce117c1e65d593bfd8f37263f6bdce5 Mon Sep 17 00:00:00 2001 From: Milan Malfait Date: Tue, 19 Dec 2023 12:10:16 +0000 Subject: [PATCH 01/36] Refactor message serialization and deserialization Adding `Message` and `SerialisedMessage` classes in an attempt to improve information hiding and decoupling. --- pixl_core/src/core/patient_queue/utils.py | 117 +++++++++++++--------- 1 file changed, 70 insertions(+), 47 deletions(-) diff --git a/pixl_core/src/core/patient_queue/utils.py b/pixl_core/src/core/patient_queue/utils.py index 3619b667c..406911775 100644 --- a/pixl_core/src/core/patient_queue/utils.py +++ b/pixl_core/src/core/patient_queue/utils.py @@ -21,50 +21,73 @@ logger = logging.getLogger(__name__) -def deserialise(message_body: bytes) -> dict: - """Returns the de-serialised message in JSON format.""" - logger.debug("De-serialising: %s", message_body.decode()) - data = dict(json.loads(message_body.decode())) - if "study_datetime" in data: - data["study_datetime"] = datetime.fromisoformat(data["study_datetime"]) - return data - - -def serialise( - mrn: str, - accession_number: str, - study_datetime: datetime, - procedure_occurrence_id: str, - project_name: str, - omop_es_timestamp: datetime, -) -> bytes: - """ - Returns serialised message from the given parameters. 
- :param mrn: patient identifier - :param accession_number: accession number - :param study_datetime: date and time of the study - :param procedure_occurrence_id: the OMOP ID of the procedure - :returns: JSON formatted message - """ - logger.debug( - "Serialising message with patient id %s, " - "accession number: %s and timestamp %s " - "procedure_occurrence_id %s, ", - "project_name %s, omop_es_timestamp %s", - mrn, - accession_number, - study_datetime, - procedure_occurrence_id, - project_name, - omop_es_timestamp, - ) - return json.dumps( - { - "mrn": mrn, - "accession_number": accession_number, - "study_datetime": study_datetime.isoformat(), - "procedure_occurrence_id": procedure_occurrence_id, - "project_name": project_name, - "omop_es_timestamp": omop_es_timestamp.isoformat(), - } - ).encode("utf-8") +class SerialisedMessage: + """Class to represent a serialised message.""" + + body: bytes + + def __init__(self, body: bytes) -> None: + """Initialise the serialised message from JSON dump.""" + self.body = body + + def deserialise(self) -> dict: + """Returns the de-serialised message in JSON format.""" + logger.debug("De-serialising: %s", self.body.decode()) + data = dict(json.loads(self.body.decode())) + if "study_datetime" in data: + data["study_datetime"] = datetime.fromisoformat(data["study_datetime"]) + if "omop_es_timestamp" in data: + data["omop_es_timestamp"] = datetime.fromisoformat(data["omop_es_timestamp"]) + + return data + + +class Message: + """Class to represent a message containing the relevant information for a study.""" + + mrn: str + accession_number: str + study_datetime: datetime + procedure_occurrence_id: str + project_name: str + omop_es_timestamp: datetime + + def __init__(self, message_fields: dict) -> None: + """Initialise the message.""" + self.mrn = message_fields["mrn"] + self.accession_number = message_fields["accession_number"] + self.study_datetime = message_fields["study_datetime"] + self.procedure_occurrence_id = 
message_fields["procedure_occurrence_id"] + self.project_name = message_fields["project_name"] + self.omop_es_timestamp = message_fields["omop_es_timestamp"] + + def serialise(self) -> "SerialisedMessage": + """Serialise the message into JSON format.""" + msg = ( + "Serialising message with\n" + " * patient id: %s\n" + " * accession number: %s\n" + " * timestamp: %s\n" + " * procedure_occurrence_id: %s\n", + " * project_name: %s\n * omop_es_timestamp: %s", + self.mrn, + self.accession_number, + self.study_datetime, + self.procedure_occurrence_id, + self.project_name, + self.omop_es_timestamp, + ) + logger.debug(msg) + + body = json.dumps( + { + "mrn": self.mrn, + "accession_number": self.accession_number, + "study_datetime": self.study_datetime.isoformat(), + "procedure_occurrence_id": self.procedure_occurrence_id, + "project_name": self.project_name, + "omop_es_timestamp": self.omop_es_timestamp.isoformat(), + } + ).encode("utf-8") + + return SerialisedMessage(body=body) From 2299519ae1e27650df4b0ad6f9dbfd51b228265d Mon Sep 17 00:00:00 2001 From: Milan Malfait Date: Tue, 19 Dec 2023 12:12:51 +0000 Subject: [PATCH 02/36] Rename `utils.py` -> `message.py` --- cli/src/pixl_cli/main.py | 2 +- pixl_core/src/core/patient_queue/{utils.py => message.py} | 0 .../tests/patient_queue/{test_utils.py => test_message.py} | 2 +- pixl_ehr/src/pixl_ehr/_processing.py | 2 +- pixl_ehr/tests/test_processing.py | 2 +- pixl_pacs/src/pixl_pacs/_processing.py | 2 +- pixl_pacs/tests/test_processing.py | 2 +- 7 files changed, 6 insertions(+), 6 deletions(-) rename pixl_core/src/core/patient_queue/{utils.py => message.py} (100%) rename pixl_core/tests/patient_queue/{test_utils.py => test_message.py} (97%) diff --git a/cli/src/pixl_cli/main.py b/cli/src/pixl_cli/main.py index e9e3bbfc2..896fecb0c 100644 --- a/cli/src/pixl_cli/main.py +++ b/cli/src/pixl_cli/main.py @@ -23,9 +23,9 @@ import pandas as pd import requests import yaml +from core.patient_queue.message import deserialise, serialise 
from core.patient_queue.producer import PixlProducer from core.patient_queue.subscriber import PixlBlockingConsumer -from core.patient_queue.utils import deserialise, serialise from ._logging import logger, set_log_level from ._utils import clear_file, remove_file_if_it_exists, string_is_non_empty diff --git a/pixl_core/src/core/patient_queue/utils.py b/pixl_core/src/core/patient_queue/message.py similarity index 100% rename from pixl_core/src/core/patient_queue/utils.py rename to pixl_core/src/core/patient_queue/message.py diff --git a/pixl_core/tests/patient_queue/test_utils.py b/pixl_core/tests/patient_queue/test_message.py similarity index 97% rename from pixl_core/tests/patient_queue/test_utils.py rename to pixl_core/tests/patient_queue/test_message.py index c9b66d0db..28b5292ce 100644 --- a/pixl_core/tests/patient_queue/test_utils.py +++ b/pixl_core/tests/patient_queue/test_message.py @@ -14,7 +14,7 @@ import datetime import json -from core.patient_queue.utils import deserialise, serialise +from core.patient_queue.message import deserialise, serialise def test_serialise() -> None: diff --git a/pixl_ehr/src/pixl_ehr/_processing.py b/pixl_ehr/src/pixl_ehr/_processing.py index b7d47bb75..d70976a50 100644 --- a/pixl_ehr/src/pixl_ehr/_processing.py +++ b/pixl_ehr/src/pixl_ehr/_processing.py @@ -21,7 +21,7 @@ from typing import Optional import requests -from core.patient_queue.utils import deserialise +from core.patient_queue.message import deserialise from decouple import config from pixl_ehr._databases import EMAPStar, PIXLDatabase diff --git a/pixl_ehr/tests/test_processing.py b/pixl_ehr/tests/test_processing.py index 4dd6f7b1c..50d6c50df 100644 --- a/pixl_ehr/tests/test_processing.py +++ b/pixl_ehr/tests/test_processing.py @@ -21,7 +21,7 @@ import datetime import pytest -from core.patient_queue.utils import serialise +from core.patient_queue.message import serialise from decouple import config from pixl_ehr._databases import PIXLDatabase, WriteableDatabase from 
pixl_ehr._processing import process_message diff --git a/pixl_pacs/src/pixl_pacs/_processing.py b/pixl_pacs/src/pixl_pacs/_processing.py index 9338d89ee..3566fbb0b 100644 --- a/pixl_pacs/src/pixl_pacs/_processing.py +++ b/pixl_pacs/src/pixl_pacs/_processing.py @@ -18,7 +18,7 @@ from datetime import datetime from time import time -from core.patient_queue.utils import deserialise +from core.patient_queue.message import deserialise from decouple import config from pixl_pacs._orthanc import Orthanc, PIXLRawOrthanc diff --git a/pixl_pacs/tests/test_processing.py b/pixl_pacs/tests/test_processing.py index 250d976d9..49497899f 100644 --- a/pixl_pacs/tests/test_processing.py +++ b/pixl_pacs/tests/test_processing.py @@ -19,7 +19,7 @@ import os import pytest -from core.patient_queue.utils import serialise +from core.patient_queue.message import serialise from decouple import config from pixl_pacs._orthanc import Orthanc, PIXLRawOrthanc from pixl_pacs._processing import ImagingStudy, process_message From 0bfcab0d05ce78d1e4dfb6832a474612c87bd8bc Mon Sep 17 00:00:00 2001 From: Milan Malfait Date: Tue, 19 Dec 2023 13:49:39 +0000 Subject: [PATCH 03/36] Add `decode()` method for `SerialisedMessage` --- pixl_core/src/core/patient_queue/message.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/pixl_core/src/core/patient_queue/message.py b/pixl_core/src/core/patient_queue/message.py index 406911775..3d2ec3f43 100644 --- a/pixl_core/src/core/patient_queue/message.py +++ b/pixl_core/src/core/patient_queue/message.py @@ -26,14 +26,14 @@ class SerialisedMessage: body: bytes - def __init__(self, body: bytes) -> None: + def __init__(self, body: str) -> None: """Initialise the serialised message from JSON dump.""" - self.body = body + self.body = body.encode("utf-8") def deserialise(self) -> dict: """Returns the de-serialised message in JSON format.""" - logger.debug("De-serialising: %s", self.body.decode()) - data = dict(json.loads(self.body.decode())) + 
logger.debug("De-serialising: %s", self.decode()) + data = dict(json.loads(self.decode())) if "study_datetime" in data: data["study_datetime"] = datetime.fromisoformat(data["study_datetime"]) if "omop_es_timestamp" in data: @@ -41,6 +41,10 @@ def deserialise(self) -> dict: return data + def decode(self) -> str: + """Returns the serialised message in string format.""" + return self.body.decode() + class Message: """Class to represent a message containing the relevant information for a study.""" @@ -88,6 +92,6 @@ def serialise(self) -> "SerialisedMessage": "project_name": self.project_name, "omop_es_timestamp": self.omop_es_timestamp.isoformat(), } - ).encode("utf-8") + ) return SerialisedMessage(body=body) From b9e4a592dd6ae22c5c1d7882f90f16771e165a6b Mon Sep 17 00:00:00 2001 From: Milan Malfait Date: Tue, 19 Dec 2023 13:57:28 +0000 Subject: [PATCH 04/36] Update docstring --- pixl_core/src/core/patient_queue/message.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pixl_core/src/core/patient_queue/message.py b/pixl_core/src/core/patient_queue/message.py index 3d2ec3f43..538c74cff 100644 --- a/pixl_core/src/core/patient_queue/message.py +++ b/pixl_core/src/core/patient_queue/message.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-"""Utility functions""" +"""Classes to represent messages in the patient queue.""" import json import logging From 558f69467243df8f819c626fac145d2d453978ce Mon Sep 17 00:00:00 2001 From: Milan Malfait Date: Tue, 19 Dec 2023 14:46:55 +0000 Subject: [PATCH 05/36] Use new classes in message testing --- pixl_core/tests/patient_queue/test_message.py | 50 +++++++++++-------- 1 file changed, 28 insertions(+), 22 deletions(-) diff --git a/pixl_core/tests/patient_queue/test_message.py b/pixl_core/tests/patient_queue/test_message.py index 28b5292ce..8b9dbe5d4 100644 --- a/pixl_core/tests/patient_queue/test_message.py +++ b/pixl_core/tests/patient_queue/test_message.py @@ -14,23 +14,26 @@ import datetime import json -from core.patient_queue.message import deserialise, serialise +from core.patient_queue.message import Message, SerialisedMessage def test_serialise() -> None: """Checks that messages can be correctly serialised""" - msg_body = serialise( - mrn="111", - accession_number="123", - study_datetime=datetime.datetime.strptime("Nov 22 2022 1:33PM", "%b %d %Y %I:%M%p").replace( - tzinfo=datetime.timezone.utc - ), - procedure_occurrence_id="234", - project_name="test project", - omop_es_timestamp=datetime.datetime.strptime( - "Dec 7 2023 2:08PM", "%b %d %Y %I:%M%p" - ).replace(tzinfo=datetime.timezone.utc), + msg = Message( + { + "mrn": "111", + "accession_number": "123", + "study_datetime": datetime.datetime.strptime( + "Nov 22 2022 1:33PM", "%b %d %Y %I:%M%p" + ).replace(tzinfo=datetime.timezone.utc), + "procedure_occurrence_id": "234", + "project_name": "test project", + "omop_es_timestamp": datetime.datetime.strptime( + "Dec 7 2023 2:08PM", "%b %d %Y %I:%M%p" + ).replace(tzinfo=datetime.timezone.utc), + } ) + msg_body = msg.serialise() assert ( msg_body.decode() == '{"mrn": "111", "accession_number": "123", ' '"study_datetime": "2022-11-22T13:33:00+00:00", ' @@ -42,20 +45,23 @@ def test_serialise() -> None: def test_simple_deserialise() -> None: """Checks a simple 
JSON deserialise works""" - assert deserialise((json.dumps({"key": "value"})).encode("utf-8"))["key"] == "value" + serialised_msg = SerialisedMessage(json.dumps({"key": "value"})) + assert serialised_msg.deserialise()["key"] == "value" def test_deserialise_datetime() -> None: """Checks that datetimes can be correctly serialised""" timestamp = datetime.datetime.fromordinal(100012) - data = deserialise( - serialise( - mrn="", - accession_number="", - study_datetime=timestamp, - procedure_occurrence_id="", - project_name="", - omop_es_timestamp=datetime.datetime.now(), # noqa: DTZ005 - ) + msg = Message( + { + "mrn": "", + "accession_number": "", + "study_datetime": timestamp, + "procedure_occurrence_id": "", + "project_name": "", + "omop_es_timestamp": datetime.datetime.now(), # noqa: DTZ005 + } ) + serialised_msg = msg.serialise() + data = serialised_msg.deserialise() assert data["study_datetime"] == timestamp From fef87791f33157dc4ea2881a995be4666eeceda4 Mon Sep 17 00:00:00 2001 From: Milan Malfait Date: Tue, 19 Dec 2023 14:48:03 +0000 Subject: [PATCH 06/36] Refactor message processing in the CLI --- cli/src/pixl_cli/main.py | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/cli/src/pixl_cli/main.py b/cli/src/pixl_cli/main.py index 896fecb0c..663c2f5e7 100644 --- a/cli/src/pixl_cli/main.py +++ b/cli/src/pixl_cli/main.py @@ -23,7 +23,7 @@ import pandas as pd import requests import yaml -from core.patient_queue.message import deserialise, serialise +from core.patient_queue.message import Message from core.patient_queue.producer import PixlProducer from core.patient_queue.subscriber import PixlBlockingConsumer @@ -345,9 +345,6 @@ def messages_from_parquet(dir_path: Path) -> Messages: f"{expected_col_names}" ) - # First line is column names - messages = Messages() - for col in expected_col_names: if col not in list(cohort_data.columns): msg = f"csv file expected to have at least {expected_col_names} as " f"column names" 
@@ -367,17 +364,20 @@ def messages_from_parquet(dir_path: Path) -> Messages: project_name = logs["settings"]["cdm_source_name"] omop_es_timestamp = datetime.datetime.fromisoformat(logs["datetime"]) + messages = Messages() + for _, row in cohort_data.iterrows(): - messages.append( - serialise( - mrn=row[mrn_col_name], - accession_number=row[acc_num_col_name], - study_datetime=row[dt_col_name], - procedure_occurrence_id=row[procedure_occurrence_id], - project_name=project_name, - omop_es_timestamp=omop_es_timestamp, - ) - ) + # Create new dict to initialise message + message_fields = { + "mrn": row[mrn_col_name], + "accession_number": row[acc_num_col_name], + "study_datetime": row[dt_col_name], + "procedure_occurrence_id": row[procedure_occurrence_id], + "project_name": project_name, + "omop_es_timestamp": omop_es_timestamp, + } + message = Message(message_fields) + messages.append(message.serialise()) if len(messages) == 0: msg = f"Failed to find any messages in {dir_path}" @@ -448,9 +448,9 @@ def api_config_for_queue(queue_name: str) -> APIConfig: return APIConfig(config[config_key]) -def study_date_from_serialised(message: bytes) -> datetime.datetime: +def study_date_from_serialised(message: Message) -> datetime.datetime: """Get the study date from a serialised message as a datetime""" - result = deserialise(message)["study_datetime"] + result = message.deserialise()["study_datetime"] if not isinstance(result, datetime.datetime): msg = "Expected study date to be a datetime. 
Got %s" raise TypeError(msg, type(result)) From e23929971e353f5d4895e723b51030927ece12c3 Mon Sep 17 00:00:00 2001 From: Milan Malfait Date: Tue, 19 Dec 2023 14:48:38 +0000 Subject: [PATCH 07/36] Refactor `process_message` to use `SerialisedMessage` class in EHR API --- pixl_ehr/src/pixl_ehr/_processing.py | 12 ++++++------ pixl_ehr/tests/test_processing.py | 25 ++++++++++++++----------- 2 files changed, 20 insertions(+), 17 deletions(-) diff --git a/pixl_ehr/src/pixl_ehr/_processing.py b/pixl_ehr/src/pixl_ehr/_processing.py index d70976a50..88cc17d02 100644 --- a/pixl_ehr/src/pixl_ehr/_processing.py +++ b/pixl_ehr/src/pixl_ehr/_processing.py @@ -21,7 +21,7 @@ from typing import Optional import requests -from core.patient_queue.message import deserialise +from core.patient_queue.message import SerialisedMessage from decouple import config from pixl_ehr._databases import EMAPStar, PIXLDatabase @@ -35,10 +35,10 @@ _this_dir = Path(Path(__file__).parent) -async def process_message(message_body: bytes) -> None: - logger.info("Processing: %s", message_body.decode()) +async def process_message(serialised_message: SerialisedMessage) -> None: + logger.info("Processing: %s", serialised_message.decode()) - raw_data = PatientEHRData.from_message(message_body) + raw_data = PatientEHRData.from_message(serialised_message) pixl_db = PIXLDatabase() if pixl_db.contains(raw_data): @@ -79,12 +79,12 @@ class PatientEHRData: report_text: Optional[str] = None @classmethod - def from_message(cls, message_body: bytes) -> "PatientEHRData": + def from_message(cls, serialised_message: SerialisedMessage) -> "PatientEHRData": """ Create a minimal set of patient EHR data required to start queries from a queue message """ - message_data = deserialise(message_body) + message_data = serialised_message.deserialise() self = PatientEHRData( mrn=message_data["mrn"], accession_number=message_data["accession_number"], diff --git a/pixl_ehr/tests/test_processing.py b/pixl_ehr/tests/test_processing.py 
index 50d6c50df..7a5e28888 100644 --- a/pixl_ehr/tests/test_processing.py +++ b/pixl_ehr/tests/test_processing.py @@ -21,7 +21,7 @@ import datetime import pytest -from core.patient_queue.message import serialise +from core.patient_queue.message import Message from decouple import config from pixl_ehr._databases import PIXLDatabase, WriteableDatabase from pixl_ehr._processing import process_message @@ -55,16 +55,19 @@ weight_vot_id, height_vot_id, gcs_vot_id = 2222222, 3333333, 4444444 ls_id, lo_id, lr_id, ltd_id = 5555555, 6666666, 7777777, 8888888 -message_body = serialise( - mrn=mrn, - accession_number=accession_number, - study_datetime=datetime.datetime.strptime(study_datetime_str, "%d/%m/%Y %H:%M").replace( - tzinfo=datetime.timezone.utc - ), - procedure_occurrence_id=procedure_occurrence_id, - project_name=project_name, - omop_es_timestamp=omop_es_timestamp, +message = Message( + { + "mrn": mrn, + "accession_number": accession_number, + "study_datetime": datetime.datetime.strptime(study_datetime_str, "%d/%m/%Y %H:%M").replace( + tzinfo=datetime.timezone.utc + ), + "procedure_occurrence_id": procedure_occurrence_id, + "project_name": project_name, + "omop_es_timestamp": omop_es_timestamp, + } ) +serialised_message = message.serialise() class WritableEMAPStar(WriteableDatabase): @@ -163,7 +166,7 @@ def insert_data_into_emap_star_schema() -> None: @pytest.mark.asyncio() async def test_message_processing() -> None: insert_data_into_emap_star_schema() - await process_message(message_body) + await process_message(serialised_message) pixl_db = QueryablePIXLDB() row = pixl_db.execute_query_string("select * from emap_data.ehr_raw where mrn = %s", [mrn]) From 596e85c07a9d680f7965881c18e7e7f221d83120 Mon Sep 17 00:00:00 2001 From: Milan Malfait Date: Tue, 19 Dec 2023 15:05:13 +0000 Subject: [PATCH 08/36] Refactor `process_message` to use `SerialisedMessage` class in imaging API --- pixl_pacs/src/pixl_pacs/_processing.py | 27 +++++++++++------------- 
pixl_pacs/tests/test_processing.py | 29 ++++++++++++++------------ 2 files changed, 28 insertions(+), 28 deletions(-) diff --git a/pixl_pacs/src/pixl_pacs/_processing.py b/pixl_pacs/src/pixl_pacs/_processing.py index 3566fbb0b..eace4b370 100644 --- a/pixl_pacs/src/pixl_pacs/_processing.py +++ b/pixl_pacs/src/pixl_pacs/_processing.py @@ -15,10 +15,9 @@ import os from asyncio import sleep from dataclasses import dataclass -from datetime import datetime from time import time -from core.patient_queue.message import deserialise +from core.patient_queue.message import Message, SerialisedMessage from decouple import config from pixl_pacs._orthanc import Orthanc, PIXLRawOrthanc @@ -27,10 +26,10 @@ logger.setLevel(os.environ.get("LOG_LEVEL", "WARNING")) -async def process_message(message_body: bytes) -> None: - logger.info("Processing: %s", message_body.decode()) +async def process_message(serialised_message: SerialisedMessage) -> None: + logger.info("Processing: %s", serialised_message.decode()) - study = ImagingStudy.from_message(message_body) + study = ImagingStudy.from_message(serialised_message) orthanc_raw = PIXLRawOrthanc() if study.exists_in(orthanc_raw): @@ -49,7 +48,7 @@ async def process_message(message_body: bytes) -> None: while job_state != "Success": if (time() - start_time) > config("PIXL_DICOM_TRANSFER_TIMEOUT", cast=float): msg = ( - f"Failed to transfer {message_body.decode()} within " + f"Failed to transfer {serialised_message.decode()} within " f"{config('PIXL_DICOM_TRANSFER_TIMEOUT')} seconds" ) raise TimeoutError(msg) @@ -64,22 +63,20 @@ async def process_message(message_body: bytes) -> None: class ImagingStudy: """Dataclass for EHR unique to a patient and xray study""" - mrn: str - accession_number: str - study_datetime: datetime - procedure_occurrence_id: str - project_name: str - omop_es_timestamp: datetime + message: Message @classmethod - def from_message(cls, message_body: bytes) -> "ImagingStudy": - return 
ImagingStudy(**deserialise(message_body)) + def from_message(cls, serialised_message: SerialisedMessage) -> "ImagingStudy": + return ImagingStudy(serialised_message.deserialise()) @property def orthanc_query_dict(self) -> dict: return { "Level": "Study", - "Query": {"PatientID": self.mrn, "AccessionNumber": self.accession_number}, + "Query": { + "PatientID": self.message.mrn, + "AccessionNumber": self.message.accession_number, + }, } def exists_in(self, node: Orthanc) -> bool: diff --git a/pixl_pacs/tests/test_processing.py b/pixl_pacs/tests/test_processing.py index 49497899f..e4852363e 100644 --- a/pixl_pacs/tests/test_processing.py +++ b/pixl_pacs/tests/test_processing.py @@ -19,7 +19,7 @@ import os import pytest -from core.patient_queue.message import serialise +from core.patient_queue import Message from decouple import config from pixl_pacs._orthanc import Orthanc, PIXLRawOrthanc from pixl_pacs._processing import ImagingStudy, process_message @@ -30,16 +30,19 @@ ACCESSION_NUMBER = "abc" PATIENT_ID = "a_patient" -message_body = serialise( - mrn=PATIENT_ID, - accession_number=ACCESSION_NUMBER, - study_datetime=datetime.datetime.strptime("01/01/1234 01:23:45", "%d/%m/%Y %H:%M:%S").replace( - tzinfo=datetime.timezone.utc - ), - procedure_occurrence_id="234", - project_name="test project", - omop_es_timestamp=datetime.datetime.fromisoformat("1234-01-01 00:00:00"), +message = Message( + { + "mrn": PATIENT_ID, + "accession_number": ACCESSION_NUMBER, + "study_datetime": datetime.datetime.strptime( + "01/01/1234 01:23:45", "%d/%m/%Y %H:%M:%S" + ).replace(tzinfo=datetime.timezone.utc), + "procedure_occurrence_id": "234", + "project_name": "test project", + "omop_es_timestamp": datetime.datetime.fromisoformat("1234-01-01 00:00:00"), + } ) +serialised_message = message.serialise() class WritableOrthanc(Orthanc): @@ -73,15 +76,15 @@ def add_image_to_fake_vna(image_filename: str = "test.dcm") -> None: @pytest.mark.asyncio() async def test_image_processing() -> None: 
add_image_to_fake_vna() - study = ImagingStudy.from_message(message_body) + study = ImagingStudy.from_message(serialised_message) orthanc_raw = PIXLRawOrthanc() assert not study.exists_in(orthanc_raw) - await process_message(message_body=message_body) + await process_message(serialised_message) assert study.exists_in(orthanc_raw) # TODO: check time last updated after processing again # noqa: FIX002 # is not incremented # https://github.com/UCLH-Foundry/PIXL/issues/156 - await process_message(message_body=message_body) + await process_message(serialised_message) assert study.exists_in(orthanc_raw) From 7f160a25fd311298591bff36bd69f6af2b6dae18 Mon Sep 17 00:00:00 2001 From: Milan Malfait Date: Tue, 19 Dec 2023 15:18:36 +0000 Subject: [PATCH 09/36] Fix `ImagingStudy` initialisation in `ImagingStudy.from_message()` --- pixl_pacs/src/pixl_pacs/_processing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pixl_pacs/src/pixl_pacs/_processing.py b/pixl_pacs/src/pixl_pacs/_processing.py index eace4b370..1062ebab0 100644 --- a/pixl_pacs/src/pixl_pacs/_processing.py +++ b/pixl_pacs/src/pixl_pacs/_processing.py @@ -67,7 +67,7 @@ class ImagingStudy: @classmethod def from_message(cls, serialised_message: SerialisedMessage) -> "ImagingStudy": - return ImagingStudy(serialised_message.deserialise()) + return ImagingStudy(message=Message(serialised_message.deserialise())) @property def orthanc_query_dict(self) -> dict: From 31a6f29d9c70e43e2391d0b8c4d0e0a98179052d Mon Sep 17 00:00:00 2001 From: Milan Malfait Date: Tue, 19 Dec 2023 15:24:40 +0000 Subject: [PATCH 10/36] Fix imports --- pixl_ehr/src/pixl_ehr/main.py | 2 +- pixl_pacs/tests/test_processing.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pixl_ehr/src/pixl_ehr/main.py b/pixl_ehr/src/pixl_ehr/main.py index bd1b781d1..38f5ebf02 100644 --- a/pixl_ehr/src/pixl_ehr/main.py +++ b/pixl_ehr/src/pixl_ehr/main.py @@ -21,7 +21,7 @@ from azure.identity import EnvironmentCredential from 
azure.storage.blob import BlobServiceClient -from core.patient_queue import PixlConsumer +from core.patient_queue.message import PixlConsumer from core.router import router, state from decouple import config from fastapi import FastAPI diff --git a/pixl_pacs/tests/test_processing.py b/pixl_pacs/tests/test_processing.py index e4852363e..6823c094e 100644 --- a/pixl_pacs/tests/test_processing.py +++ b/pixl_pacs/tests/test_processing.py @@ -19,7 +19,7 @@ import os import pytest -from core.patient_queue import Message +from core.patient_queue.message import Message from decouple import config from pixl_pacs._orthanc import Orthanc, PIXLRawOrthanc from pixl_pacs._processing import ImagingStudy, process_message From 9a8f5a928d57429aa9436d9504086c65642ac497 Mon Sep 17 00:00:00 2001 From: Milan Malfait Date: Tue, 19 Dec 2023 16:53:52 +0000 Subject: [PATCH 11/36] Fix test: access serialised message bodies --- cli/tests/test_messages_from_parquet.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cli/tests/test_messages_from_parquet.py b/cli/tests/test_messages_from_parquet.py index ec081ac4f..13f4f4aec 100644 --- a/cli/tests/test_messages_from_parquet.py +++ b/cli/tests/test_messages_from_parquet.py @@ -24,7 +24,8 @@ def test_messages_from_parquet(resources: Path) -> None: The test data doesn't have any "difficult" cases in it, eg. people without procedures. 
""" omop_parquet_dir = resources / "omop" - messages = messages_from_parquet(omop_parquet_dir) + serialised_messages = messages_from_parquet(omop_parquet_dir) + message_bodies = [msg.body() for msg in serialised_messages] expected_messages = [ b'{"mrn": "12345678", "accession_number": "12345678", "study_datetime": "2021-07-01", ' b'"procedure_occurrence_id": 1, "project_name": "Test Extract - UCLH OMOP CDM", ' @@ -40,4 +41,4 @@ def test_messages_from_parquet(resources: Path) -> None: b'"omop_es_timestamp": "2023-12-07T14:08:58"}', ] - assert messages == expected_messages + assert message_bodies == expected_messages From aa7831841e7d9d3b726fda1881998c7de4f6a628 Mon Sep 17 00:00:00 2001 From: Milan Malfait Date: Tue, 19 Dec 2023 17:34:36 +0000 Subject: [PATCH 12/36] Turn `Message` into a `dataclass` --- cli/src/pixl_cli/main.py | 19 +++++----- pixl_core/src/core/patient_queue/message.py | 11 ++---- pixl_core/tests/patient_queue/test_message.py | 36 +++++++++---------- pixl_ehr/tests/test_processing.py | 18 +++++----- pixl_pacs/src/pixl_pacs/_processing.py | 2 +- pixl_pacs/tests/test_processing.py | 18 +++++----- 6 files changed, 44 insertions(+), 60 deletions(-) diff --git a/cli/src/pixl_cli/main.py b/cli/src/pixl_cli/main.py index 663c2f5e7..672bd1990 100644 --- a/cli/src/pixl_cli/main.py +++ b/cli/src/pixl_cli/main.py @@ -273,7 +273,7 @@ def state_filepath_for_queue(queue_name: str) -> Path: return Path(f"{queue_name.replace('/', '_')}.state") -class Messages(list): +class Messages(list[bytes]): """ Class to represent messages @@ -368,15 +368,14 @@ def messages_from_parquet(dir_path: Path) -> Messages: for _, row in cohort_data.iterrows(): # Create new dict to initialise message - message_fields = { - "mrn": row[mrn_col_name], - "accession_number": row[acc_num_col_name], - "study_datetime": row[dt_col_name], - "procedure_occurrence_id": row[procedure_occurrence_id], - "project_name": project_name, - "omop_es_timestamp": omop_es_timestamp, - } - message = 
Message(message_fields) + message = Message( + mrn=row[mrn_col_name], + accession_number=row[acc_num_col_name], + study_datetime=row[dt_col_name], + procedure_occurrence_id=row[procedure_occurrence_id], + project_name=project_name, + omop_es_timestamp=omop_es_timestamp, + ) messages.append(message.serialise()) if len(messages) == 0: diff --git a/pixl_core/src/core/patient_queue/message.py b/pixl_core/src/core/patient_queue/message.py index 538c74cff..6b59db22f 100644 --- a/pixl_core/src/core/patient_queue/message.py +++ b/pixl_core/src/core/patient_queue/message.py @@ -16,6 +16,7 @@ import json import logging +from dataclasses import dataclass from datetime import datetime logger = logging.getLogger(__name__) @@ -46,6 +47,7 @@ def decode(self) -> str: return self.body.decode() +@dataclass class Message: """Class to represent a message containing the relevant information for a study.""" @@ -56,15 +58,6 @@ class Message: project_name: str omop_es_timestamp: datetime - def __init__(self, message_fields: dict) -> None: - """Initialise the message.""" - self.mrn = message_fields["mrn"] - self.accession_number = message_fields["accession_number"] - self.study_datetime = message_fields["study_datetime"] - self.procedure_occurrence_id = message_fields["procedure_occurrence_id"] - self.project_name = message_fields["project_name"] - self.omop_es_timestamp = message_fields["omop_es_timestamp"] - def serialise(self) -> "SerialisedMessage": """Serialise the message into JSON format.""" msg = ( diff --git a/pixl_core/tests/patient_queue/test_message.py b/pixl_core/tests/patient_queue/test_message.py index 8b9dbe5d4..ffdaa41d2 100644 --- a/pixl_core/tests/patient_queue/test_message.py +++ b/pixl_core/tests/patient_queue/test_message.py @@ -20,18 +20,16 @@ def test_serialise() -> None: """Checks that messages can be correctly serialised""" msg = Message( - { - "mrn": "111", - "accession_number": "123", - "study_datetime": datetime.datetime.strptime( - "Nov 22 2022 1:33PM", "%b %d 
%Y %I:%M%p" - ).replace(tzinfo=datetime.timezone.utc), - "procedure_occurrence_id": "234", - "project_name": "test project", - "omop_es_timestamp": datetime.datetime.strptime( - "Dec 7 2023 2:08PM", "%b %d %Y %I:%M%p" - ).replace(tzinfo=datetime.timezone.utc), - } + mrn="111", + accession_number="123", + study_datetime=datetime.datetime.strptime("Nov 22 2022 1:33PM", "%b %d %Y %I:%M%p").replace( + tzinfo=datetime.timezone.utc + ), + procedure_occurrence_id="234", + project_name="test project", + omop_es_timestamp=datetime.datetime.strptime( + "Dec 7 2023 2:08PM", "%b %d %Y %I:%M%p" + ).replace(tzinfo=datetime.timezone.utc), ) msg_body = msg.serialise() assert ( @@ -53,14 +51,12 @@ def test_deserialise_datetime() -> None: """Checks that datetimes can be correctly serialised""" timestamp = datetime.datetime.fromordinal(100012) msg = Message( - { - "mrn": "", - "accession_number": "", - "study_datetime": timestamp, - "procedure_occurrence_id": "", - "project_name": "", - "omop_es_timestamp": datetime.datetime.now(), # noqa: DTZ005 - } + mrn="", + accession_number="", + study_datetime=timestamp, + procedure_occurrence_id="", + project_name="", + omop_es_timestamp=datetime.datetime.now(), # noqa: DTZ005 ) serialised_msg = msg.serialise() data = serialised_msg.deserialise() diff --git a/pixl_ehr/tests/test_processing.py b/pixl_ehr/tests/test_processing.py index 7a5e28888..9bb75d5d4 100644 --- a/pixl_ehr/tests/test_processing.py +++ b/pixl_ehr/tests/test_processing.py @@ -56,16 +56,14 @@ ls_id, lo_id, lr_id, ltd_id = 5555555, 6666666, 7777777, 8888888 message = Message( - { - "mrn": mrn, - "accession_number": accession_number, - "study_datetime": datetime.datetime.strptime(study_datetime_str, "%d/%m/%Y %H:%M").replace( - tzinfo=datetime.timezone.utc - ), - "procedure_occurrence_id": procedure_occurrence_id, - "project_name": project_name, - "omop_es_timestamp": omop_es_timestamp, - } + mrn=mrn, + accession_number=accession_number, + 
study_datetime=datetime.datetime.strptime(study_datetime_str, "%d/%m/%Y %H:%M").replace( + tzinfo=datetime.timezone.utc + ), + procedure_occurrence_id=procedure_occurrence_id, + project_name=project_name, + omop_es_timestamp=omop_es_timestamp, ) serialised_message = message.serialise() diff --git a/pixl_pacs/src/pixl_pacs/_processing.py b/pixl_pacs/src/pixl_pacs/_processing.py index 1062ebab0..bb28830a2 100644 --- a/pixl_pacs/src/pixl_pacs/_processing.py +++ b/pixl_pacs/src/pixl_pacs/_processing.py @@ -67,7 +67,7 @@ class ImagingStudy: @classmethod def from_message(cls, serialised_message: SerialisedMessage) -> "ImagingStudy": - return ImagingStudy(message=Message(serialised_message.deserialise())) + return ImagingStudy(message=Message(**serialised_message.deserialise())) @property def orthanc_query_dict(self) -> dict: diff --git a/pixl_pacs/tests/test_processing.py b/pixl_pacs/tests/test_processing.py index 6823c094e..d263783f4 100644 --- a/pixl_pacs/tests/test_processing.py +++ b/pixl_pacs/tests/test_processing.py @@ -31,16 +31,14 @@ ACCESSION_NUMBER = "abc" PATIENT_ID = "a_patient" message = Message( - { - "mrn": PATIENT_ID, - "accession_number": ACCESSION_NUMBER, - "study_datetime": datetime.datetime.strptime( - "01/01/1234 01:23:45", "%d/%m/%Y %H:%M:%S" - ).replace(tzinfo=datetime.timezone.utc), - "procedure_occurrence_id": "234", - "project_name": "test project", - "omop_es_timestamp": datetime.datetime.fromisoformat("1234-01-01 00:00:00"), - } + mrn=PATIENT_ID, + accession_number=ACCESSION_NUMBER, + study_datetime=datetime.datetime.strptime("01/01/1234 01:23:45", "%d/%m/%Y %H:%M:%S").replace( + tzinfo=datetime.timezone.utc + ), + procedure_occurrence_id="234", + project_name="test project", + omop_es_timestamp=datetime.datetime.fromisoformat("1234-01-01 00:00:00"), ) serialised_message = message.serialise() From cc5b12c14d6a8e9d18113944d375d5d37982ebea Mon Sep 17 00:00:00 2001 From: Milan Malfait Date: Tue, 19 Dec 2023 18:20:00 +0000 Subject: [PATCH 13/36] 
Fix failing tests --- cli/src/pixl_cli/main.py | 10 ++++++---- cli/tests/test_messages_from_parquet.py | 3 +-- pixl_ehr/src/pixl_ehr/main.py | 2 +- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/cli/src/pixl_cli/main.py b/cli/src/pixl_cli/main.py index 672bd1990..bbc11f4c4 100644 --- a/cli/src/pixl_cli/main.py +++ b/cli/src/pixl_cli/main.py @@ -23,7 +23,7 @@ import pandas as pd import requests import yaml -from core.patient_queue.message import Message +from core.patient_queue.message import Message, SerialisedMessage from core.patient_queue.producer import PixlProducer from core.patient_queue.subscriber import PixlBlockingConsumer @@ -376,7 +376,7 @@ def messages_from_parquet(dir_path: Path) -> Messages: project_name=project_name, omop_es_timestamp=omop_es_timestamp, ) - messages.append(message.serialise()) + messages.append(message.serialise().body) if len(messages) == 0: msg = f"Failed to find any messages in {dir_path}" @@ -447,9 +447,11 @@ def api_config_for_queue(queue_name: str) -> APIConfig: return APIConfig(config[config_key]) -def study_date_from_serialised(message: Message) -> datetime.datetime: +def study_date_from_serialised(message: bytes) -> datetime.datetime: """Get the study date from a serialised message as a datetime""" - result = message.deserialise()["study_datetime"] + # FIXME: turn study_datetime into a @property of Message + # Hack to get the study date from a serialised message and get the tests passing + result = SerialisedMessage(message.decode()).deserialise()["study_datetime"] if not isinstance(result, datetime.datetime): msg = "Expected study date to be a datetime. 
Got %s" raise TypeError(msg, type(result)) diff --git a/cli/tests/test_messages_from_parquet.py b/cli/tests/test_messages_from_parquet.py index 13f4f4aec..f317b3acf 100644 --- a/cli/tests/test_messages_from_parquet.py +++ b/cli/tests/test_messages_from_parquet.py @@ -24,8 +24,7 @@ def test_messages_from_parquet(resources: Path) -> None: The test data doesn't have any "difficult" cases in it, eg. people without procedures. """ omop_parquet_dir = resources / "omop" - serialised_messages = messages_from_parquet(omop_parquet_dir) - message_bodies = [msg.body() for msg in serialised_messages] + message_bodies = messages_from_parquet(omop_parquet_dir) expected_messages = [ b'{"mrn": "12345678", "accession_number": "12345678", "study_datetime": "2021-07-01", ' b'"procedure_occurrence_id": 1, "project_name": "Test Extract - UCLH OMOP CDM", ' diff --git a/pixl_ehr/src/pixl_ehr/main.py b/pixl_ehr/src/pixl_ehr/main.py index 38f5ebf02..bd1b781d1 100644 --- a/pixl_ehr/src/pixl_ehr/main.py +++ b/pixl_ehr/src/pixl_ehr/main.py @@ -21,7 +21,7 @@ from azure.identity import EnvironmentCredential from azure.storage.blob import BlobServiceClient -from core.patient_queue.message import PixlConsumer +from core.patient_queue import PixlConsumer from core.router import router, state from decouple import config from fastapi import FastAPI From b8dfad05d1b2087cb90b66e029bf9c7c580e9e68 Mon Sep 17 00:00:00 2001 From: Milan Malfait Date: Wed, 20 Dec 2023 11:12:17 +0000 Subject: [PATCH 14/36] Use `jsonpickle` for (de)serializing messages This also removes the need for the `SerialisedMessage` class --- pixl_core/pyproject.toml | 3 +- pixl_core/src/core/patient_queue/message.py | 62 +++++++------------ pixl_core/tests/patient_queue/test_message.py | 54 +++++++--------- 3 files changed, 48 insertions(+), 71 deletions(-) diff --git a/pixl_core/pyproject.toml b/pixl_core/pyproject.toml index 8fdd72727..056307e6b 100644 --- a/pixl_core/pyproject.toml +++ b/pixl_core/pyproject.toml @@ -18,7 +18,8 @@ 
dependencies = [ "pika==1.3.1", "aio_pika==8.2.4", "environs==9.5.0", - "requests==2.31.0" + "requests==2.31.0", + "jsonpickle==3.0.2" ] [project.optional-dependencies] diff --git a/pixl_core/src/core/patient_queue/message.py b/pixl_core/src/core/patient_queue/message.py index 6b59db22f..13cac0633 100644 --- a/pixl_core/src/core/patient_queue/message.py +++ b/pixl_core/src/core/patient_queue/message.py @@ -14,37 +14,14 @@ """Classes to represent messages in the patient queue.""" -import json import logging from dataclasses import dataclass from datetime import datetime +from typing import Any -logger = logging.getLogger(__name__) - - -class SerialisedMessage: - """Class to represent a serialised message.""" - - body: bytes - - def __init__(self, body: str) -> None: - """Initialise the serialised message from JSON dump.""" - self.body = body.encode("utf-8") - - def deserialise(self) -> dict: - """Returns the de-serialised message in JSON format.""" - logger.debug("De-serialising: %s", self.decode()) - data = dict(json.loads(self.decode())) - if "study_datetime" in data: - data["study_datetime"] = datetime.fromisoformat(data["study_datetime"]) - if "omop_es_timestamp" in data: - data["omop_es_timestamp"] = datetime.fromisoformat(data["omop_es_timestamp"]) +from jsonpickle import decode, encode - return data - - def decode(self) -> str: - """Returns the serialised message in string format.""" - return self.body.decode() +logger = logging.getLogger(__name__) @dataclass @@ -58,8 +35,15 @@ class Message: project_name: str omop_es_timestamp: datetime - def serialise(self) -> "SerialisedMessage": - """Serialise the message into JSON format.""" + def serialise(self, deserialisable: bool = True) -> Any: # noqa: FBT001, FBT002 + """ + Serialise the message into a JSON string. 
+ + :param deserialisable: If True, the serialised message will be deserialisable, by setting + the unpicklable flag to False in jsonpickle.encode(), meaning that the original Message + object can be recovered by `deserialise()`. If False, calling `deserialise()` on the + serialised message will return a dictionary. + """ msg = ( "Serialising message with\n" " * patient id: %s\n" @@ -76,15 +60,15 @@ def serialise(self) -> "SerialisedMessage": ) logger.debug(msg) - body = json.dumps( - { - "mrn": self.mrn, - "accession_number": self.accession_number, - "study_datetime": self.study_datetime.isoformat(), - "procedure_occurrence_id": self.procedure_occurrence_id, - "project_name": self.project_name, - "omop_es_timestamp": self.omop_es_timestamp.isoformat(), - } - ) + return encode(self, unpicklable=deserialisable) + + +def deserialise(serialised_msg: str) -> Any: + """ + Deserialise a message from a JSON string. + If the message was serialised with `deserialisable=True`, the original Message object will be + returned. Otherwise, a dictionary will be returned. - return SerialisedMessage(body=body) + :param serialised_msg: The serialised message. + """ + return decode(serialised_msg) # noqa: S301 diff --git a/pixl_core/tests/patient_queue/test_message.py b/pixl_core/tests/patient_queue/test_message.py index ffdaa41d2..5564ab59a 100644 --- a/pixl_core/tests/patient_queue/test_message.py +++ b/pixl_core/tests/patient_queue/test_message.py @@ -12,28 +12,28 @@ # See the License for the specific language governing permissions and # limitations under the License. 
import datetime -import json -from core.patient_queue.message import Message, SerialisedMessage +from core.patient_queue.message import Message, deserialise + +msg = Message( + mrn="111", + accession_number="123", + study_datetime=datetime.datetime.strptime("Nov 22 2022 1:33PM", "%b %d %Y %I:%M%p").replace( + tzinfo=datetime.timezone.utc + ), + procedure_occurrence_id="234", + project_name="test project", + omop_es_timestamp=datetime.datetime.strptime("Dec 7 2023 2:08PM", "%b %d %Y %I:%M%p").replace( + tzinfo=datetime.timezone.utc + ), +) def test_serialise() -> None: """Checks that messages can be correctly serialised""" - msg = Message( - mrn="111", - accession_number="123", - study_datetime=datetime.datetime.strptime("Nov 22 2022 1:33PM", "%b %d %Y %I:%M%p").replace( - tzinfo=datetime.timezone.utc - ), - procedure_occurrence_id="234", - project_name="test project", - omop_es_timestamp=datetime.datetime.strptime( - "Dec 7 2023 2:08PM", "%b %d %Y %I:%M%p" - ).replace(tzinfo=datetime.timezone.utc), - ) - msg_body = msg.serialise() + msg_body = msg.serialise(deserialisable=False) assert ( - msg_body.decode() == '{"mrn": "111", "accession_number": "123", ' + msg_body == '{"mrn": "111", "accession_number": "123", ' '"study_datetime": "2022-11-22T13:33:00+00:00", ' '"procedure_occurrence_id": "234", ' '"project_name": "test project", ' @@ -41,23 +41,15 @@ def test_serialise() -> None: ) -def test_simple_deserialise() -> None: - """Checks a simple JSON deserialise works""" - serialised_msg = SerialisedMessage(json.dumps({"key": "value"})) - assert serialised_msg.deserialise()["key"] == "value" +def test_deserialise() -> None: + """Checks if deserialised messages are the same as the original""" + serialised_msg = msg.serialise() + assert deserialise(serialised_msg) == msg def test_deserialise_datetime() -> None: """Checks that datetimes can be correctly serialised""" - timestamp = datetime.datetime.fromordinal(100012) - msg = Message( - mrn="", - accession_number="", - 
study_datetime=timestamp, - procedure_occurrence_id="", - project_name="", - omop_es_timestamp=datetime.datetime.now(), # noqa: DTZ005 - ) - serialised_msg = msg.serialise() - data = serialised_msg.deserialise() + timestamp = datetime.datetime.fromisoformat("2022-11-22T13:33:00+00:00") + msg.study_datetime = timestamp + data = deserialise(msg.serialise(deserialisable=False)) assert data["study_datetime"] == timestamp From f1b848d528ffa26b799652716058468d85bb7a53 Mon Sep 17 00:00:00 2001 From: Milan Malfait Date: Wed, 20 Dec 2023 11:24:14 +0000 Subject: [PATCH 15/36] Fix `test_deserialise_datetime()` so it uses the `Message` class to assert the `study_datetime` --- pixl_core/tests/patient_queue/test_message.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pixl_core/tests/patient_queue/test_message.py b/pixl_core/tests/patient_queue/test_message.py index 5564ab59a..d05cda905 100644 --- a/pixl_core/tests/patient_queue/test_message.py +++ b/pixl_core/tests/patient_queue/test_message.py @@ -48,8 +48,8 @@ def test_deserialise() -> None: def test_deserialise_datetime() -> None: - """Checks that datetimes can be correctly serialised""" + """Checks that datetimes can be correctly deserialised""" timestamp = datetime.datetime.fromisoformat("2022-11-22T13:33:00+00:00") msg.study_datetime = timestamp - data = deserialise(msg.serialise(deserialisable=False)) - assert data["study_datetime"] == timestamp + data = deserialise(msg.serialise()) + assert data.study_datetime == timestamp From 87191539881e9a2a3ca2bab99c771ef31d2983e3 Mon Sep 17 00:00:00 2001 From: Milan Malfait Date: Wed, 20 Dec 2023 11:51:50 +0000 Subject: [PATCH 16/36] Add `study_datetime` property for `Message` --- pixl_core/src/core/patient_queue/message.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pixl_core/src/core/patient_queue/message.py b/pixl_core/src/core/patient_queue/message.py index 13cac0633..5d14092d3 100644 --- 
a/pixl_core/src/core/patient_queue/message.py +++ b/pixl_core/src/core/patient_queue/message.py @@ -30,7 +30,7 @@ class Message: mrn: str accession_number: str - study_datetime: datetime + _study_datetime: datetime procedure_occurrence_id: str project_name: str omop_es_timestamp: datetime @@ -62,6 +62,11 @@ def serialise(self, deserialisable: bool = True) -> Any: # noqa: FBT001, FBT002 return encode(self, unpicklable=deserialisable) + @property + def study_datetime(self) -> datetime: + """Return the study datetime as a datetime object.""" + return self._study_datetime + def deserialise(serialised_msg: str) -> Any: """ From b08a9c4176044a328195236935bf40995059fde5 Mon Sep 17 00:00:00 2001 From: Milan Malfait Date: Wed, 20 Dec 2023 12:04:03 +0000 Subject: [PATCH 17/36] No need to test deserialising individual fields, already covered by `test_deserialise()` which deserialises the entire object --- pixl_core/tests/patient_queue/test_message.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/pixl_core/tests/patient_queue/test_message.py b/pixl_core/tests/patient_queue/test_message.py index d05cda905..2ed745956 100644 --- a/pixl_core/tests/patient_queue/test_message.py +++ b/pixl_core/tests/patient_queue/test_message.py @@ -45,11 +45,3 @@ def test_deserialise() -> None: """Checks if deserialised messages are the same as the original""" serialised_msg = msg.serialise() assert deserialise(serialised_msg) == msg - - -def test_deserialise_datetime() -> None: - """Checks that datetimes can be correctly deserialised""" - timestamp = datetime.datetime.fromisoformat("2022-11-22T13:33:00+00:00") - msg.study_datetime = timestamp - data = deserialise(msg.serialise()) - assert data.study_datetime == timestamp From 3e7c5537db6c87386426fde0082d28fe82522111 Mon Sep 17 00:00:00 2001 From: Milan Malfait Date: Wed, 20 Dec 2023 12:25:12 +0000 Subject: [PATCH 18/36] Remove `study_date_from_serialised()`, use the class attribute `study_datetime` instead --- cli/src/pixl_cli/main.py 
| 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/cli/src/pixl_cli/main.py b/cli/src/pixl_cli/main.py index bbc11f4c4..7a042baab 100644 --- a/cli/src/pixl_cli/main.py +++ b/cli/src/pixl_cli/main.py @@ -89,7 +89,7 @@ def populate(queues: str, *, restart: bool, parquet_dir: Path) -> None: messages = messages_from_parquet(parquet_dir) remove_file_if_it_exists(state_filepath) # will be stale - producer.publish(sorted(messages, key=study_date_from_serialised)) + producer.publish(sorted(messages, key=attrgetter("study_datetime"))) @cli.command() @@ -445,14 +445,3 @@ def api_config_for_queue(queue_name: str) -> APIConfig: raise ValueError(msg) return APIConfig(config[config_key]) - - -def study_date_from_serialised(message: bytes) -> datetime.datetime: - """Get the study date from a serialised message as a datetime""" - # FIXME: turn study_datetime into a @property of Message - # Hack to get the study date from a serialised message and get the tests passing - result = SerialisedMessage(message.decode()).deserialise()["study_datetime"] - if not isinstance(result, datetime.datetime): - msg = "Expected study date to be a datetime. Got %s" - raise TypeError(msg, type(result)) - return result From e8c0cc927f9acccfec5a31911c5df7b03b1d87a1 Mon Sep 17 00:00:00 2001 From: Milan Malfait Date: Wed, 20 Dec 2023 14:02:23 +0000 Subject: [PATCH 19/36] Revert "Add `study_datetime` property for `Message`" This reverts commit 87191539881e9a2a3ca2bab99c771ef31d2983e3. 
--- pixl_core/src/core/patient_queue/message.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/pixl_core/src/core/patient_queue/message.py b/pixl_core/src/core/patient_queue/message.py index 5d14092d3..13cac0633 100644 --- a/pixl_core/src/core/patient_queue/message.py +++ b/pixl_core/src/core/patient_queue/message.py @@ -30,7 +30,7 @@ class Message: mrn: str accession_number: str - _study_datetime: datetime + study_datetime: datetime procedure_occurrence_id: str project_name: str omop_es_timestamp: datetime @@ -62,11 +62,6 @@ def serialise(self, deserialisable: bool = True) -> Any: # noqa: FBT001, FBT002 return encode(self, unpicklable=deserialisable) - @property - def study_datetime(self) -> datetime: - """Return the study datetime as a datetime object.""" - return self._study_datetime - def deserialise(serialised_msg: str) -> Any: """ From fb44bf6197e483b1b84c8b516948a34ad295856a Mon Sep 17 00:00:00 2001 From: Milan Malfait Date: Wed, 20 Dec 2023 14:03:46 +0000 Subject: [PATCH 20/36] Remove `Messages` class, use `list[Message]` instead --- cli/src/pixl_cli/main.py | 52 +++++++++++++++------------------------- 1 file changed, 19 insertions(+), 33 deletions(-) diff --git a/cli/src/pixl_cli/main.py b/cli/src/pixl_cli/main.py index 7a042baab..fdaf3d71c 100644 --- a/cli/src/pixl_cli/main.py +++ b/cli/src/pixl_cli/main.py @@ -16,6 +16,7 @@ import datetime import json import os +from operator import attrgetter from pathlib import Path from typing import Any, Optional @@ -23,7 +24,7 @@ import pandas as pd import requests import yaml -from core.patient_queue.message import Message, SerialisedMessage +from core.patient_queue.message import Message, deserialise from core.patient_queue.producer import PixlProducer from core.patient_queue.subscriber import PixlBlockingConsumer @@ -84,7 +85,7 @@ def populate(queues: str, *, restart: bool, parquet_dir: Path) -> None: if state_filepath.exists() and restart: logger.info(f"Extracting messages from state: 
{state_filepath}") inform_user_that_queue_will_be_populated_from(state_filepath) - messages = Messages.from_state_file(state_filepath) + messages = messages_from_state_file(state_filepath) elif parquet_dir is not None: messages = messages_from_parquet(parquet_dir) @@ -273,41 +274,26 @@ def state_filepath_for_queue(queue_name: str) -> Path: return Path(f"{queue_name.replace('/', '_')}.state") -class Messages(list[bytes]): +def messages_from_state_file(filepath: Path) -> list[Message]: """ - Class to represent messages + Return messages from a state file path - Methods - ------- - from_state_file(cls, filepath) - Return messages from a state file path + :param filepath: Path for state file to be read + :return: A list of Message objects containing all the messages from the state file """ + logger.info(f"Creating messages from {filepath}") + if not filepath.exists(): + raise FileNotFoundError + if filepath.suffix != ".state": + msg = f"Invalid file suffix for {filepath}. Expected .state" + raise ValueError(msg) - @classmethod - def from_state_file(cls, filepath: Path) -> "Messages": - """ - Return messages from a state file path - - :param filepath: Path for state file to be read - :return: A Messages object containing all the messages from the state file - """ - logger.info(f"Creating messages from {filepath}") - if not filepath.exists(): - raise FileNotFoundError - if filepath.suffix != ".state": - msg = f"Invalid file suffix for {filepath}. Expected .state" - raise ValueError(msg) - - return cls( - [ - line.encode("utf-8") - for line in Path.open(filepath).readlines() - if string_is_non_empty(line) - ] - ) + return [ + deserialise(line) for line in Path.open(filepath).readlines() if string_is_non_empty(line) + ] -def messages_from_parquet(dir_path: Path) -> Messages: +def messages_from_parquet(dir_path: Path) -> list[Message]: """ Reads patient information from parquet files within directory structure and transforms that into messages. 
@@ -364,7 +350,7 @@ def messages_from_parquet(dir_path: Path) -> Messages: project_name = logs["settings"]["cdm_source_name"] omop_es_timestamp = datetime.datetime.fromisoformat(logs["datetime"]) - messages = Messages() + messages = [] for _, row in cohort_data.iterrows(): # Create new dict to initialise message @@ -376,7 +362,7 @@ def messages_from_parquet(dir_path: Path) -> Messages: project_name=project_name, omop_es_timestamp=omop_es_timestamp, ) - messages.append(message.serialise().body) + messages.append(message) if len(messages) == 0: msg = f"Failed to find any messages in {dir_path}" From 6af792e54c068902ef10a95545c57fe24723b889 Mon Sep 17 00:00:00 2001 From: Milan Malfait Date: Wed, 20 Dec 2023 14:04:52 +0000 Subject: [PATCH 21/36] Add type checking for messages parsed from parquet input --- cli/tests/test_messages_from_parquet.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/cli/tests/test_messages_from_parquet.py b/cli/tests/test_messages_from_parquet.py index f317b3acf..87fc226f8 100644 --- a/cli/tests/test_messages_from_parquet.py +++ b/cli/tests/test_messages_from_parquet.py @@ -15,6 +15,7 @@ from pathlib import Path +from core.patient_queue.message import Message from pixl_cli.main import messages_from_parquet @@ -24,7 +25,10 @@ def test_messages_from_parquet(resources: Path) -> None: The test data doesn't have any "difficult" cases in it, eg. people without procedures. 
""" omop_parquet_dir = resources / "omop" - message_bodies = messages_from_parquet(omop_parquet_dir) + messages = messages_from_parquet(omop_parquet_dir) + assert all(isinstance(msg, Message) for msg in messages) + + message_bodies = [msg.serialise(deserialisable=False) for msg in messages] expected_messages = [ b'{"mrn": "12345678", "accession_number": "12345678", "study_datetime": "2021-07-01", ' b'"procedure_occurrence_id": 1, "project_name": "Test Extract - UCLH OMOP CDM", ' From 0e4fce42dbef912f45e9f1b0105bb44cb771c95e Mon Sep 17 00:00:00 2001 From: Milan Malfait Date: Wed, 20 Dec 2023 14:05:37 +0000 Subject: [PATCH 22/36] Update `test_messages_from_parquet()` to use JSON strings instead of bytes --- cli/tests/test_messages_from_parquet.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/cli/tests/test_messages_from_parquet.py b/cli/tests/test_messages_from_parquet.py index 87fc226f8..7e3876b7a 100644 --- a/cli/tests/test_messages_from_parquet.py +++ b/cli/tests/test_messages_from_parquet.py @@ -30,18 +30,18 @@ def test_messages_from_parquet(resources: Path) -> None: message_bodies = [msg.serialise(deserialisable=False) for msg in messages] expected_messages = [ - b'{"mrn": "12345678", "accession_number": "12345678", "study_datetime": "2021-07-01", ' - b'"procedure_occurrence_id": 1, "project_name": "Test Extract - UCLH OMOP CDM", ' - b'"omop_es_timestamp": "2023-12-07T14:08:58"}', - b'{"mrn": "12345678", "accession_number": "ABC1234567", "study_datetime": "2021-07-01", ' - b'"procedure_occurrence_id": 2, "project_name": "Test Extract - UCLH OMOP CDM", ' - b'"omop_es_timestamp": "2023-12-07T14:08:58"}', - b'{"mrn": "987654321", "accession_number": "ABC1234560", "study_datetime": "2020-05-01", ' - b'"procedure_occurrence_id": 3, "project_name": "Test Extract - UCLH OMOP CDM", ' - b'"omop_es_timestamp": "2023-12-07T14:08:58"}', - b'{"mrn": "5020765", "accession_number": "MIG0234560", "study_datetime": "2015-05-01", ' - 
b'"procedure_occurrence_id": 4, "project_name": "Test Extract - UCLH OMOP CDM", ' - b'"omop_es_timestamp": "2023-12-07T14:08:58"}', + '{"mrn": "12345678", "accession_number": "12345678", "study_datetime": "2021-07-01", ' + '"procedure_occurrence_id": 1, "project_name": "Test Extract - UCLH OMOP CDM", ' + '"omop_es_timestamp": "2023-12-07T14:08:58"}', + '{"mrn": "12345678", "accession_number": "ABC1234567", "study_datetime": "2021-07-01", ' + '"procedure_occurrence_id": 2, "project_name": "Test Extract - UCLH OMOP CDM", ' + '"omop_es_timestamp": "2023-12-07T14:08:58"}', + '{"mrn": "987654321", "accession_number": "ABC1234560", "study_datetime": "2020-05-01", ' + '"procedure_occurrence_id": 3, "project_name": "Test Extract - UCLH OMOP CDM", ' + '"omop_es_timestamp": "2023-12-07T14:08:58"}', + '{"mrn": "5020765", "accession_number": "MIG0234560", "study_datetime": "2015-05-01", ' + '"procedure_occurrence_id": 4, "project_name": "Test Extract - UCLH OMOP CDM", ' + '"omop_es_timestamp": "2023-12-07T14:08:58"}', ] assert message_bodies == expected_messages From 99ffd0032287e87a3ee827574c802812b68e86fd Mon Sep 17 00:00:00 2001 From: Milan Malfait Date: Wed, 20 Dec 2023 14:07:38 +0000 Subject: [PATCH 23/36] Update `PixlProducer.publish()` to use a list of Message objects and handle serialisation --- pixl_core/src/core/patient_queue/producer.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/pixl_core/src/core/patient_queue/producer.py b/pixl_core/src/core/patient_queue/producer.py index 38f5ec5d1..f4bd4f290 100644 --- a/pixl_core/src/core/patient_queue/producer.py +++ b/pixl_core/src/core/patient_queue/producer.py @@ -16,6 +16,8 @@ import logging from time import sleep +from core.patient_queue.message import Message + from ._base import PixlBlockingInterface LOGGER = logging.getLogger(__name__) @@ -24,7 +26,7 @@ class PixlProducer(PixlBlockingInterface): """Generic publisher for RabbitMQ""" - def publish(self, messages: list[bytes]) -> None: 
+ def publish(self, messages: list[Message]) -> None: """ Sends a list of serialised messages to a queue. :param messages: list of messages to be sent to queue @@ -32,11 +34,15 @@ def publish(self, messages: list[bytes]) -> None: LOGGER.debug("Publishing %i messages to queue: %s", len(messages), self.queue_name) if len(messages) > 0: for msg in messages: + LOGGER.debug("Serialising message") + serialised_msg = msg.serialise() LOGGER.debug("Preparing to publish") - self._channel.basic_publish(exchange="", routing_key=self.queue_name, body=msg) + self._channel.basic_publish( + exchange="", routing_key=self.queue_name, body=serialised_msg + ) # RabbitMQ can miss-order messages if there is not a sufficient delay sleep(0.1) - LOGGER.debug("Message %s published to queue %s", msg.decode(), self.queue_name) + LOGGER.debug("Message %s published to queue %s", serialised_msg, self.queue_name) else: LOGGER.debug("List of messages is empty so nothing will be published to queue.") From 986633da08da00adcea1975e89c2dde56c4dbd8e Mon Sep 17 00:00:00 2001 From: Milan Malfait Date: Wed, 20 Dec 2023 14:24:15 +0000 Subject: [PATCH 24/36] Convert JSON string to bytes when serialising --- pixl_core/src/core/patient_queue/message.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pixl_core/src/core/patient_queue/message.py b/pixl_core/src/core/patient_queue/message.py index 13cac0633..81bfd2adb 100644 --- a/pixl_core/src/core/patient_queue/message.py +++ b/pixl_core/src/core/patient_queue/message.py @@ -37,7 +37,7 @@ class Message: def serialise(self, deserialisable: bool = True) -> Any: # noqa: FBT001, FBT002 """ - Serialise the message into a JSON string. + Serialise the message into a JSON string and convert to bytes. 
:param deserialisable: If True, the serialised message will be deserialisable, by setting the unpicklable flag to False in jsonpickle.encode(), meaning that the original Message @@ -60,7 +60,7 @@ def serialise(self, deserialisable: bool = True) -> Any: # noqa: FBT001, FBT002 ) logger.debug(msg) - return encode(self, unpicklable=deserialisable) + return str.encode(encode(self, unpicklable=deserialisable)) def deserialise(serialised_msg: str) -> Any: From d2c05ca7af3b336dc048cfcde1e16ba8fa93c717 Mon Sep 17 00:00:00 2001 From: Milan Malfait Date: Wed, 20 Dec 2023 14:24:31 +0000 Subject: [PATCH 25/36] Revert "Update `test_messages_from_parquet()` to use JSON strings instead of bytes" This reverts commit 0e4fce42dbef912f45e9f1b0105bb44cb771c95e. --- cli/tests/test_messages_from_parquet.py | 24 +++++++++---------- pixl_core/tests/patient_queue/test_message.py | 10 ++++---- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/cli/tests/test_messages_from_parquet.py b/cli/tests/test_messages_from_parquet.py index 7e3876b7a..87fc226f8 100644 --- a/cli/tests/test_messages_from_parquet.py +++ b/cli/tests/test_messages_from_parquet.py @@ -30,18 +30,18 @@ def test_messages_from_parquet(resources: Path) -> None: message_bodies = [msg.serialise(deserialisable=False) for msg in messages] expected_messages = [ - '{"mrn": "12345678", "accession_number": "12345678", "study_datetime": "2021-07-01", ' - '"procedure_occurrence_id": 1, "project_name": "Test Extract - UCLH OMOP CDM", ' - '"omop_es_timestamp": "2023-12-07T14:08:58"}', - '{"mrn": "12345678", "accession_number": "ABC1234567", "study_datetime": "2021-07-01", ' - '"procedure_occurrence_id": 2, "project_name": "Test Extract - UCLH OMOP CDM", ' - '"omop_es_timestamp": "2023-12-07T14:08:58"}', - '{"mrn": "987654321", "accession_number": "ABC1234560", "study_datetime": "2020-05-01", ' - '"procedure_occurrence_id": 3, "project_name": "Test Extract - UCLH OMOP CDM", ' - '"omop_es_timestamp": "2023-12-07T14:08:58"}', - 
'{"mrn": "5020765", "accession_number": "MIG0234560", "study_datetime": "2015-05-01", ' - '"procedure_occurrence_id": 4, "project_name": "Test Extract - UCLH OMOP CDM", ' - '"omop_es_timestamp": "2023-12-07T14:08:58"}', + b'{"mrn": "12345678", "accession_number": "12345678", "study_datetime": "2021-07-01", ' + b'"procedure_occurrence_id": 1, "project_name": "Test Extract - UCLH OMOP CDM", ' + b'"omop_es_timestamp": "2023-12-07T14:08:58"}', + b'{"mrn": "12345678", "accession_number": "ABC1234567", "study_datetime": "2021-07-01", ' + b'"procedure_occurrence_id": 2, "project_name": "Test Extract - UCLH OMOP CDM", ' + b'"omop_es_timestamp": "2023-12-07T14:08:58"}', + b'{"mrn": "987654321", "accession_number": "ABC1234560", "study_datetime": "2020-05-01", ' + b'"procedure_occurrence_id": 3, "project_name": "Test Extract - UCLH OMOP CDM", ' + b'"omop_es_timestamp": "2023-12-07T14:08:58"}', + b'{"mrn": "5020765", "accession_number": "MIG0234560", "study_datetime": "2015-05-01", ' + b'"procedure_occurrence_id": 4, "project_name": "Test Extract - UCLH OMOP CDM", ' + b'"omop_es_timestamp": "2023-12-07T14:08:58"}', ] assert message_bodies == expected_messages diff --git a/pixl_core/tests/patient_queue/test_message.py b/pixl_core/tests/patient_queue/test_message.py index 2ed745956..8471cfdc4 100644 --- a/pixl_core/tests/patient_queue/test_message.py +++ b/pixl_core/tests/patient_queue/test_message.py @@ -33,11 +33,11 @@ def test_serialise() -> None: """Checks that messages can be correctly serialised""" msg_body = msg.serialise(deserialisable=False) assert ( - msg_body == '{"mrn": "111", "accession_number": "123", ' - '"study_datetime": "2022-11-22T13:33:00+00:00", ' - '"procedure_occurrence_id": "234", ' - '"project_name": "test project", ' - '"omop_es_timestamp": "2023-12-07T14:08:00+00:00"}' + msg_body == b'{"mrn": "111", "accession_number": "123", ' + b'"study_datetime": "2022-11-22T13:33:00+00:00", ' + b'"procedure_occurrence_id": "234", ' + b'"project_name": "test 
project", ' + b'"omop_es_timestamp": "2023-12-07T14:08:00+00:00"}' ) From cd3e1015f436eff1d0648db7dd2664f17f0d304e Mon Sep 17 00:00:00 2001 From: Milan Malfait Date: Wed, 20 Dec 2023 14:36:59 +0000 Subject: [PATCH 26/36] `PixlProducer.publish()` should take a `list[Message]` as input in tests --- pixl_core/tests/patient_queue/test_producer.py | 11 ++++++++++- pixl_core/tests/patient_queue/test_subscriber.py | 15 ++++++++++++--- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/pixl_core/tests/patient_queue/test_producer.py b/pixl_core/tests/patient_queue/test_producer.py index 2aa73691e..1aca4cda4 100644 --- a/pixl_core/tests/patient_queue/test_producer.py +++ b/pixl_core/tests/patient_queue/test_producer.py @@ -12,9 +12,18 @@ # See the License for the specific language governing permissions and # limitations under the License. import pytest +from core.patient_queue.message import Message from core.patient_queue.producer import PixlProducer TEST_QUEUE = "test_publish" +TEST_MESSAGE = Message( + mrn="111", + accession_number="123", + study_datetime="2022-11-22T13:33:00+00:00", + procedure_occurrence_id="234", + project_name="test project", + omop_es_timestamp="2023-12-07T14:08:00+00:00", +) @pytest.mark.pika() @@ -32,7 +41,7 @@ def test_publish() -> None: """ with PixlProducer(queue_name=TEST_QUEUE) as pp: pp.clear_queue() - pp.publish(messages=[b"test"]) + pp.publish(messages=[TEST_MESSAGE]) with PixlProducer(queue_name=TEST_QUEUE) as pp: assert pp.message_count == 1 diff --git a/pixl_core/tests/patient_queue/test_subscriber.py b/pixl_core/tests/patient_queue/test_subscriber.py index e2a8c6b1d..b306a0279 100644 --- a/pixl_core/tests/patient_queue/test_subscriber.py +++ b/pixl_core/tests/patient_queue/test_subscriber.py @@ -18,12 +18,21 @@ from unittest import TestCase import pytest +from core.patient_queue.message import Message from core.patient_queue.producer import PixlProducer from core.patient_queue.subscriber import PixlBlockingConsumer, 
PixlConsumer from core.token_buffer.tokens import TokenBucket TEST_QUEUE = "test_consume" -MESSAGE_BODY = b"test" +TEST_MESSAGE = Message( + mrn="111", + accession_number="123", + study_datetime="2022-11-22T13:33:00+00:00", + procedure_occurrence_id="234", + project_name="test project", + omop_es_timestamp="2023-12-07T14:08:00+00:00", +) + counter = 0 @@ -52,7 +61,7 @@ async def test_create(self) -> None: """Checks consume is working.""" global counter # noqa: PLW0602 with PixlProducer(queue_name=TEST_QUEUE) as pp: - pp.publish(messages=[MESSAGE_BODY]) + pp.publish(messages=[TEST_MESSAGE]) async with PixlConsumer(queue_name=TEST_QUEUE, token_bucket=TokenBucket()) as pc: @@ -78,7 +87,7 @@ def test_consume_all() -> None: graceful shutdown. """ with PixlProducer(queue_name=TEST_QUEUE) as pp: - pp.publish(messages=[MESSAGE_BODY, MESSAGE_BODY]) + pp.publish(messages=[TEST_MESSAGE, TEST_MESSAGE]) with PixlBlockingConsumer(queue_name=TEST_QUEUE) as bc: counter_bc = bc.consume_all(timeout_in_seconds=2, file_path=Path("test_producer.csv")) From 0b855e1dc37ca017a0e60a64d362b117ab68946e Mon Sep 17 00:00:00 2001 From: Milan Malfait Date: Wed, 20 Dec 2023 14:38:05 +0000 Subject: [PATCH 27/36] Update EHR API to use new `Message` design --- pixl_ehr/src/pixl_ehr/_processing.py | 19 +++++++++---------- pixl_ehr/tests/test_processing.py | 3 +-- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/pixl_ehr/src/pixl_ehr/_processing.py b/pixl_ehr/src/pixl_ehr/_processing.py index 88cc17d02..2584d596f 100644 --- a/pixl_ehr/src/pixl_ehr/_processing.py +++ b/pixl_ehr/src/pixl_ehr/_processing.py @@ -21,7 +21,7 @@ from typing import Optional import requests -from core.patient_queue.message import SerialisedMessage +from core.patient_queue.message import Message from decouple import config from pixl_ehr._databases import EMAPStar, PIXLDatabase @@ -35,14 +35,14 @@ _this_dir = Path(Path(__file__).parent) -async def process_message(serialised_message: SerialisedMessage) -> None: - 
logger.info("Processing: %s", serialised_message.decode()) +async def process_message(message: Message) -> None: + logger.info("Processing: %s", message.serialise(deserialisable=False)) - raw_data = PatientEHRData.from_message(serialised_message) + raw_data = PatientEHRData.from_message(message) pixl_db = PIXLDatabase() if pixl_db.contains(raw_data): - logger.info("Messaged has already been processed") + logger.info("Message has already been processed") return emap_star_db = EMAPStar() @@ -79,16 +79,15 @@ class PatientEHRData: report_text: Optional[str] = None @classmethod - def from_message(cls, serialised_message: SerialisedMessage) -> "PatientEHRData": + def from_message(cls, message: Message) -> "PatientEHRData": """ Create a minimal set of patient EHR data required to start queries from a queue message """ - message_data = serialised_message.deserialise() self = PatientEHRData( - mrn=message_data["mrn"], - accession_number=message_data["accession_number"], - acquisition_datetime=message_data["study_datetime"], + mrn=message.mrn, + accession_number=message.accession_number, + acquisition_datetime=message.study_datetime, ) logger.debug("Created %s from message data", self) diff --git a/pixl_ehr/tests/test_processing.py b/pixl_ehr/tests/test_processing.py index 9bb75d5d4..d0b933884 100644 --- a/pixl_ehr/tests/test_processing.py +++ b/pixl_ehr/tests/test_processing.py @@ -65,7 +65,6 @@ project_name=project_name, omop_es_timestamp=omop_es_timestamp, ) -serialised_message = message.serialise() class WritableEMAPStar(WriteableDatabase): @@ -164,7 +163,7 @@ def insert_data_into_emap_star_schema() -> None: @pytest.mark.asyncio() async def test_message_processing() -> None: insert_data_into_emap_star_schema() - await process_message(serialised_message) + await process_message(message) pixl_db = QueryablePIXLDB() row = pixl_db.execute_query_string("select * from emap_data.ehr_raw where mrn = %s", [mrn]) From 9a4d6a84dc2ab5d5ee0ecbb7c3b495b4a2e54a22 Mon Sep 17 00:00:00 
2001 From: Milan Malfait Date: Wed, 20 Dec 2023 14:44:20 +0000 Subject: [PATCH 28/36] Update imaging API to use new `Message` design --- pixl_pacs/src/pixl_pacs/_processing.py | 14 +++++++------- pixl_pacs/tests/test_processing.py | 7 +++---- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/pixl_pacs/src/pixl_pacs/_processing.py b/pixl_pacs/src/pixl_pacs/_processing.py index bb28830a2..1f1bc555b 100644 --- a/pixl_pacs/src/pixl_pacs/_processing.py +++ b/pixl_pacs/src/pixl_pacs/_processing.py @@ -17,7 +17,7 @@ from dataclasses import dataclass from time import time -from core.patient_queue.message import Message, SerialisedMessage +from core.patient_queue.message import Message from decouple import config from pixl_pacs._orthanc import Orthanc, PIXLRawOrthanc @@ -26,10 +26,10 @@ logger.setLevel(os.environ.get("LOG_LEVEL", "WARNING")) -async def process_message(serialised_message: SerialisedMessage) -> None: - logger.info("Processing: %s", serialised_message.decode()) +async def process_message(message: Message) -> None: + logger.info("Processing: %s", message.serialise(deserialisable=False)) - study = ImagingStudy.from_message(serialised_message) + study = ImagingStudy.from_message(message) orthanc_raw = PIXLRawOrthanc() if study.exists_in(orthanc_raw): @@ -48,7 +48,7 @@ async def process_message(serialised_message: SerialisedMessage) -> None: while job_state != "Success": if (time() - start_time) > config("PIXL_DICOM_TRANSFER_TIMEOUT", cast=float): msg = ( - f"Failed to transfer {serialised_message.decode()} within " + f"Failed to transfer {message.decode()} within " f"{config('PIXL_DICOM_TRANSFER_TIMEOUT')} seconds" ) raise TimeoutError(msg) @@ -66,8 +66,8 @@ class ImagingStudy: message: Message @classmethod - def from_message(cls, serialised_message: SerialisedMessage) -> "ImagingStudy": - return ImagingStudy(message=Message(**serialised_message.deserialise())) + def from_message(cls, message: Message) -> "ImagingStudy": + return 
ImagingStudy(message=message) @property def orthanc_query_dict(self) -> dict: diff --git a/pixl_pacs/tests/test_processing.py b/pixl_pacs/tests/test_processing.py index d263783f4..82084e44b 100644 --- a/pixl_pacs/tests/test_processing.py +++ b/pixl_pacs/tests/test_processing.py @@ -40,7 +40,6 @@ project_name="test project", omop_es_timestamp=datetime.datetime.fromisoformat("1234-01-01 00:00:00"), ) -serialised_message = message.serialise() class WritableOrthanc(Orthanc): @@ -74,15 +73,15 @@ def add_image_to_fake_vna(image_filename: str = "test.dcm") -> None: @pytest.mark.asyncio() async def test_image_processing() -> None: add_image_to_fake_vna() - study = ImagingStudy.from_message(serialised_message) + study = ImagingStudy.from_message(message) orthanc_raw = PIXLRawOrthanc() assert not study.exists_in(orthanc_raw) - await process_message(serialised_message) + await process_message(message) assert study.exists_in(orthanc_raw) # TODO: check time last updated after processing again # noqa: FIX002 # is not incremented # https://github.com/UCLH-Foundry/PIXL/issues/156 - await process_message(serialised_message) + await process_message(message) assert study.exists_in(orthanc_raw) From baad796e94d1833272cecf30c618de9f3cd96f74 Mon Sep 17 00:00:00 2001 From: Milan Malfait Date: Wed, 20 Dec 2023 14:56:46 +0000 Subject: [PATCH 29/36] Update deserialise function to accept bytes-encoded JSON string --- pixl_core/src/core/patient_queue/message.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pixl_core/src/core/patient_queue/message.py b/pixl_core/src/core/patient_queue/message.py index 81bfd2adb..5e864510d 100644 --- a/pixl_core/src/core/patient_queue/message.py +++ b/pixl_core/src/core/patient_queue/message.py @@ -63,12 +63,12 @@ def serialise(self, deserialisable: bool = True) -> Any: # noqa: FBT001, FBT002 return str.encode(encode(self, unpicklable=deserialisable)) -def deserialise(serialised_msg: str) -> Any: +def deserialise(serialised_msg: bytes) 
-> Any: """ - Deserialise a message from a JSON string. + Deserialise a message from a bytes-encoded JSON string. If the message was serialised with `deserialisable=True`, the original Message object will be returned. Otherwise, a dictionary will be returned. :param serialised_msg: The serialised message. """ - return decode(serialised_msg) # noqa: S301 + return decode(serialised_msg.decode("utf-8")) # noqa: S301 From a81e285a9fdecf668bb280061ce9f2f362e7cfd6 Mon Sep 17 00:00:00 2001 From: Milan Malfait Date: Wed, 20 Dec 2023 15:26:12 +0000 Subject: [PATCH 30/36] Assert messages against list of `Message`s --- cli/tests/test_messages_from_parquet.py | 47 +++++++++++++++++-------- 1 file changed, 33 insertions(+), 14 deletions(-) diff --git a/cli/tests/test_messages_from_parquet.py b/cli/tests/test_messages_from_parquet.py index 87fc226f8..0cc4b00e2 100644 --- a/cli/tests/test_messages_from_parquet.py +++ b/cli/tests/test_messages_from_parquet.py @@ -28,20 +28,39 @@ def test_messages_from_parquet(resources: Path) -> None: messages = messages_from_parquet(omop_parquet_dir) assert all(isinstance(msg, Message) for msg in messages) - message_bodies = [msg.serialise(deserialisable=False) for msg in messages] expected_messages = [ - b'{"mrn": "12345678", "accession_number": "12345678", "study_datetime": "2021-07-01", ' - b'"procedure_occurrence_id": 1, "project_name": "Test Extract - UCLH OMOP CDM", ' - b'"omop_es_timestamp": "2023-12-07T14:08:58"}', - b'{"mrn": "12345678", "accession_number": "ABC1234567", "study_datetime": "2021-07-01", ' - b'"procedure_occurrence_id": 2, "project_name": "Test Extract - UCLH OMOP CDM", ' - b'"omop_es_timestamp": "2023-12-07T14:08:58"}', - b'{"mrn": "987654321", "accession_number": "ABC1234560", "study_datetime": "2020-05-01", ' - b'"procedure_occurrence_id": 3, "project_name": "Test Extract - UCLH OMOP CDM", ' - b'"omop_es_timestamp": "2023-12-07T14:08:58"}', - b'{"mrn": "5020765", "accession_number": "MIG0234560", "study_datetime": 
"2015-05-01", ' - b'"procedure_occurrence_id": 4, "project_name": "Test Extract - UCLH OMOP CDM", ' - b'"omop_es_timestamp": "2023-12-07T14:08:58"}', + Message( + mrn="12345678", + accession_number="12345678", + study_datetime="2021-07-01", + procedure_occurrence_id=1, + project_name="Test Extract - UCLH OMOP CDM", + omop_es_timestamp="2023-12-07T14:08:58", + ), + Message( + mrn="12345678", + accession_number="ABC1234567", + study_datetime="2021-07-01", + procedure_occurrence_id=2, + project_name="Test Extract - UCLH OMOP CDM", + omop_es_timestamp="2023-12-07T14:08:58", + ), + Message( + mrn="987654321", + accession_number="ABC1234560", + study_datetime="2020-05-01", + procedure_occurrence_id=3, + project_name="Test Extract - UCLH OMOP CDM", + omop_es_timestamp="2023-12-07T14:08:58", + ), + Message( + mrn="5020765", + accession_number="MIG0234560", + study_datetime="2015-05-01", + procedure_occurrence_id=4, + project_name="Test Extract - UCLH OMOP CDM", + omop_es_timestamp="2023-12-07T14:08:58", + ), ] - assert message_bodies == expected_messages + assert messages == expected_messages From 8583ca0932a336499c72c1b012b48c7beb47f5c6 Mon Sep 17 00:00:00 2001 From: Milan Malfait Date: Wed, 20 Dec 2023 15:29:01 +0000 Subject: [PATCH 31/36] Print dataclass in logs --- pixl_core/src/core/patient_queue/producer.py | 2 +- pixl_ehr/src/pixl_ehr/_processing.py | 2 +- pixl_pacs/src/pixl_pacs/_processing.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pixl_core/src/core/patient_queue/producer.py b/pixl_core/src/core/patient_queue/producer.py index f4bd4f290..b840a4584 100644 --- a/pixl_core/src/core/patient_queue/producer.py +++ b/pixl_core/src/core/patient_queue/producer.py @@ -42,7 +42,7 @@ def publish(self, messages: list[Message]) -> None: ) # RabbitMQ can miss-order messages if there is not a sufficient delay sleep(0.1) - LOGGER.debug("Message %s published to queue %s", serialised_msg, self.queue_name) + LOGGER.debug("Message %s published to queue 
%s", msg, self.queue_name) else: LOGGER.debug("List of messages is empty so nothing will be published to queue.") diff --git a/pixl_ehr/src/pixl_ehr/_processing.py b/pixl_ehr/src/pixl_ehr/_processing.py index 2584d596f..89a9380f7 100644 --- a/pixl_ehr/src/pixl_ehr/_processing.py +++ b/pixl_ehr/src/pixl_ehr/_processing.py @@ -36,7 +36,7 @@ async def process_message(message: Message) -> None: - logger.info("Processing: %s", message.serialise(deserialisable=False)) + logger.info("Processing: %s", message) raw_data = PatientEHRData.from_message(message) pixl_db = PIXLDatabase() diff --git a/pixl_pacs/src/pixl_pacs/_processing.py b/pixl_pacs/src/pixl_pacs/_processing.py index 1f1bc555b..af0733444 100644 --- a/pixl_pacs/src/pixl_pacs/_processing.py +++ b/pixl_pacs/src/pixl_pacs/_processing.py @@ -27,7 +27,7 @@ async def process_message(message: Message) -> None: - logger.info("Processing: %s", message.serialise(deserialisable=False)) + logger.info("Processing: %s", message) study = ImagingStudy.from_message(message) orthanc_raw = PIXLRawOrthanc() From 7c32e8d39f7d6448c6df65adb4ee3bc792f9a765 Mon Sep 17 00:00:00 2001 From: Milan Malfait Date: Wed, 20 Dec 2023 15:32:18 +0000 Subject: [PATCH 32/36] `jsonpickle.decode()` can handle bytes so no need to decode first Also add a note about why we ignore ruff rule S301 --- pixl_core/src/core/patient_queue/message.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pixl_core/src/core/patient_queue/message.py b/pixl_core/src/core/patient_queue/message.py index 5e864510d..0a97173da 100644 --- a/pixl_core/src/core/patient_queue/message.py +++ b/pixl_core/src/core/patient_queue/message.py @@ -71,4 +71,4 @@ def deserialise(serialised_msg: bytes) -> Any: :param serialised_msg: The serialised message. 
""" - return decode(serialised_msg.decode("utf-8")) # noqa: S301 + return decode(serialised_msg) # noqa: S301, since we control the input, so no security risks From ae9083899e31823c49b80e243c266f04b571b440 Mon Sep 17 00:00:00 2001 From: Milan Malfait Date: Wed, 20 Dec 2023 15:33:26 +0000 Subject: [PATCH 33/36] Make `deserialisable` a keyword only argument --- pixl_core/src/core/patient_queue/message.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pixl_core/src/core/patient_queue/message.py b/pixl_core/src/core/patient_queue/message.py index 0a97173da..595be68c0 100644 --- a/pixl_core/src/core/patient_queue/message.py +++ b/pixl_core/src/core/patient_queue/message.py @@ -35,7 +35,7 @@ class Message: project_name: str omop_es_timestamp: datetime - def serialise(self, deserialisable: bool = True) -> Any: # noqa: FBT001, FBT002 + def serialise(self, *, deserialisable: bool = True) -> bytes: """ Serialise the message into a JSON string and convert to bytes. From 2c49dc3af9561a27cdab493fb9db989a1e43226f Mon Sep 17 00:00:00 2001 From: Milan Malfait Date: Wed, 20 Dec 2023 15:47:08 +0000 Subject: [PATCH 34/36] =?UTF-8?q?Copilot=20forgot=20to=20convert=20dates?= =?UTF-8?q?=20to=20datetimes=20=F0=9F=A5=B2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cli/tests/test_messages_from_parquet.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/cli/tests/test_messages_from_parquet.py b/cli/tests/test_messages_from_parquet.py index 0cc4b00e2..c092cc489 100644 --- a/cli/tests/test_messages_from_parquet.py +++ b/cli/tests/test_messages_from_parquet.py @@ -13,6 +13,7 @@ # limitations under the License. 
"""Unit tests for reading cohorts from parquet files.""" +import datetime from pathlib import Path from core.patient_queue.message import Message @@ -32,34 +33,34 @@ def test_messages_from_parquet(resources: Path) -> None: Message( mrn="12345678", accession_number="12345678", - study_datetime="2021-07-01", + study_datetime=datetime.date.fromisoformat("2021-07-01"), procedure_occurrence_id=1, project_name="Test Extract - UCLH OMOP CDM", - omop_es_timestamp="2023-12-07T14:08:58", + omop_es_timestamp=datetime.datetime.fromisoformat("2023-12-07T14:08:58"), ), Message( mrn="12345678", accession_number="ABC1234567", - study_datetime="2021-07-01", + study_datetime=datetime.date.fromisoformat("2021-07-01"), procedure_occurrence_id=2, project_name="Test Extract - UCLH OMOP CDM", - omop_es_timestamp="2023-12-07T14:08:58", + omop_es_timestamp=datetime.datetime.fromisoformat("2023-12-07T14:08:58"), ), Message( mrn="987654321", accession_number="ABC1234560", - study_datetime="2020-05-01", + study_datetime=datetime.date.fromisoformat("2020-05-01"), procedure_occurrence_id=3, project_name="Test Extract - UCLH OMOP CDM", - omop_es_timestamp="2023-12-07T14:08:58", + omop_es_timestamp=datetime.datetime.fromisoformat("2023-12-07T14:08:58"), ), Message( mrn="5020765", accession_number="MIG0234560", - study_datetime="2015-05-01", + study_datetime=datetime.date.fromisoformat("2015-05-01"), procedure_occurrence_id=4, project_name="Test Extract - UCLH OMOP CDM", - omop_es_timestamp="2023-12-07T14:08:58", + omop_es_timestamp=datetime.datetime.fromisoformat("2023-12-07T14:08:58"), ), ] From db1e34d896c0f00cafcae64c6218c509fc18877a Mon Sep 17 00:00:00 2001 From: Milan Malfait Date: Wed, 20 Dec 2023 16:13:15 +0000 Subject: [PATCH 35/36] Refactor PixlConsumer run method to accept Message object as callback parameter and deserialise --- pixl_core/src/core/patient_queue/subscriber.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git 
a/pixl_core/src/core/patient_queue/subscriber.py b/pixl_core/src/core/patient_queue/subscriber.py index cf5361297..bc868ee6b 100644 --- a/pixl_core/src/core/patient_queue/subscriber.py +++ b/pixl_core/src/core/patient_queue/subscriber.py @@ -21,6 +21,7 @@ import aio_pika +from core.patient_queue.message import Message, deserialise from core.token_buffer.tokens import TokenBucket from ._base import PixlBlockingInterface, PixlQueueInterface @@ -52,7 +53,7 @@ async def __aenter__(self) -> "PixlConsumer": self._queue = await self._channel.declare_queue(self.queue_name) return self - async def run(self, callback: Callable[[bytes], Awaitable[None]]) -> None: + async def run(self, callback: Callable[[Message], Awaitable[None]]) -> None: """ Creates loop that waits for messages from producer and processes them as they appear. @@ -73,7 +74,7 @@ async def run(self, callback: Callable[[bytes], Awaitable[None]]) -> None: try: await asyncio.sleep(0.01) # Avoid very fast callbacks - await callback(message.body) + await callback(deserialise(message.body)) except Exception: LOGGER.exception( "Failed to process %s" "Not re-queuing message", From 7343fcda2b6d3624610be10814c3127c90643edd Mon Sep 17 00:00:00 2001 From: Milan Malfait Date: Wed, 20 Dec 2023 17:30:26 +0000 Subject: [PATCH 36/36] Update consumer in `test_subscriber` to accept Message object instead of bytes --- pixl_core/tests/patient_queue/test_subscriber.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pixl_core/tests/patient_queue/test_subscriber.py b/pixl_core/tests/patient_queue/test_subscriber.py index b306a0279..32910dedb 100644 --- a/pixl_core/tests/patient_queue/test_subscriber.py +++ b/pixl_core/tests/patient_queue/test_subscriber.py @@ -65,13 +65,13 @@ async def test_create(self) -> None: async with PixlConsumer(queue_name=TEST_QUEUE, token_bucket=TokenBucket()) as pc: - async def consume(msg: bytes) -> None: + async def consume(msg: Message) -> None: """ Increases counter when message 
is downloaded. :param msg: body of the message, though not needed :returns: the increased counter, though here only once """ - if str(msg) != "": + if str(msg.serialise()) != "": global counter counter += 1