diff --git a/cli/src/pixl_cli/main.py b/cli/src/pixl_cli/main.py index e9e3bbfc2..fdaf3d71c 100644 --- a/cli/src/pixl_cli/main.py +++ b/cli/src/pixl_cli/main.py @@ -16,6 +16,7 @@ import datetime import json import os +from operator import attrgetter from pathlib import Path from typing import Any, Optional @@ -23,9 +24,9 @@ import pandas as pd import requests import yaml +from core.patient_queue.message import Message, deserialise from core.patient_queue.producer import PixlProducer from core.patient_queue.subscriber import PixlBlockingConsumer -from core.patient_queue.utils import deserialise, serialise from ._logging import logger, set_log_level from ._utils import clear_file, remove_file_if_it_exists, string_is_non_empty @@ -84,12 +85,12 @@ def populate(queues: str, *, restart: bool, parquet_dir: Path) -> None: if state_filepath.exists() and restart: logger.info(f"Extracting messages from state: {state_filepath}") inform_user_that_queue_will_be_populated_from(state_filepath) - messages = Messages.from_state_file(state_filepath) + messages = messages_from_state_file(state_filepath) elif parquet_dir is not None: messages = messages_from_parquet(parquet_dir) remove_file_if_it_exists(state_filepath) # will be stale - producer.publish(sorted(messages, key=study_date_from_serialised)) + producer.publish(sorted(messages, key=attrgetter("study_datetime"))) @cli.command() @@ -273,41 +274,26 @@ def state_filepath_for_queue(queue_name: str) -> Path: return Path(f"{queue_name.replace('/', '_')}.state") -class Messages(list): +def messages_from_state_file(filepath: Path) -> list[Message]: """ - Class to represent messages + Return messages from a state file path - Methods - ------- - from_state_file(cls, filepath) - Return messages from a state file path + :param filepath: Path for state file to be read + :return: A list of Message objects containing all the messages from the state file """ + logger.info(f"Creating messages from {filepath}") + if not filepath.exists(): + raise FileNotFoundError + if filepath.suffix != ".state": + msg = f"Invalid file suffix for {filepath}. Expected .state" + raise ValueError(msg) - @classmethod - def from_state_file(cls, filepath: Path) -> "Messages": - """ - Return messages from a state file path - - :param filepath: Path for state file to be read - :return: A Messages object containing all the messages from the state file - """ - logger.info(f"Creating messages from {filepath}") - if not filepath.exists(): - raise FileNotFoundError - if filepath.suffix != ".state": - msg = f"Invalid file suffix for {filepath}. Expected .state" - raise ValueError(msg) - - return cls( - [ - line.encode("utf-8") - for line in Path.open(filepath).readlines() - if string_is_non_empty(line) - ] - ) + return [ + deserialise(line) for line in Path.open(filepath).readlines() if string_is_non_empty(line) + ] -def messages_from_parquet(dir_path: Path) -> Messages: +def messages_from_parquet(dir_path: Path) -> list[Message]: """ Reads patient information from parquet files within directory structure and transforms that into messages. @@ -345,9 +331,6 @@ def messages_from_parquet(dir_path: Path) -> Messages: f"{expected_col_names}" ) - # First line is column names - messages = Messages() - for col in expected_col_names: if col not in list(cohort_data.columns): msg = f"csv file expected to have at least {expected_col_names} as " f"column names" @@ -367,17 +350,19 @@ def messages_from_parquet(dir_path: Path) -> Messages: project_name = logs["settings"]["cdm_source_name"] omop_es_timestamp = datetime.datetime.fromisoformat(logs["datetime"]) + messages = [] + for _, row in cohort_data.iterrows(): - messages.append( - serialise( - mrn=row[mrn_col_name], - accession_number=row[acc_num_col_name], - study_datetime=row[dt_col_name], - procedure_occurrence_id=row[procedure_occurrence_id], - project_name=project_name, - omop_es_timestamp=omop_es_timestamp, - ) + # Create new dict to initialise message + message = Message( + mrn=row[mrn_col_name], + accession_number=row[acc_num_col_name], + study_datetime=row[dt_col_name], + procedure_occurrence_id=row[procedure_occurrence_id], + project_name=project_name, + omop_es_timestamp=omop_es_timestamp, ) + messages.append(message) if len(messages) == 0: msg = f"Failed to find any messages in {dir_path}" @@ -446,12 +431,3 @@ def api_config_for_queue(queue_name: str) -> APIConfig: raise ValueError(msg) return APIConfig(config[config_key]) - - -def study_date_from_serialised(message: bytes) -> datetime.datetime: - """Get the study date from a serialised message as a datetime""" - result = deserialise(message)["study_datetime"] - if not isinstance(result, datetime.datetime): - msg = "Expected study date to be a datetime. Got %s" - raise TypeError(msg, type(result)) - return result diff --git a/cli/tests/test_messages_from_parquet.py b/cli/tests/test_messages_from_parquet.py index ec081ac4f..c092cc489 100644 --- a/cli/tests/test_messages_from_parquet.py +++ b/cli/tests/test_messages_from_parquet.py @@ -13,8 +13,10 @@ # limitations under the License. """Unit tests for reading cohorts from parquet files.""" +import datetime from pathlib import Path +from core.patient_queue.message import Message from pixl_cli.main import messages_from_parquet @@ -25,19 +27,41 @@ def test_messages_from_parquet(resources: Path) -> None: """ omop_parquet_dir = resources / "omop" messages = messages_from_parquet(omop_parquet_dir) + assert all(isinstance(msg, Message) for msg in messages) + expected_messages = [ - b'{"mrn": "12345678", "accession_number": "12345678", "study_datetime": "2021-07-01", ' - b'"procedure_occurrence_id": 1, "project_name": "Test Extract - UCLH OMOP CDM", ' - b'"omop_es_timestamp": "2023-12-07T14:08:58"}', - b'{"mrn": "12345678", "accession_number": "ABC1234567", "study_datetime": "2021-07-01", ' - b'"procedure_occurrence_id": 2, "project_name": "Test Extract - UCLH OMOP CDM", ' - b'"omop_es_timestamp": "2023-12-07T14:08:58"}', - b'{"mrn": "987654321", "accession_number": "ABC1234560", "study_datetime": "2020-05-01", ' - b'"procedure_occurrence_id": 3, "project_name": "Test Extract - UCLH OMOP CDM", ' - b'"omop_es_timestamp": "2023-12-07T14:08:58"}', - b'{"mrn": "5020765", "accession_number": "MIG0234560", "study_datetime": "2015-05-01", ' - b'"procedure_occurrence_id": 4, "project_name": "Test Extract - UCLH OMOP CDM", ' - b'"omop_es_timestamp": "2023-12-07T14:08:58"}', + Message( + mrn="12345678", + accession_number="12345678", + study_datetime=datetime.date.fromisoformat("2021-07-01"), + procedure_occurrence_id=1, + project_name="Test Extract - UCLH OMOP CDM", + omop_es_timestamp=datetime.datetime.fromisoformat("2023-12-07T14:08:58"), + ), + Message( + mrn="12345678", + accession_number="ABC1234567", + study_datetime=datetime.date.fromisoformat("2021-07-01"), + procedure_occurrence_id=2, + project_name="Test Extract - UCLH OMOP CDM", + omop_es_timestamp=datetime.datetime.fromisoformat("2023-12-07T14:08:58"), + ), + Message( + mrn="987654321", + accession_number="ABC1234560", + study_datetime=datetime.date.fromisoformat("2020-05-01"), + procedure_occurrence_id=3, + project_name="Test Extract - UCLH OMOP CDM", + omop_es_timestamp=datetime.datetime.fromisoformat("2023-12-07T14:08:58"), + ), + Message( + mrn="5020765", + accession_number="MIG0234560", + study_datetime=datetime.date.fromisoformat("2015-05-01"), + procedure_occurrence_id=4, + project_name="Test Extract - UCLH OMOP CDM", + omop_es_timestamp=datetime.datetime.fromisoformat("2023-12-07T14:08:58"), + ), ] assert messages == expected_messages diff --git a/pixl_core/pyproject.toml b/pixl_core/pyproject.toml index 8fdd72727..056307e6b 100644 --- a/pixl_core/pyproject.toml +++ b/pixl_core/pyproject.toml @@ -18,7 +18,8 @@ dependencies = [ "pika==1.3.1", "aio_pika==8.2.4", "environs==9.5.0", - "requests==2.31.0" + "requests==2.31.0", + "jsonpickle==3.0.2" ] [project.optional-dependencies] diff --git a/pixl_core/src/core/patient_queue/message.py b/pixl_core/src/core/patient_queue/message.py new file mode 100644 index 000000000..595be68c0 --- /dev/null +++ b/pixl_core/src/core/patient_queue/message.py @@ -0,0 +1,74 @@ +# Copyright (c) 2022 University College London Hospitals NHS Foundation Trust +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Classes to represent messages in the patient queue.""" + +import logging +from dataclasses import dataclass +from datetime import datetime +from typing import Any + +from jsonpickle import decode, encode + +logger = logging.getLogger(__name__) + + +@dataclass +class Message: + """Class to represent a message containing the relevant information for a study.""" + + mrn: str + accession_number: str + study_datetime: datetime + procedure_occurrence_id: str + project_name: str + omop_es_timestamp: datetime + + def serialise(self, *, deserialisable: bool = True) -> bytes: + """ + Serialise the message into a JSON string and convert to bytes. + + :param deserialisable: If True, the serialised message will be deserialisable, by setting + the unpicklable flag to False in jsonpickle.encode(), meaning that the original Message + object can be recovered by `deserialise()`. If False, calling `deserialise()` on the + serialised message will return a dictionary. + """ + msg = ( + "Serialising message with\n" + " * patient id: %s\n" + " * accession number: %s\n" + " * timestamp: %s\n" + " * procedure_occurrence_id: %s\n", + " * project_name: %s\n * omop_es_timestamp: %s", + self.mrn, + self.accession_number, + self.study_datetime, + self.procedure_occurrence_id, + self.project_name, + self.omop_es_timestamp, + ) + logger.debug(msg) + + return str.encode(encode(self, unpicklable=deserialisable)) + + +def deserialise(serialised_msg: bytes) -> Any: + """ + Deserialise a message from a bytes-encoded JSON string. + If the message was serialised with `deserialisable=True`, the original Message object will be + returned. Otherwise, a dictionary will be returned. + + :param serialised_msg: The serialised message. + """ + return decode(serialised_msg) # noqa: S301, since we control the input, so no security risks diff --git a/pixl_core/src/core/patient_queue/producer.py b/pixl_core/src/core/patient_queue/producer.py index 38f5ec5d1..b840a4584 100644 --- a/pixl_core/src/core/patient_queue/producer.py +++ b/pixl_core/src/core/patient_queue/producer.py @@ -16,6 +16,8 @@ import logging from time import sleep +from core.patient_queue.message import Message + from ._base import PixlBlockingInterface LOGGER = logging.getLogger(__name__) @@ -24,7 +26,7 @@ class PixlProducer(PixlBlockingInterface): """Generic publisher for RabbitMQ""" - def publish(self, messages: list[bytes]) -> None: + def publish(self, messages: list[Message]) -> None: """ Sends a list of serialised messages to a queue. :param messages: list of messages to be sent to queue @@ -32,11 +34,15 @@ def publish(self, messages: list[bytes]) -> None: LOGGER.debug("Publishing %i messages to queue: %s", len(messages), self.queue_name) if len(messages) > 0: for msg in messages: + LOGGER.debug("Serialising message") + serialised_msg = msg.serialise() LOGGER.debug("Preparing to publish") - self._channel.basic_publish(exchange="", routing_key=self.queue_name, body=msg) + self._channel.basic_publish( + exchange="", routing_key=self.queue_name, body=serialised_msg + ) # RabbitMQ can miss-order messages if there is not a sufficient delay sleep(0.1) - LOGGER.debug("Message %s published to queue %s", msg.decode(), self.queue_name) + LOGGER.debug("Message %s published to queue %s", msg, self.queue_name) else: LOGGER.debug("List of messages is empty so nothing will be published to queue.") diff --git a/pixl_core/src/core/patient_queue/subscriber.py b/pixl_core/src/core/patient_queue/subscriber.py index cf5361297..bc868ee6b 100644 --- a/pixl_core/src/core/patient_queue/subscriber.py +++ b/pixl_core/src/core/patient_queue/subscriber.py @@ -21,6 +21,7 @@ import aio_pika +from core.patient_queue.message import Message, deserialise from core.token_buffer.tokens import TokenBucket from ._base import PixlBlockingInterface, PixlQueueInterface @@ -52,7 +53,7 @@ async def __aenter__(self) -> "PixlConsumer": self._queue = await self._channel.declare_queue(self.queue_name) return self - async def run(self, callback: Callable[[bytes], Awaitable[None]]) -> None: + async def run(self, callback: Callable[[Message], Awaitable[None]]) -> None: """ Creates loop that waits for messages from producer and processes them as they appear. @@ -73,7 +74,7 @@ async def run(self, callback: Callable[[bytes], Awaitable[None]]) -> None: try: await asyncio.sleep(0.01) # Avoid very fast callbacks - await callback(message.body) + await callback(deserialise(message.body)) except Exception: LOGGER.exception( "Failed to process %s" "Not re-queuing message", diff --git a/pixl_core/src/core/patient_queue/utils.py b/pixl_core/src/core/patient_queue/utils.py deleted file mode 100644 index 3619b667c..000000000 --- a/pixl_core/src/core/patient_queue/utils.py +++ /dev/null @@ -1,70 +0,0 @@ -# Copyright (c) 2022 University College London Hospitals NHS Foundation Trust -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Utility functions""" - -import json -import logging -from datetime import datetime - -logger = logging.getLogger(__name__) - - -def deserialise(message_body: bytes) -> dict: - """Returns the de-serialised message in JSON format.""" - logger.debug("De-serialising: %s", message_body.decode()) - data = dict(json.loads(message_body.decode())) - if "study_datetime" in data: - data["study_datetime"] = datetime.fromisoformat(data["study_datetime"]) - return data - - -def serialise( - mrn: str, - accession_number: str, - study_datetime: datetime, - procedure_occurrence_id: str, - project_name: str, - omop_es_timestamp: datetime, -) -> bytes: - """ - Returns serialised message from the given parameters. - :param mrn: patient identifier - :param accession_number: accession number - :param study_datetime: date and time of the study - :param procedure_occurrence_id: the OMOP ID of the procedure - :returns: JSON formatted message - """ - logger.debug( - "Serialising message with patient id %s, " - "accession number: %s and timestamp %s " - "procedure_occurrence_id %s, ", - "project_name %s, omop_es_timestamp %s", - mrn, - accession_number, - study_datetime, - procedure_occurrence_id, - project_name, - omop_es_timestamp, - ) - return json.dumps( - { - "mrn": mrn, - "accession_number": accession_number, - "study_datetime": study_datetime.isoformat(), - "procedure_occurrence_id": procedure_occurrence_id, - "project_name": project_name, - "omop_es_timestamp": omop_es_timestamp.isoformat(), - } - ).encode("utf-8") diff --git a/pixl_core/tests/patient_queue/test_message.py b/pixl_core/tests/patient_queue/test_message.py new file mode 100644 index 000000000..8471cfdc4 --- /dev/null +++ b/pixl_core/tests/patient_queue/test_message.py @@ -0,0 +1,47 @@ +# Copyright (c) 2022 University College London Hospitals NHS Foundation Trust +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import datetime + +from core.patient_queue.message import Message, deserialise + +msg = Message( + mrn="111", + accession_number="123", + study_datetime=datetime.datetime.strptime("Nov 22 2022 1:33PM", "%b %d %Y %I:%M%p").replace( + tzinfo=datetime.timezone.utc + ), + procedure_occurrence_id="234", + project_name="test project", + omop_es_timestamp=datetime.datetime.strptime("Dec 7 2023 2:08PM", "%b %d %Y %I:%M%p").replace( + tzinfo=datetime.timezone.utc + ), +) + + +def test_serialise() -> None: + """Checks that messages can be correctly serialised""" + msg_body = msg.serialise(deserialisable=False) + assert ( + msg_body == b'{"mrn": "111", "accession_number": "123", ' + b'"study_datetime": "2022-11-22T13:33:00+00:00", ' + b'"procedure_occurrence_id": "234", ' + b'"project_name": "test project", ' + b'"omop_es_timestamp": "2023-12-07T14:08:00+00:00"}' + ) + + +def test_deserialise() -> None: + """Checks if deserialised messages are the same as the original""" + serialised_msg = msg.serialise() + assert deserialise(serialised_msg) == msg diff --git a/pixl_core/tests/patient_queue/test_producer.py b/pixl_core/tests/patient_queue/test_producer.py index 2aa73691e..1aca4cda4 100644 --- a/pixl_core/tests/patient_queue/test_producer.py +++ b/pixl_core/tests/patient_queue/test_producer.py @@ -12,9 +12,18 @@ # See the License for the specific language governing permissions and # limitations under the License. import pytest +from core.patient_queue.message import Message from core.patient_queue.producer import PixlProducer TEST_QUEUE = "test_publish" +TEST_MESSAGE = Message( + mrn="111", + accession_number="123", + study_datetime="2022-11-22T13:33:00+00:00", + procedure_occurrence_id="234", + project_name="test project", + omop_es_timestamp="2023-12-07T14:08:00+00:00", +) @pytest.mark.pika() @@ -32,7 +41,7 @@ def test_publish() -> None: """ with PixlProducer(queue_name=TEST_QUEUE) as pp: pp.clear_queue() - pp.publish(messages=[b"test"]) + pp.publish(messages=[TEST_MESSAGE]) with PixlProducer(queue_name=TEST_QUEUE) as pp: assert pp.message_count == 1 diff --git a/pixl_core/tests/patient_queue/test_subscriber.py b/pixl_core/tests/patient_queue/test_subscriber.py index e2a8c6b1d..32910dedb 100644 --- a/pixl_core/tests/patient_queue/test_subscriber.py +++ b/pixl_core/tests/patient_queue/test_subscriber.py @@ -18,12 +18,21 @@ from unittest import TestCase import pytest +from core.patient_queue.message import Message from core.patient_queue.producer import PixlProducer from core.patient_queue.subscriber import PixlBlockingConsumer, PixlConsumer from core.token_buffer.tokens import TokenBucket TEST_QUEUE = "test_consume" -MESSAGE_BODY = b"test" +TEST_MESSAGE = Message( + mrn="111", + accession_number="123", + study_datetime="2022-11-22T13:33:00+00:00", + procedure_occurrence_id="234", + project_name="test project", + omop_es_timestamp="2023-12-07T14:08:00+00:00", +) + counter = 0 @@ -52,17 +61,17 @@ async def test_create(self) -> None: """Checks consume is working.""" global counter # noqa: PLW0602 with PixlProducer(queue_name=TEST_QUEUE) as pp: - pp.publish(messages=[MESSAGE_BODY]) + pp.publish(messages=[TEST_MESSAGE]) async with PixlConsumer(queue_name=TEST_QUEUE, token_bucket=TokenBucket()) as pc: - async def consume(msg: bytes) -> None: + async def consume(msg: Message) -> None: """ Increases counter when message is downloaded. :param msg: body of the message, though not needed :returns: the increased counter, though here only once """ - if str(msg) != "": + if str(msg.serialise()) != "": global counter counter += 1 @@ -78,7 +87,7 @@ def test_consume_all() -> None: graceful shutdown. """ with PixlProducer(queue_name=TEST_QUEUE) as pp: - pp.publish(messages=[MESSAGE_BODY, MESSAGE_BODY]) + pp.publish(messages=[TEST_MESSAGE, TEST_MESSAGE]) with PixlBlockingConsumer(queue_name=TEST_QUEUE) as bc: counter_bc = bc.consume_all(timeout_in_seconds=2, file_path=Path("test_producer.csv")) diff --git a/pixl_core/tests/patient_queue/test_utils.py b/pixl_core/tests/patient_queue/test_utils.py deleted file mode 100644 index c9b66d0db..000000000 --- a/pixl_core/tests/patient_queue/test_utils.py +++ /dev/null @@ -1,61 +0,0 @@ -# Copyright (c) 2022 University College London Hospitals NHS Foundation Trust -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import datetime -import json - -from core.patient_queue.utils import deserialise, serialise - - -def test_serialise() -> None: - """Checks that messages can be correctly serialised""" - msg_body = serialise( - mrn="111", - accession_number="123", - study_datetime=datetime.datetime.strptime("Nov 22 2022 1:33PM", "%b %d %Y %I:%M%p").replace( - tzinfo=datetime.timezone.utc - ), - procedure_occurrence_id="234", - project_name="test project", - omop_es_timestamp=datetime.datetime.strptime( - "Dec 7 2023 2:08PM", "%b %d %Y %I:%M%p" - ).replace(tzinfo=datetime.timezone.utc), - ) - assert ( - msg_body.decode() == '{"mrn": "111", "accession_number": "123", ' - '"study_datetime": "2022-11-22T13:33:00+00:00", ' - '"procedure_occurrence_id": "234", ' - '"project_name": "test project", ' - '"omop_es_timestamp": "2023-12-07T14:08:00+00:00"}' - ) - - -def test_simple_deserialise() -> None: - """Checks a simple JSON deserialise works""" - assert deserialise((json.dumps({"key": "value"})).encode("utf-8"))["key"] == "value" - - -def test_deserialise_datetime() -> None: - """Checks that datetimes can be correctly serialised""" - timestamp = datetime.datetime.fromordinal(100012) - data = deserialise( - serialise( - mrn="", - accession_number="", - study_datetime=timestamp, - procedure_occurrence_id="", - project_name="", - omop_es_timestamp=datetime.datetime.now(), # noqa: DTZ005 - ) - ) - assert data["study_datetime"] == timestamp diff --git a/pixl_ehr/src/pixl_ehr/_processing.py b/pixl_ehr/src/pixl_ehr/_processing.py index b7d47bb75..89a9380f7 100644 --- a/pixl_ehr/src/pixl_ehr/_processing.py +++ b/pixl_ehr/src/pixl_ehr/_processing.py @@ -21,7 +21,7 @@ from typing import Optional import requests -from core.patient_queue.utils import deserialise +from core.patient_queue.message import Message from decouple import config from pixl_ehr._databases import EMAPStar, PIXLDatabase @@ -35,14 +35,14 @@ _this_dir = Path(Path(__file__).parent) -async def process_message(message_body: bytes) -> None: - logger.info("Processing: %s", message_body.decode()) +async def process_message(message: Message) -> None: + logger.info("Processing: %s", message) - raw_data = PatientEHRData.from_message(message_body) + raw_data = PatientEHRData.from_message(message) pixl_db = PIXLDatabase() if pixl_db.contains(raw_data): - logger.info("Messaged has already been processed") + logger.info("Message has already been processed") return emap_star_db = EMAPStar() @@ -79,16 +79,15 @@ class PatientEHRData: report_text: Optional[str] = None @classmethod - def from_message(cls, message_body: bytes) -> "PatientEHRData": + def from_message(cls, message: Message) -> "PatientEHRData": """ Create a minimal set of patient EHR data required to start queries from a queue message """ - message_data = deserialise(message_body) self = PatientEHRData( - mrn=message_data["mrn"], - accession_number=message_data["accession_number"], - acquisition_datetime=message_data["study_datetime"], + mrn=message.mrn, + accession_number=message.accession_number, + acquisition_datetime=message.study_datetime, ) logger.debug("Created %s from message data", self) diff --git a/pixl_ehr/tests/test_processing.py b/pixl_ehr/tests/test_processing.py index 4dd6f7b1c..d0b933884 100644 --- a/pixl_ehr/tests/test_processing.py +++ b/pixl_ehr/tests/test_processing.py @@ -21,7 +21,7 @@ import datetime import pytest -from core.patient_queue.utils import serialise +from core.patient_queue.message import Message from decouple import config from pixl_ehr._databases import PIXLDatabase, WriteableDatabase from pixl_ehr._processing import process_message @@ -55,7 +55,7 @@ weight_vot_id, height_vot_id, gcs_vot_id = 2222222, 3333333, 4444444 ls_id, lo_id, lr_id, ltd_id = 5555555, 6666666, 7777777, 8888888 -message_body = serialise( +message = Message( mrn=mrn, accession_number=accession_number, study_datetime=datetime.datetime.strptime(study_datetime_str, "%d/%m/%Y %H:%M").replace( @@ -163,7 +163,7 @@ def insert_data_into_emap_star_schema() -> None: @pytest.mark.asyncio() async def test_message_processing() -> None: insert_data_into_emap_star_schema() - await process_message(message_body) + await process_message(message) pixl_db = QueryablePIXLDB() row = pixl_db.execute_query_string("select * from emap_data.ehr_raw where mrn = %s", [mrn]) diff --git a/pixl_pacs/src/pixl_pacs/_processing.py b/pixl_pacs/src/pixl_pacs/_processing.py index 9338d89ee..af0733444 100644 --- a/pixl_pacs/src/pixl_pacs/_processing.py +++ b/pixl_pacs/src/pixl_pacs/_processing.py @@ -15,10 +15,9 @@ import os from asyncio import sleep from dataclasses import dataclass -from datetime import datetime from time import time -from core.patient_queue.utils import deserialise +from core.patient_queue.message import Message from decouple import config from pixl_pacs._orthanc import Orthanc, PIXLRawOrthanc @@ -27,10 +26,10 @@ logger.setLevel(os.environ.get("LOG_LEVEL", "WARNING")) -async def process_message(message_body: bytes) -> None: - logger.info("Processing: %s", message_body.decode()) +async def process_message(message: Message) -> None: + logger.info("Processing: %s", message) - study = ImagingStudy.from_message(message_body) + study = ImagingStudy.from_message(message) orthanc_raw = PIXLRawOrthanc() if study.exists_in(orthanc_raw): @@ -49,7 +48,7 @@ async def process_message(message_body: bytes) -> None: while job_state != "Success": if (time() - start_time) > config("PIXL_DICOM_TRANSFER_TIMEOUT", cast=float): msg = ( - f"Failed to transfer {message_body.decode()} within " + f"Failed to transfer {message.decode()} within " f"{config('PIXL_DICOM_TRANSFER_TIMEOUT')} seconds" ) raise TimeoutError(msg) @@ -64,22 +63,20 @@ async def process_message(message_body: bytes) -> None: class ImagingStudy: """Dataclass for EHR unique to a patient and xray study""" - mrn: str - accession_number: str - study_datetime: datetime - procedure_occurrence_id: str - project_name: str - omop_es_timestamp: datetime + message: Message @classmethod - def from_message(cls, message_body: bytes) -> "ImagingStudy": - return ImagingStudy(**deserialise(message_body)) + def from_message(cls, message: Message) -> "ImagingStudy": + return ImagingStudy(message=message) @property def orthanc_query_dict(self) -> dict: return { "Level": "Study", - "Query": {"PatientID": self.mrn, "AccessionNumber": self.accession_number}, + "Query": { + "PatientID": self.message.mrn, + "AccessionNumber": self.message.accession_number, + }, } def exists_in(self, node: Orthanc) -> bool: diff --git a/pixl_pacs/tests/test_processing.py b/pixl_pacs/tests/test_processing.py index 250d976d9..82084e44b 100644 --- a/pixl_pacs/tests/test_processing.py +++ b/pixl_pacs/tests/test_processing.py @@ -19,7 +19,7 @@ import os import pytest -from core.patient_queue.utils import serialise +from core.patient_queue.message import Message from decouple import config from pixl_pacs._orthanc import Orthanc, PIXLRawOrthanc from pixl_pacs._processing import ImagingStudy, process_message @@ -30,7 +30,7 @@ ACCESSION_NUMBER = "abc" PATIENT_ID = "a_patient" -message_body = serialise( +message = Message( mrn=PATIENT_ID, accession_number=ACCESSION_NUMBER, study_datetime=datetime.datetime.strptime("01/01/1234 01:23:45", "%d/%m/%Y %H:%M:%S").replace( @@ -73,15 +73,15 @@ def add_image_to_fake_vna(image_filename: str = "test.dcm") -> None: @pytest.mark.asyncio() async def test_image_processing() -> None: add_image_to_fake_vna() - study = ImagingStudy.from_message(message_body) + study = ImagingStudy.from_message(message) orthanc_raw = PIXLRawOrthanc() assert not study.exists_in(orthanc_raw) - await process_message(message_body=message_body) + await process_message(message) assert study.exists_in(orthanc_raw) # TODO: check time last updated after processing again # noqa: FIX002 # is not incremented # https://github.com/UCLH-Foundry/PIXL/issues/156 - await process_message(message_body=message_body) + await process_message(message) assert study.exists_in(orthanc_raw)