Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor message serialisation and deserialisation #197

Merged
merged 36 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
5cf1111
Refactor message serialization and deserialization
milanmlft Dec 19, 2023
2299519
Rename `utils.py` -> `message.py`
milanmlft Dec 19, 2023
0bfcab0
Add `decode()` method for `SerialisedMessage`
milanmlft Dec 19, 2023
b9e4a59
Update docstring
milanmlft Dec 19, 2023
558f694
Use new classes in message testing
milanmlft Dec 19, 2023
fef8779
Refactor message processing in the CLI
milanmlft Dec 19, 2023
e239299
Refactor `process_message` to use `SerialisedMessage` class in EHR API
milanmlft Dec 19, 2023
596e85c
Refactor `process_message` to use `SerialisedMessage` class in imagin…
milanmlft Dec 19, 2023
7f160a2
Fix `ImagingStudy` initalisation in `ImagingStudy.from_message()`
milanmlft Dec 19, 2023
31a6f29
Fix imports
milanmlft Dec 19, 2023
9a8f5a9
Fix test: access serialised message bodies
milanmlft Dec 19, 2023
aa78318
Turn `Message` into a `dataclass`
milanmlft Dec 19, 2023
cc5b12c
Fix failing tests
milanmlft Dec 19, 2023
b8dfad0
Use `jsonpickle` for (de)serializing messages
milanmlft Dec 20, 2023
f1b848d
Fix `test_deserialise_datetime()` so it uses the `Message` class to a…
milanmlft Dec 20, 2023
8719153
Add `study_datetime` property for `Message`
milanmlft Dec 20, 2023
b08a9c4
No need to test deserialising individual fields, already covered by `…
milanmlft Dec 20, 2023
3e7c553
Remove `study_date_from_serialised()`, use the class attribute `study…
milanmlft Dec 20, 2023
e8c0cc9
Revert "Add `study_datetime` property for `Message`"
milanmlft Dec 20, 2023
fb44bf6
Remove `Messages` class, use `list[Message]` instead
milanmlft Dec 20, 2023
6af792e
Add type checking for messages parsed from parquet input
milanmlft Dec 20, 2023
0e4fce4
Update `test_messages_from_parquet()` to use JSON strings instead of …
milanmlft Dec 20, 2023
99ffd00
Update `PixlProducer.publish()` to use a list of Message objects and …
milanmlft Dec 20, 2023
986633d
Convert JSON string to bytes when serialising
milanmlft Dec 20, 2023
d2c05ca
Revert "Update `test_messages_from_parquet()` to use JSON strings ins…
milanmlft Dec 20, 2023
cd3e101
`PixlProducer.publish()` should take a `list[Message]` as input in tests
milanmlft Dec 20, 2023
0b855e1
Update EHR API to use new `Message` design
milanmlft Dec 20, 2023
9a4d6a8
Update imaging API to use new `Message` design
milanmlft Dec 20, 2023
baad796
Update deserialise function to accept bytes-encoded JSON string
milanmlft Dec 20, 2023
a81e285
Assert messages against list of `Message`s
milanmlft Dec 20, 2023
8583ca0
Print dataclass in logs
milanmlft Dec 20, 2023
7c32e8d
`jsonpickle.decode()` can handle bytes so no need to decode first
milanmlft Dec 20, 2023
ae90838
Make `deserialisable` a keyword only argument
milanmlft Dec 20, 2023
2c49dc3
Copilot forgot to convert dates to datetimes 🥲
milanmlft Dec 20, 2023
db1e34d
Refactor PixlConsumer run method to accept Message object as callback…
milanmlft Dec 20, 2023
7343fcd
Update consumer in `test_subscriber` to accept Message object instead…
milanmlft Dec 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions cli/src/pixl_cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import pandas as pd
import requests
import yaml
from core.patient_queue.message import Message
from core.patient_queue.producer import PixlProducer
from core.patient_queue.subscriber import PixlBlockingConsumer
from core.patient_queue.utils import deserialise, serialise

from ._logging import logger, set_log_level
from ._utils import clear_file, remove_file_if_it_exists, string_is_non_empty
Expand Down Expand Up @@ -345,9 +345,6 @@ def messages_from_parquet(dir_path: Path) -> Messages:
f"{expected_col_names}"
)

# First line is column names
messages = Messages()

for col in expected_col_names:
if col not in list(cohort_data.columns):
msg = f"csv file expected to have at least {expected_col_names} as " f"column names"
Expand All @@ -367,17 +364,20 @@ def messages_from_parquet(dir_path: Path) -> Messages:
project_name = logs["settings"]["cdm_source_name"]
omop_es_timestamp = datetime.datetime.fromisoformat(logs["datetime"])

messages = Messages()

for _, row in cohort_data.iterrows():
messages.append(
serialise(
mrn=row[mrn_col_name],
accession_number=row[acc_num_col_name],
study_datetime=row[dt_col_name],
procedure_occurrence_id=row[procedure_occurrence_id],
project_name=project_name,
omop_es_timestamp=omop_es_timestamp,
)
)
# Create new dict to initialise message
message_fields = {
"mrn": row[mrn_col_name],
"accession_number": row[acc_num_col_name],
"study_datetime": row[dt_col_name],
"procedure_occurrence_id": row[procedure_occurrence_id],
"project_name": project_name,
"omop_es_timestamp": omop_es_timestamp,
}
message = Message(message_fields)
messages.append(message.serialise())

if len(messages) == 0:
msg = f"Failed to find any messages in {dir_path}"
Expand Down Expand Up @@ -448,9 +448,9 @@ def api_config_for_queue(queue_name: str) -> APIConfig:
return APIConfig(config[config_key])


def study_date_from_serialised(message: bytes) -> datetime.datetime:
def study_date_from_serialised(message: Message) -> datetime.datetime:
"""Get the study date from a serialised message as a datetime"""
result = deserialise(message)["study_datetime"]
result = message.deserialise()["study_datetime"]
if not isinstance(result, datetime.datetime):
msg = "Expected study date to be a datetime. Got %s"
raise TypeError(msg, type(result))
Expand Down
97 changes: 97 additions & 0 deletions pixl_core/src/core/patient_queue/message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# Copyright (c) 2022 University College London Hospitals NHS Foundation Trust
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Classes to represent messages in the patient queue."""

import json
import logging
from datetime import datetime

logger = logging.getLogger(__name__)


class SerialisedMessage:
    """
    A queue message in its serialised form: a UTF-8 encoded JSON document.

    The JSON text is kept as bytes in ``body``; ``deserialise()`` turns it back
    into a dictionary and ``decode()`` returns it as text.
    """

    # UTF-8 encoded JSON text of the message.
    body: bytes

    # Keys whose values are ISO 8601 strings in the JSON document and are
    # converted back into ``datetime`` objects when de-serialising.
    _DATETIME_FIELDS = ("study_datetime", "omop_es_timestamp")

    def __init__(self, body: "str | bytes") -> None:
        """
        Initialise the serialised message from a JSON dump.

        :param body: the JSON document, either as text (encoded to UTF-8 here)
            or as already-encoded bytes, which are stored unchanged.
        """
        # Accept bytes as-is so an already-encoded body can be wrapped without
        # the caller having to decode it first.
        self.body = body if isinstance(body, bytes) else body.encode("utf-8")

    def deserialise(self) -> dict:
        """
        Return the de-serialised message as a dictionary.

        The values of ``study_datetime`` and ``omop_es_timestamp``, when those
        keys are present, are parsed from ISO 8601 strings into ``datetime``
        objects. All other values are returned as decoded from JSON.

        :raises ValueError: if the body is not valid JSON or a datetime field
            is not a valid ISO 8601 string.
        """
        # Decode once and reuse the text for both logging and parsing.
        text = self.decode()
        logger.debug("De-serialising: %s", text)

        data = dict(json.loads(text))
        for field in self._DATETIME_FIELDS:
            if field in data:
                data[field] = datetime.fromisoformat(data[field])

        return data

    def decode(self) -> str:
        """Return the serialised message as a string (the JSON text)."""
        return self.body.decode("utf-8")


class Message:
    """Class to represent a message containing the relevant information for a study."""

    mrn: str
    accession_number: str
    study_datetime: datetime
    procedure_occurrence_id: str
    project_name: str
    omop_es_timestamp: datetime

    def __init__(self, message_fields: dict) -> None:
        """
        Initialise the message from a dictionary of fields.

        :param message_fields: dictionary with the keys ``mrn``,
            ``accession_number``, ``study_datetime``,
            ``procedure_occurrence_id``, ``project_name`` and
            ``omop_es_timestamp``.
        :raises KeyError: if any of those keys is missing.
        """
        self.mrn = message_fields["mrn"]
        self.accession_number = message_fields["accession_number"]
        self.study_datetime = message_fields["study_datetime"]
        self.procedure_occurrence_id = message_fields["procedure_occurrence_id"]
        self.project_name = message_fields["project_name"]
        self.omop_es_timestamp = message_fields["omop_es_timestamp"]

    def serialise(self) -> "SerialisedMessage":
        """
        Serialise the message into JSON format.

        ``study_datetime`` and ``omop_es_timestamp`` are written as ISO 8601
        strings via their ``isoformat()`` method; the other fields are passed
        to ``json.dumps`` unchanged.

        :return: a ``SerialisedMessage`` wrapping the JSON dump.
        """
        # The format string must be ONE string followed by its arguments so the
        # logging module interpolates it (and only when debug is enabled).
        # Building a tuple and logging that would print the tuple's repr.
        logger.debug(
            "Serialising message with\n"
            " * patient id: %s\n"
            " * accession number: %s\n"
            " * timestamp: %s\n"
            " * procedure_occurrence_id: %s\n"
            " * project_name: %s\n * omop_es_timestamp: %s",
            self.mrn,
            self.accession_number,
            self.study_datetime,
            self.procedure_occurrence_id,
            self.project_name,
            self.omop_es_timestamp,
        )

        body = json.dumps(
            {
                "mrn": self.mrn,
                "accession_number": self.accession_number,
                "study_datetime": self.study_datetime.isoformat(),
                "procedure_occurrence_id": self.procedure_occurrence_id,
                "project_name": self.project_name,
                "omop_es_timestamp": self.omop_es_timestamp.isoformat(),
            }
        )

        return SerialisedMessage(body=body)
70 changes: 0 additions & 70 deletions pixl_core/src/core/patient_queue/utils.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,26 @@
import datetime
import json

from core.patient_queue.utils import deserialise, serialise
from core.patient_queue.message import Message, SerialisedMessage


def test_serialise() -> None:
"""Checks that messages can be correctly serialised"""
msg_body = serialise(
mrn="111",
accession_number="123",
study_datetime=datetime.datetime.strptime("Nov 22 2022 1:33PM", "%b %d %Y %I:%M%p").replace(
tzinfo=datetime.timezone.utc
),
procedure_occurrence_id="234",
project_name="test project",
omop_es_timestamp=datetime.datetime.strptime(
"Dec 7 2023 2:08PM", "%b %d %Y %I:%M%p"
).replace(tzinfo=datetime.timezone.utc),
msg = Message(
{
"mrn": "111",
"accession_number": "123",
"study_datetime": datetime.datetime.strptime(
"Nov 22 2022 1:33PM", "%b %d %Y %I:%M%p"
).replace(tzinfo=datetime.timezone.utc),
"procedure_occurrence_id": "234",
"project_name": "test project",
"omop_es_timestamp": datetime.datetime.strptime(
"Dec 7 2023 2:08PM", "%b %d %Y %I:%M%p"
).replace(tzinfo=datetime.timezone.utc),
}
)
msg_body = msg.serialise()
assert (
msg_body.decode() == '{"mrn": "111", "accession_number": "123", '
'"study_datetime": "2022-11-22T13:33:00+00:00", '
Expand All @@ -42,20 +45,23 @@ def test_serialise() -> None:

def test_simple_deserialise() -> None:
"""Checks a simple JSON deserialise works"""
assert deserialise((json.dumps({"key": "value"})).encode("utf-8"))["key"] == "value"
serialised_msg = SerialisedMessage(json.dumps({"key": "value"}))
assert serialised_msg.deserialise()["key"] == "value"


def test_deserialise_datetime() -> None:
"""Checks that datetimes can be correctly serialised"""
timestamp = datetime.datetime.fromordinal(100012)
data = deserialise(
serialise(
mrn="",
accession_number="",
study_datetime=timestamp,
procedure_occurrence_id="",
project_name="",
omop_es_timestamp=datetime.datetime.now(), # noqa: DTZ005
)
msg = Message(
{
"mrn": "",
"accession_number": "",
"study_datetime": timestamp,
"procedure_occurrence_id": "",
"project_name": "",
"omop_es_timestamp": datetime.datetime.now(), # noqa: DTZ005
}
)
serialised_msg = msg.serialise()
data = serialised_msg.deserialise()
assert data["study_datetime"] == timestamp
12 changes: 6 additions & 6 deletions pixl_ehr/src/pixl_ehr/_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from typing import Optional

import requests
from core.patient_queue.utils import deserialise
from core.patient_queue.message import SerialisedMessage
from decouple import config

from pixl_ehr._databases import EMAPStar, PIXLDatabase
Expand All @@ -35,10 +35,10 @@
_this_dir = Path(Path(__file__).parent)


async def process_message(message_body: bytes) -> None:
logger.info("Processing: %s", message_body.decode())
async def process_message(serialised_message: SerialisedMessage) -> None:
logger.info("Processing: %s", serialised_message.decode())

raw_data = PatientEHRData.from_message(message_body)
raw_data = PatientEHRData.from_message(serialised_message)
pixl_db = PIXLDatabase()

if pixl_db.contains(raw_data):
Expand Down Expand Up @@ -79,12 +79,12 @@ class PatientEHRData:
report_text: Optional[str] = None

@classmethod
def from_message(cls, message_body: bytes) -> "PatientEHRData":
def from_message(cls, serialised_message: SerialisedMessage) -> "PatientEHRData":
"""
Create a minimal set of patient EHR data required to start queries from a
queue message
"""
message_data = deserialise(message_body)
message_data = serialised_message.deserialise()
self = PatientEHRData(
mrn=message_data["mrn"],
accession_number=message_data["accession_number"],
Expand Down
25 changes: 14 additions & 11 deletions pixl_ehr/tests/test_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import datetime

import pytest
from core.patient_queue.utils import serialise
from core.patient_queue.message import Message
from decouple import config
from pixl_ehr._databases import PIXLDatabase, WriteableDatabase
from pixl_ehr._processing import process_message
Expand Down Expand Up @@ -55,16 +55,19 @@
weight_vot_id, height_vot_id, gcs_vot_id = 2222222, 3333333, 4444444
ls_id, lo_id, lr_id, ltd_id = 5555555, 6666666, 7777777, 8888888

message_body = serialise(
mrn=mrn,
accession_number=accession_number,
study_datetime=datetime.datetime.strptime(study_datetime_str, "%d/%m/%Y %H:%M").replace(
tzinfo=datetime.timezone.utc
),
procedure_occurrence_id=procedure_occurrence_id,
project_name=project_name,
omop_es_timestamp=omop_es_timestamp,
message = Message(
{
"mrn": mrn,
"accession_number": accession_number,
"study_datetime": datetime.datetime.strptime(study_datetime_str, "%d/%m/%Y %H:%M").replace(
tzinfo=datetime.timezone.utc
),
"procedure_occurrence_id": procedure_occurrence_id,
"project_name": project_name,
"omop_es_timestamp": omop_es_timestamp,
}
)
serialised_message = message.serialise()


class WritableEMAPStar(WriteableDatabase):
Expand Down Expand Up @@ -163,7 +166,7 @@ def insert_data_into_emap_star_schema() -> None:
@pytest.mark.asyncio()
async def test_message_processing() -> None:
insert_data_into_emap_star_schema()
await process_message(message_body)
await process_message(serialised_message)

pixl_db = QueryablePIXLDB()
row = pixl_db.execute_query_string("select * from emap_data.ehr_raw where mrn = %s", [mrn])
Expand Down
Loading
Loading