diff --git a/.gitignore b/.gitignore index 7fc2eb883..50871c91b 100644 --- a/.gitignore +++ b/.gitignore @@ -48,3 +48,10 @@ hs_err_pid* # IntelliJ files .idea/ *.iml + +# python +__pycache__ + +# prevent notebooks from being checked in +*.ipynb +.ipynb_checkpoints diff --git a/cassandra-config-envs.EXAMPLE b/cassandra-config-envs.EXAMPLE index e3d4249b6..80496efd6 100644 --- a/cassandra-config-envs.EXAMPLE +++ b/cassandra-config-envs.EXAMPLE @@ -1,3 +1,3 @@ -# these settings work on the GAE but if the process is getting OOM killed you can reduce them +# defaults work on the GAE but if the process is getting OOM killed you can reduce them MAX_HEAP_SIZE=4G HEAP_NEWSIZE=800M diff --git a/core/Dockerfile.emap-portal b/core/Dockerfile.emap-portal new file mode 100644 index 000000000..0beed03ea --- /dev/null +++ b/core/Dockerfile.emap-portal @@ -0,0 +1,10 @@ +FROM nginx:otel +RUN apt update && \ + apt install -y apache2-utils && \ + apt clean +COPY core/emap-portal/nginx.conf /etc/nginx/ +COPY core/emap-portal/conf.d/ /etc/nginx/conf.d/ +COPY core/emap-portal/www/* /usr/share/nginx/html/ +RUN --mount=type=secret,id=portal-build-secrets \ + . /run/secrets/portal-build-secrets && \ + htpasswd -b -B -c /etc/nginx/conf.d/htpasswd "$PORTAL_USERNAME" "$PORTAL_PASSWORD" diff --git a/core/core-config-envs.EXAMPLE b/core/core-config-envs.EXAMPLE index 51dfcb88b..f248f737d 100644 --- a/core/core-config-envs.EXAMPLE +++ b/core/core-config-envs.EXAMPLE @@ -15,5 +15,4 @@ SPRING_RABBITMQ_USERNAME=emap SPRING_RABBITMQ_PASSWORD=yourstrongpassword LOGGING_LEVEL_UK_AC_UCL_RITS_INFORM=INFO CORE_WAVEFORM_RETENTION_HOURS=24 -CORE_WAVEFORM_IS_NON_CURRENT_TEST_DATA=true TZ=Europe/London diff --git a/core/docker-compose.yml b/core/docker-compose.yml index 43346ba37..d86fa564e 100644 --- a/core/docker-compose.yml +++ b/core/docker-compose.yml @@ -54,4 +54,25 @@ services: restart: on-failure depends_on: - cassandra + emap-portal: + build: + context: .. + dockerfile: core/Dockerfile.emap-portal + args: + HTTP_PROXY: ${HTTP_PROXY} + http_proxy: ${http_proxy} + HTTPS_PROXY: ${HTTPS_PROXY} + https_proxy: ${https_proxy} + secrets: + - portal-build-secrets + env_file: + - ../../config/portal-config-envs + ports: + - "${PORTAL_PORT}:80" + + + +secrets: + portal-build-secrets: + file: ../../config/portal-config-envs diff --git a/core/emap-portal/conf.d/default.conf b/core/emap-portal/conf.d/default.conf new file mode 100644 index 000000000..145b9301f --- /dev/null +++ b/core/emap-portal/conf.d/default.conf @@ -0,0 +1,43 @@ +server { + listen 80; + listen [::]:80; + server_name localhost; + # nginx is behind docker, so the browser is using a different port number which nginx doesn't know about. + # Use relative redirects to avoid redirecting to port 80. 
(301s are used when trailing slashes are omitted) + absolute_redirect off; + auth_basic "Administrator’s Area"; + auth_basic_user_file conf.d/htpasswd; + + + access_log /var/log/nginx/host.access.log main; + + #error_page 404 /404.html; + + # redirect server error pages to the static page /50x.html + # + error_page 500 502 503 504 /50x.html; + location = /50x.html { + root /usr/share/nginx/html; + } + + location / { + root /usr/share/nginx/html; + include conf.d/shared/shared_location_config.conf; + } + + location /glowroot/ { + include conf.d/shared/shared_location_config.conf; + proxy_pass http://glowroot-central:4000/; + sub_filter 'href="/' 'href="/glowroot/'; + sub_filter 'src="/' 'src="/glowroot/'; + } + + location /streamlit/ { + include conf.d/shared/shared_location_config.conf; + proxy_pass http://streamlit:8501/streamlit/; + proxy_buffering off; + + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "upgrade"; + } +} diff --git a/core/emap-portal/conf.d/shared/shared_location_config.conf b/core/emap-portal/conf.d/shared/shared_location_config.conf new file mode 100644 index 000000000..287a632e8 --- /dev/null +++ b/core/emap-portal/conf.d/shared/shared_location_config.conf @@ -0,0 +1,11 @@ +sub_filter_once off; +proxy_redirect off; +proxy_set_header Host $host; +proxy_set_header X-Real-IP $remote_addr; +proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; +proxy_set_header X-Forwarded-Proto $scheme; +proxy_set_header Accept-Encoding ""; # turn off gzip for upstream so rewriting can work +# needed for websockets +proxy_http_version 1.1; +proxy_read_timeout 86400; +proxy_send_timeout 3600; diff --git a/core/emap-portal/nginx.conf b/core/emap-portal/nginx.conf new file mode 100644 index 000000000..a0a893518 --- /dev/null +++ b/core/emap-portal/nginx.conf @@ -0,0 +1,50 @@ + +user nginx; +worker_processes auto; + +error_log /var/log/nginx/error.log debug; +pid /var/run/nginx.pid; + + +events { + worker_connections 1024; +} + + +http { + include /etc/nginx/mime.types; + default_type application/octet-stream; + + log_format main '$remote_addr - $remote_user [$time_local] "$request" ' + '$status $body_bytes_sent "$http_referer" ' + '"$http_user_agent" "$http_x_forwarded_for"'; + + access_log /var/log/nginx/access.log main; + + sendfile on; + #tcp_nopush on; + + keepalive_timeout 65; + + #gzip on; + + include /etc/nginx/conf.d/*.conf; +} + +# pure TCP proxy? 
+# +# stream { +# upstream backend { +# server backend-server:12345; +# } +# +# server { +# listen 12345; +# proxy_pass backend; +# Allow specific IP addresses +# allow 192.168.1.1; # Replace with the allowed IP address +# allow 192.168.1.2; # Add more allowed IP addresses as needed +# deny all; # Deny all other IP addresses + +# } +# } \ No newline at end of file diff --git a/core/emap-portal/www/index.html b/core/emap-portal/www/index.html new file mode 100644 index 000000000..4c9beec5a --- /dev/null +++ b/core/emap-portal/www/index.html @@ -0,0 +1,17 @@ + + + + + Emap admin page + + +You can access various Emap admin/visualisation/monitoring services: + + + + + diff --git a/core/src/main/java/uk/ac/ucl/rits/inform/datasinks/emapstar/dataprocessors/WaveformProcessor.java b/core/src/main/java/uk/ac/ucl/rits/inform/datasinks/emapstar/dataprocessors/WaveformProcessor.java index 1592a6b59..f3a92de17 100644 --- a/core/src/main/java/uk/ac/ucl/rits/inform/datasinks/emapstar/dataprocessors/WaveformProcessor.java +++ b/core/src/main/java/uk/ac/ucl/rits/inform/datasinks/emapstar/dataprocessors/WaveformProcessor.java @@ -27,8 +27,6 @@ public class WaveformProcessor { @Value("${core.waveform.retention_hours}") private int retentionTimeHours; - @Value("${core.waveform.is_non_current_test_data}") - private boolean isNonCurrentTestData; /** * @param visitObservationController visit observation controller @@ -59,25 +57,37 @@ public void processMessage(final WaveformMessage msg, final Instant storedFrom) */ @Scheduled(fixedRate = 60 * 60 * 1000) public void deleteOldWaveformData() { - logger.info("deleteOldWaveformData: Checking for old waveform data for deletion"); - Instant baselineDatetime; - if (isNonCurrentTestData) { - // while testing, use the current data (which may be for a - // date far from the present) as a reference for when to apply retention cutoff date from. - // ie. assume the time of the most recent data is "now" - baselineDatetime = waveformController.mostRecentObservationDatatime(); - if (baselineDatetime == null) { - logger.info("deleteOldWaveformData: nothing in DB, do nothing"); - return; - } + /* When calculating the retention cutoff datetime, instead of working back from the current datetime, + * start at the datetime of the most recent piece of waveform data. + * The main purpose of this is that when testing (eg. using a dump file that might be quite old), + * you don't want to immediately delete all the data due to its timestamps being way in the past. + * And in production the most recent piece of data will be very close to the present time anyway, + * so keep things simple and use the same logic in both cases. + */ + Instant baselineDatetime = waveformController.mostRecentObservationDatatime(); + if (baselineDatetime == null) { + logger.info("deleteOldWaveformData: nothing in DB, do nothing"); + return; + } + + Instant now = Instant.now(); + if (baselineDatetime.isAfter(now)) { + // In the hopefully unlikely case that the incoming data is in the future, don't + // go and delete all our data! 
+ logger.warn("deleteOldWaveformData: most recent data is in the future ({}), using current time instead", + baselineDatetime); + baselineDatetime = now; + } - } else { - baselineDatetime = Instant.now(); + if (retentionTimeHours <= 0) { + logger.info("deleteOldWaveformData: retention time is infinite, do nothing (baseline date = {})", + baselineDatetime); + return; } Instant cutoff = baselineDatetime.minus(retentionTimeHours, ChronoUnit.HOURS); - logger.info("deleteOldWaveformData: baseline = {}, cutoff = {}", baselineDatetime, cutoff); + logger.info("deleteOldWaveformData: deleting, baseline date = {}, cutoff = {}", baselineDatetime, cutoff); int numDeleted = waveformController.deleteOldWaveformData(cutoff); - logger.info("deleteOldWaveformData: Old waveform data deletion: {} rows older than {}", numDeleted, cutoff); + logger.info("deleteOldWaveformData: deleted {} rows older than {}", numDeleted, cutoff); } } diff --git a/core/src/main/resources/application.properties b/core/src/main/resources/application.properties index 9e87ab9ac..c54db3790 100644 --- a/core/src/main/resources/application.properties +++ b/core/src/main/resources/application.properties @@ -21,6 +21,5 @@ core.rabbitmq.listen_queues = hl7Queue,databaseExtracts,extensionProjects,wavefo # Data older than this is liable to be deleted to keep overall disk usage small. # In production we will want to have this longer (more like 7 days) core.waveform.retention_hours = 1 -core.waveform.is_non_current_test_data = 0 spring.rabbitmq.listener.simple.acknowledge-mode=manual diff --git a/docs/dev/features/waveform_hf_data.md b/docs/dev/features/waveform_hf_data.md index feb23fcf1..7c5913a2e 100644 --- a/docs/dev/features/waveform_hf_data.md +++ b/docs/dev/features/waveform_hf_data.md @@ -28,11 +28,8 @@ but in practice it's typically 20 seconds. We have aimed for similar. ## Config options added Core: - - `core.waveform.retention_hours` periodically delete data more than this many hours old - - `core.waveform.is_non_current_test_data` for testing only - when deciding which data to delete/retain, if set to true, - then treat the "now" point as the most recent observation date in the waveform table, rather than the actual - current time. Purpose is to avoid test data getting immediately deleted because it's too old, which could happen - if we have a fixed set of test data with observation dates way in the past. + - `core.waveform.retention_hours` periodically delete data that is more than this many hours older than + the newest piece of waveform data. Waveform Generator: - `waveform.hl7.send_host`, `waveform.hl7.send_port` - the host and port to send the generated data to @@ -53,8 +50,8 @@ Waveform Reader: ## Container housekeeping (setup script) The waveform processing feature is enabled or disabled in the global configuration file. I've added -a "features" section for this, and taken the opportunity to also add the `fakeuds` container to make that easier -to turn on and off. +a "waveform" section for these and related settings. +I also added the `fake_uds` section for turning that on and off. 
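For illustration only (this is not part of the patch): the cutoff calculation that `core.waveform.retention_hours` now drives in `WaveformProcessor.deleteOldWaveformData()` can be sketched in Python roughly as below. The baseline is the newest waveform observation rather than the wall clock, clamped to the present, and a non-positive retention value disables deletion.

```python
# Illustrative sketch only; the real logic lives in WaveformProcessor.deleteOldWaveformData().
from datetime import datetime, timedelta, timezone
from typing import Optional

def retention_cutoff(most_recent_observation: Optional[datetime],
                     retention_hours: int,
                     now: Optional[datetime] = None) -> Optional[datetime]:
    """Return the cutoff before which waveform rows may be deleted, or None to delete nothing."""
    if most_recent_observation is None:
        return None                     # empty DB: nothing to delete
    if retention_hours <= 0:
        return None                     # non-positive retention means keep everything
    now = now or datetime.now(timezone.utc)
    # Anchor on the newest data so an old test dump isn't wiped immediately,
    # but never trust timestamps from the future.
    baseline = min(most_recent_observation, now)
    return baseline - timedelta(hours=retention_hours)

# e.g. with 24h retention and the newest row at 2024-01-03T12:00Z,
# rows older than 2024-01-02T12:00Z become eligible for deletion.
```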
Because the waveform feature flag will include/exclude the relevant docker compose files from the docker commands it generates, you can continue to diff --git a/emap-setup/emap_runner/docker/docker_runner.py b/emap-setup/emap_runner/docker/docker_runner.py index 9c9c3ac43..6c231867a 100644 --- a/emap-setup/emap_runner/docker/docker_runner.py +++ b/emap-setup/emap_runner/docker/docker_runner.py @@ -34,9 +34,9 @@ def __init__(self, self.project_dir = project_dir self.emap_dir = project_dir / "emap" self.config = config - self.enable_waveform = first_not_none(enable_waveform, self.config.get("features", "waveform")) - self.use_fake_waveform = first_not_none(use_fake_waveform, self.config.get("features", "waveform_generator")) - self.use_fake_uds = first_not_none(use_fake_uds, self.config.get("features", "fake_uds")) + self.enable_waveform = first_not_none(enable_waveform, self.config.get("waveform", "enable_waveform")) + self.use_fake_waveform = first_not_none(use_fake_waveform, self.config.get("waveform", "enable_waveform_generator")) + self.use_fake_uds = first_not_none(use_fake_uds, self.config.get("fake_uds", "enable_fake_uds")) def run( self, @@ -108,6 +108,8 @@ def docker_compose_paths(self) -> List[Path]: paths.append(Path(self.emap_dir, "waveform-reader", "docker-compose.yml")) if self.use_fake_waveform: paths.append(Path(self.emap_dir, "waveform-generator", "docker-compose.yml")) + if self.config.get("monitoring", "use_streamlit"): + paths.append(Path(self.emap_dir, "monitoring", "docker-compose.yml")) # allow for hoover and to be optional compose path if "hoover" in self.config["repositories"]: diff --git a/emap-setup/emap_runner/global_config.py b/emap-setup/emap_runner/global_config.py index 74eb1bc73..9f323c18a 100644 --- a/emap-setup/emap_runner/global_config.py +++ b/emap-setup/emap_runner/global_config.py @@ -21,7 +21,9 @@ class GlobalConfiguration(dict): "global", "glowroot", "common", - "features", + "fake_uds", + "monitoring", + "waveform" ) def __init__(self, filepath: Path): @@ -136,6 +138,10 @@ def _substitute_vars(self, env_file: "EnvironmentFile") -> None: try: value = self.get_first(key, env_file.basename) + if value is None: + # Don't stringify None, Spring won't understand. + # Empty string is the closest alternative. + value = "" env_file.set_new_line_at(f"{key}={value}\n", idx=i) except KeyError: diff --git a/emap-setup/global-configuration-EXAMPLE.yaml b/emap-setup/global-configuration-EXAMPLE.yaml index 4de0f07b7..ce6423920 100644 --- a/emap-setup/global-configuration-EXAMPLE.yaml +++ b/emap-setup/global-configuration-EXAMPLE.yaml @@ -30,11 +30,18 @@ repositories: # hoover: # branch: develop -# Feature flags for not quite ready features, or for turning fake services on and off -features: - waveform: false - waveform_generator: false - fake_uds: false +# Each section below could represent either a service defined by Emap (eg. rabbitmq), +# an external service (eg. IDS/UDS), or really anything at all. The meaning is ultimately +# defined by the setup script. +# To pull a variable into the environment for a container, create/edit a file `*-config-envs.EXAMPLE` +# containing the (empty) variables you wish to have populated. The file (minus ".EXAMPLE" suffix) +# will be copied into the config dir with real values when you run `emap setup -g`. This config file +# should be referenced from the relevant docker-compose service definition to bring the envs into the +# container. 
+# Note that the sections are not namespaces, and thus variable names should be unique +# even if in a different section. +# By convention, variables intended to be passed into containers directly are in +# upper case. Variables to control the setup script itself are lower case. # Configuration data for the rabbitmq instance used by Spring in the pipeline rabbitmq: @@ -87,3 +94,26 @@ glowroot: GLOWROOT_PASSWORD: glowrootpw GLOWROOT_ADMIN_PORT: 4000 +# For testing outside the GAE, you can enable a fake UDS +fake_uds: + enable_fake_uds: false + +# The nginx portal and other monitoring/validation/visualisation services +monitoring: + SERVER_EXTERNAL_HOSTNAME: server.fqdn.example + PORTAL_PORT: 7100 + PORTAL_USERNAME: emap + PORTAL_PASSWORD: portal_password + use_streamlit: false + +# config related to waveform data ingress +waveform: + enable_waveform: false + enable_waveform_generator: false + CORE_WAVEFORM_RETENTION_HOURS: 24 + WAVEFORM_HL7_SOURCE_ADDRESS_ALLOW_LIST: 127.0.0.1 + WAVEFORM_HL7_TEST_DUMP_FILE: "" + WAVEFORM_SYNTHETIC_NUM_PATIENTS: 30 + WAVEFORM_SYNTHETIC_WARP_FACTOR: 6 + WAVEFORM_SYNTHETIC_START_DATETIME: "2024-01-02T12:00:00Z" + WAVEFORM_SYNTHETIC_END_DATETIME: "2024-01-03T12:00:00Z" diff --git a/emap-setup/tests/data/test-global-configuration-only-docs.yaml b/emap-setup/tests/data/test-global-configuration-only-docs.yaml index f6848e3e4..e6b25f25e 100644 --- a/emap-setup/tests/data/test-global-configuration-only-docs.yaml +++ b/emap-setup/tests/data/test-global-configuration-only-docs.yaml @@ -8,3 +8,12 @@ EMAP_PROJECT_NAME: repositories: emap_documentation: branch: main +# For testing outside the GAE, you can enable a fake UDS +fake_uds: + enable_fake_uds: false +# config related to waveform data ingress +waveform: + enable_waveform: false + enable_waveform_generator: false + WAVEFORM_HL7_SOURCE_ADDRESS_ALLOW_LIST: 127.0.0.1 + WAVEFORM_HL7_TEST_DUMP_FILE: "" diff --git a/emap-setup/tests/data/test-global-configuration-onlyhl7.yaml b/emap-setup/tests/data/test-global-configuration-onlyhl7.yaml index 56b7f4204..68c57c375 100644 --- a/emap-setup/tests/data/test-global-configuration-onlyhl7.yaml +++ b/emap-setup/tests/data/test-global-configuration-onlyhl7.yaml @@ -38,3 +38,14 @@ core: UDS_SCHEMA: inform_schema UDS_USERNAME: someuser UDS_PASSWORD: redacted + +# For testing outside the GAE, you can enable a fake UDS +fake_uds: + enable_fake_uds: false + +# config related to waveform data ingress +waveform: + enable_waveform: false + enable_waveform_generator: false + WAVEFORM_HL7_SOURCE_ADDRESS_ALLOW_LIST: 127.0.0.1 + WAVEFORM_HL7_TEST_DUMP_FILE: "" diff --git a/emap-setup/tests/data/test-global-configuration.yaml b/emap-setup/tests/data/test-global-configuration.yaml index 2c250f9b6..158158b53 100644 --- a/emap-setup/tests/data/test-global-configuration.yaml +++ b/emap-setup/tests/data/test-global-configuration.yaml @@ -59,3 +59,14 @@ informdb: UDS_SCHEMA: inform_schema_for_branch UDS_USERNAME: someuseraaa UDS_PASSWORD: redactedaaa + +# For testing outside the GAE, you can enable a fake UDS +fake_uds: + enable_fake_uds: false + +# config related to waveform data ingress +waveform: + enable_waveform: false + enable_waveform_generator: false + WAVEFORM_HL7_SOURCE_ADDRESS_ALLOW_LIST: 127.0.0.1 + WAVEFORM_HL7_TEST_DUMP_FILE: "" diff --git a/emap-star/emap-star/src/main/java/uk/ac/ucl/rits/inform/informdb/visit_recordings/Waveform.java b/emap-star/emap-star/src/main/java/uk/ac/ucl/rits/inform/informdb/visit_recordings/Waveform.java index 4a0730948..c27f9fe2c 100644 --- 
a/emap-star/emap-star/src/main/java/uk/ac/ucl/rits/inform/informdb/visit_recordings/Waveform.java +++ b/emap-star/emap-star/src/main/java/uk/ac/ucl/rits/inform/informdb/visit_recordings/Waveform.java @@ -34,6 +34,7 @@ @Index(name = "waveform_datetime", columnList = "observationDatetime"), @Index(name = "waveform_location", columnList = "sourceLocation"), @Index(name = "waveform_location_visit", columnList = "locationVisitId"), + @Index(name = "waveform_observation_type", columnList = "visitObservationTypeId"), }) @Data @EqualsAndHashCode(callSuper = true) diff --git a/global-config-envs.EXAMPLE b/global-config-envs.EXAMPLE index e96471e55..ece509d24 100644 --- a/global-config-envs.EXAMPLE +++ b/global-config-envs.EXAMPLE @@ -4,3 +4,4 @@ RABBITMQ_ADMIN_PORT=5674 GLOWROOT_ADMIN_PORT=4000 FAKEUDS_PORT=5433 HL7_READER_PORT=9999 +PORTAL_PORT= diff --git a/monitoring/docker-compose.yml b/monitoring/docker-compose.yml new file mode 100644 index 000000000..bce28a089 --- /dev/null +++ b/monitoring/docker-compose.yml @@ -0,0 +1,15 @@ +services: + streamlit: + build: + context: .. + dockerfile: monitoring/streamlit/Dockerfile + args: + HTTP_PROXY: ${HTTP_PROXY} + http_proxy: ${http_proxy} + HTTPS_PROXY: ${HTTPS_PROXY} + https_proxy: ${https_proxy} + env_file: + - ../../config/streamlit-config-envs + logging: + driver: "json-file" + restart: "no" diff --git a/monitoring/requirements.txt b/monitoring/requirements.txt new file mode 100644 index 000000000..f83b25db6 --- /dev/null +++ b/monitoring/requirements.txt @@ -0,0 +1,11 @@ +jupyter +jupyterlab +jupytext +matplotlib +pandas +psycopg2-binary +pytest +scipy +soundfile +sqlalchemy +streamlit diff --git a/monitoring/streamlit-config-envs.EXAMPLE b/monitoring/streamlit-config-envs.EXAMPLE new file mode 100644 index 000000000..236eceb1f --- /dev/null +++ b/monitoring/streamlit-config-envs.EXAMPLE @@ -0,0 +1,6 @@ +UDS_JDBC_URL= +UDS_SCHEMA= +UDS_USERNAME= +UDS_PASSWORD= +SERVER_EXTERNAL_HOSTNAME= +PORTAL_PORT= diff --git a/monitoring/streamlit/Dockerfile b/monitoring/streamlit/Dockerfile new file mode 100644 index 000000000..4d54e6e72 --- /dev/null +++ b/monitoring/streamlit/Dockerfile @@ -0,0 +1,15 @@ +FROM python:3.12-slim-bullseye +WORKDIR /app/streamlit +COPY monitoring/requirements.txt /app/streamlit +RUN pip install -r requirements.txt +COPY monitoring/streamlit/ /app/streamlit +CMD streamlit run \ + --browser.gatherUsageStats=false \ + --server.enableWebsocketCompression=false \ + --server.enableXsrfProtection=false \ + # base URL to match where the proxy expects it to be - simpler than URL rewriting in the proxy + --server.baseUrlPath "streamlit" \ + # Without this, websocket calls don't work behind nginx + --browser.serverAddress ${SERVER_EXTERNAL_HOSTNAME} \ + --browser.serverPort ${PORTAL_PORT} \ + st_home.py diff --git a/monitoring/streamlit/__init__.py b/monitoring/streamlit/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/monitoring/streamlit/database_utils.py b/monitoring/streamlit/database_utils.py new file mode 100644 index 000000000..b1ce394e1 --- /dev/null +++ b/monitoring/streamlit/database_utils.py @@ -0,0 +1,111 @@ +import math +import os +from datetime import timedelta + +import pandas as pd +import sqlalchemy +from sqlalchemy.engine.url import make_url +import streamlit as st +import psycopg2 + +# Perhaps we should move away from making the JDBC url primary, but +# for now we will have to accept this and make some edits so we can +# use it here. 
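# (Editorial worked example; values are hypothetical, not from any real config.)
# The conversion below turns, say,
#     UDS_JDBC_URL = jdbc:postgresql://uds-host:5432/uds
# into a SQLAlchemy URL of the form
#     postgresql+psycopg2://<username>:<password>@uds-host:5432/uds
# i.e. drop the "jdbc:" prefix, switch to the psycopg2 driver name, and attach credentials.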
+database_jdbc_url = os.environ['UDS_JDBC_URL'] +database_user = os.environ['UDS_USERNAME'] +database_password = os.environ['UDS_PASSWORD'] +database_schema = os.environ['UDS_SCHEMA'] +database_url = make_url(database_jdbc_url.replace("jdbc:", "")) +# host, database, and port will be correct, but change the driver and user/pass +database_url = database_url.set(drivername='postgresql+psycopg2', username=database_user, password=database_password) + +SET_SEARCH_PATH = f"set search_path to {database_schema};" +engine = sqlalchemy.create_engine(database_url) + + +@st.cache_data(ttl=60) +def get_all_params(): + with engine.connect() as con: + return pd.read_sql_query(SET_SEARCH_PATH + + """ + SELECT DISTINCT + w.visit_observation_type_id, + w.source_location, + vot.name + FROM WAVEFORM w + INNER JOIN VISIT_OBSERVATION_TYPE vot + ON vot.visit_observation_type_id = w.visit_observation_type_id + """, con) + + +@st.cache_data(ttl=60) +def get_min_max_time_for_single_stream(visit_observation_type_id, source_location): + params = (visit_observation_type_id, source_location) + query = SET_SEARCH_PATH + """ + SELECT min(observation_datetime) as min_time, max(observation_datetime) as max_time + FROM WAVEFORM + WHERE visit_observation_type_id = %s AND source_location = %s + """ + with engine.connect() as con: + minmax = pd.read_sql_query(query, con, params=params) + if minmax.empty: + return None, None + else: + return minmax.iloc[0].min_time, minmax.iloc[0].max_time + + +def get_data_single_stream_rounded(visit_observation_type_id, source_location, graph_start_time, graph_end_time, max_time, max_row_length_seconds=30): + # Because a row's observation_datetime is the time of the *first* data point in the array, + # to get the data starting at time T, you have to query the DB for data a little earlier than T. + # Additionally, to aid caching, round down further so repeated calls with + # approximately similar values of min_time will result in exactly the + # same query being issued (which is hopefully already cached) + actual_min_time = graph_start_time - timedelta(seconds=max_row_length_seconds) + rounded_seconds = actual_min_time.second // 10 * 10 + rounded_min_time = actual_min_time.replace(second=rounded_seconds, microsecond=0) + # For the same reason, round the max value up to the nearest few seconds (5 is pretty arbitrary) + # (using +timedelta instead of replacing seconds value because you might hit 60 and have to wrap around) + # However, do not ask for data beyond what we know exists (max_time). We don't want + # the incomplete response to get cached. 
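# (Editorial worked example with hypothetical values.) With max_row_length_seconds=30,
# a requested window of 12:00:47.3 -> 12:01:02.9 is widened and rounded to:
#     rounded_min_time = (12:00:47.3 - 30s) = 12:00:17.3, floored to 10s -> 12:00:10
#     rounded_max_time = 12:01:02.9, ceiled up to the next 5s            -> 12:01:05
# so near-identical requests map onto identical (and therefore cacheable) query parameters.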
+ rounded_max_time = (graph_end_time.replace(second=0, microsecond=0) + + timedelta(seconds=math.ceil((graph_end_time.second + graph_end_time.microsecond/1_000_000) / 5) * 5)) + capped_at_max = False + if rounded_max_time > max_time: + capped_at_max = True + rounded_max_time = max_time + print(f"Adjusted min time {graph_start_time} -> {rounded_min_time}") + print(f"Adjusted max time {graph_end_time} -> {rounded_max_time} {'(capped)' if capped_at_max else ''}") + return get_data_single_stream(visit_observation_type_id, source_location, rounded_min_time, rounded_max_time) + + +@st.cache_data(ttl=1800) +def get_data_single_stream(visit_observation_type_id, source_location, min_time, max_time): + params = (visit_observation_type_id, source_location, min_time, max_time) + # Index(['waveform_id', 'stored_from', 'valid_from', 'observation_datetime', + # 'sampling_rate', 'source_location', 'unit', 'values_array', + # 'location_visit_id', 'visit_observation_type_id'], + # dtype='object') + # It's much quicker to do the array unpacking and date calculation here rather than in pandas later. + # This will still need a trim because the way the SQL arrays work you get more data than you need. + query = SET_SEARCH_PATH + """ + SELECT + w.waveform_id, + w.observation_datetime AS base_observation_datetime, + w.observation_datetime + make_interval(secs => (v.ordinality - 1)::float / w.sampling_rate) AS observation_datetime, + v.v as waveform_value, + v.ordinality, + w.sampling_rate, + w.source_location, + w.unit, + w.location_visit_id, + w.visit_observation_type_id + FROM WAVEFORM w, unnest(w.values_array) WITH ORDINALITY v + WHERE visit_observation_type_id = %s AND source_location = %s + AND observation_datetime >= %s + AND observation_datetime <= %s + ORDER BY observation_datetime + """ + # print(f"qry = {query}, params = {params}") + with engine.connect() as con: + data = pd.read_sql_query(query, con, params=params) + return data diff --git a/monitoring/streamlit/gaps.sql b/monitoring/streamlit/gaps.sql new file mode 100644 index 000000000..a67924cd9 --- /dev/null +++ b/monitoring/streamlit/gaps.sql @@ -0,0 +1,7 @@ +SELECT * +FROM waveform +WHERE + visit_observation_type_id = %s + AND source_location = %s +ORDER BY observation_datetime +; diff --git a/monitoring/streamlit/jupytext.toml b/monitoring/streamlit/jupytext.toml new file mode 100644 index 000000000..ee448177c --- /dev/null +++ b/monitoring/streamlit/jupytext.toml @@ -0,0 +1,4 @@ +# Every notebook in this folder should be paired with the Python percent format + +formats = "ipynb,py:percent" +notebook_metadata_filter = "-jupytext.text_representation.jupytext_version,-kernelspec" diff --git a/monitoring/streamlit/presentation.py b/monitoring/streamlit/presentation.py new file mode 100644 index 000000000..8d46b09b5 --- /dev/null +++ b/monitoring/streamlit/presentation.py @@ -0,0 +1,147 @@ +# -*- coding: utf-8 -*- +# --- +# jupyter: +# jupytext: +# notebook_metadata_filter: -jupytext.text_representation.jupytext_version,-kernelspec +# text_representation: +# extension: .py +# format_name: percent +# format_version: '1.3' +# --- + +# %% + +# %% [markdown] +# yadda yadda + +# %% +from datetime import datetime +from functools import lru_cache + +import pandas as pd +import sqlalchemy +import psycopg2 +import pandas as pd +import soundfile +import matplotlib.pyplot as plt +import numpy as np +from scipy.fft import fft + +import database_utils +import waveform_utils + +# %% +all_params = database_utils.get_all_params() + +# %% +@lru_cache +def 
get_data_single_stream(visit_observation_type_id, source_location): + params = (visit_observation_type_id, source_location) + con = database_utils.engine.connect() + data = pd.read_sql_query(database_utils.SET_SEARCH_PATH + + """ + SELECT * + FROM WAVEFORM + WHERE visit_observation_type_id = %s AND source_location = %s + ORDER BY observation_datetime + """, con, params=params) + return data + +# %% +# For keeping output files from different runs separate +date_str = datetime.now().strftime('%Y%m%dT%H%M%S') +print(date_str) + + + +# %% +def to_ogg(): + date_str = datetime.now().strftime('%Y%m%dT%H%M%S') + for par in all_params.itertuples(): + data = get_data_single_stream(par.visit_observation_type_id, par.source_location) + all_points = [] + data['values_array'].apply(lambda va: all_points.extend(va)) + print(f"PRE max={max(all_points)}, min={min(all_points)}") + print(data.shape[0]) + print(len(all_points)) + all_points = [a/1000 for a in all_points] + print(f"POST max={max(all_points)}, min={min(all_points)}") + for sampling_rate in [88200]: + outfile = f"validation_output/output_{date_str}_{par.visit_observation_type_id}_{par.source_location}_{sampling_rate}.ogg" + soundfile.write(outfile, all_points, sampling_rate, format='OGG') + + +# %% +def get_distinct_sampling_rate(data): + unique_sampling_rate = data['sampling_rate'].unique() + assert len(unique_sampling_rate) == 1 + return unique_sampling_rate[0] + +# %% +def do_fft(all_points, sampling_rate): + sample_spacing = 1 / sampling_rate + # fft + all_points_centered = all_points - np.mean(all_points) + fft_values = fft(all_points_centered) + frequencies = np.fft.fftfreq(len(fft_values), sample_spacing) + # use magnitude of complex fft values + return all_points_centered, np.abs(fft_values), frequencies + + +# %% +def plot_waveform(par, max_seconds=10): + # global plot_df, data, all_points_centered, abs_fft_values, frequencies + data = get_data_single_stream(par.visit_observation_type_id, par.source_location) + sampling_rate = get_distinct_sampling_rate(data) + all_points = [] + data['values_array'].apply(lambda va: all_points.extend(va)) + # use only first N seconds + all_points_trimmed = all_points[:sampling_rate * max_seconds] + print(f"{par.source_location} sampling rate {sampling_rate}, data {len(all_points)} -> {len(all_points_trimmed)}") + all_points_centered, abs_fft_values, frequencies = do_fft(all_points_trimmed, sampling_rate) + fig, ax = plt.subplots(1, 2, figsize=(10, 5)) + print(f"|points| = {len(all_points_centered)}, |fft_vals| = {len(abs_fft_values)}, |frequencies|/2 = {len(frequencies)/2}") + # sampling rate / 2 is the absolute upper limit, but + # it's unlikely the real frequencies are anywhere near that + n = len(frequencies) // 8 + plot_df = pd.DataFrame(dict(freq=frequencies[:n], vals=abs_fft_values[:n])) + ax[0].set_xlabel('freq') + ax[0].set_ylabel('mag') + ax[0].plot(plot_df['freq'], plot_df['vals']) + idx_max = plot_df['vals'].idxmax() + max_row = plot_df.loc[idx_max] + print(max_row) + # make sure it's more than the message length *and* sql array cardinality + max_points_to_plot = 12000 + points_to_plot = min(max_points_to_plot, len(all_points_centered)) + ax[1].set_xlabel('sample num') + ax[1].set_ylabel('waveform value') + ax[1].plot(range(points_to_plot), all_points_centered[:points_to_plot]) + plt.show() + outfile = f"validation_output/graph_{date_str}_{par.visit_observation_type_id}_{par.source_location}.png" + plt.savefig(outfile) + + +# %% +# %matplotlib inline +for par in all_params.itertuples(): + if 
plot_waveform(par): + break + + +# %% +par = all_params[0] +data = get_data_single_stream(par.visit_observation_type_id, par.source_location) +one_per_row_reset_times = waveform_utils.explode_values_array(data) + + +# %% +one_per_row_reset_times.head() + +# %% +one_per_row_reset_times.iloc[0:100000:10000] + +# %% +one_per_row_reset_times.shape + +# %% diff --git a/monitoring/streamlit/st_home.py b/monitoring/streamlit/st_home.py new file mode 100644 index 000000000..e32024f0a --- /dev/null +++ b/monitoring/streamlit/st_home.py @@ -0,0 +1,22 @@ +import streamlit as st +from st_waveform import waveform_data +from st_integrity import data_integrity +import database_utils + +st.set_page_config(layout="wide") + +# All pages +pages = { + "Waveform Data": waveform_data, + "Data integrity": data_integrity, +} + +# sidebar +sb = st.sidebar +sb.title("Pages") +selection = sb.selectbox("Go to", list(pages.keys())) +sb.write(f"Schema: {database_utils.database_schema}") + +# Render the selected page +page = pages[selection] +page() diff --git a/monitoring/streamlit/st_integrity.py b/monitoring/streamlit/st_integrity.py new file mode 100644 index 000000000..f3ad0e457 --- /dev/null +++ b/monitoring/streamlit/st_integrity.py @@ -0,0 +1,7 @@ +import streamlit as st + + +def data_integrity(): + st.title("Data integrity") + st.write("Gaps, completeness etc.") + diff --git a/monitoring/streamlit/st_waveform.py b/monitoring/streamlit/st_waveform.py new file mode 100644 index 000000000..c36945008 --- /dev/null +++ b/monitoring/streamlit/st_waveform.py @@ -0,0 +1,119 @@ +from datetime import timedelta, datetime, timezone +import time + +import pandas as pd +import streamlit as st +import altair as alt +import database_utils + + +def draw_graph(location, stream_id, min_time, max_time): + # (re-)initialise slider value if not known or if the bounds have changed so that it is now outside them + if 'slider_value' not in st.session_state or not min_time <= st.session_state.slider_value <= max_time: + st.session_state.slider_value = max(min_time, max_time - timedelta(seconds=15)) + print(f"New bounds for stream {stream_id}, location {location}: min={min_time}, max={max_time}, value={st.session_state.slider_value}") + # BUG: error is given if there is exactly one point so min_time == max_time + graph_start_time = bottom_cols[0].slider("Start time", + min_value=min_time, max_value=max_time, + value=st.session_state.slider_value, + step=timedelta(seconds=10), format="") + st.session_state.slider_value = graph_start_time + + graph_width_seconds = top_cols[3].slider("Chart width (seconds)", min_value=1, max_value=30, value=30) + + graph_end_time = graph_start_time + timedelta(seconds=graph_width_seconds) + data = database_utils.get_data_single_stream_rounded(int(stream_id), location, + graph_start_time=graph_start_time, + graph_end_time=graph_end_time, + max_time=max_time) + trimmed = data[data['observation_datetime'].between(graph_start_time, graph_end_time)] + waveform_units = trimmed['unit'].drop_duplicates().tolist() + if len(waveform_units) > 1: + st_graph_area.error(f"duplicate units: {waveform_units}") + waveform_unit = "n/a" + elif len(waveform_units) == 0: + st_graph_area.error(f"no data over the given time period, try selecting another time") + waveform_unit = "n/a" + else: + waveform_unit = waveform_units[0] + + stream_label = unique_streams[stream_id] + chart = ( + alt.Chart(trimmed, width=1100, height=600) + # unfortunately the line continues over gaps in the data, but points are too ugly so stick with this 
for now + .mark_line(opacity=0.9) + .encode( + x=alt.X("observation_datetime", + title="Observation datetime", + # timeUnit="hoursminutesseconds", # using this causes a weird data corruption problem + scale=alt.Scale(type="utc"), + axis=alt.Axis(tickCount="millisecond", + tickColor="red", + tickBand="center", + titleFontSize=24, + ticks=True), + ), + y=alt.Y("waveform_value", + title=f"{stream_label} ({waveform_unit})", + stack=None, + axis=alt.Axis( + titleFontSize=24, + )), + # color="Region:N", + ) + #.interactive() + # .add_params( + # alt.selection_interval(bind='scales') + # ) + ) + st_graph_area.altair_chart(chart, use_container_width=True) + +def waveform_data(): + global unique_streams, st_graph_area, bottom_cols, top_cols + + st_top_controls = st.container() + st_bottom_controls = st.container() + st_graph_area = st.container() + st_info_box = st.container() + st_info_box.write(f"Schema: {database_utils.database_schema}") + top_cols = st_top_controls.columns(4) + bottom_cols = st_bottom_controls.columns(1, gap='medium') + + all_params = database_utils.get_all_params() + print(f"all_params = ", all_params) + + unique_streams_list = all_params.apply(lambda r: (r['visit_observation_type_id'], r['name']), axis=1).drop_duplicates().tolist() + unique_streams = dict(unique_streams_list) + if len(unique_streams_list) != len(unique_streams): + # the DB schema should ensure this doesn't happen, but check + st_graph_area.error(f"WARNING: apparent ambiguous mapping in {unique_streams_list}") + + print(f"unique streams = ", unique_streams) + location = top_cols[0].selectbox("Choose location", sorted(set(all_params['source_location']))) + streams_for_location = all_params[all_params['source_location'] == location]['visit_observation_type_id'] + stream_id = top_cols[1].selectbox("Choose stream", streams_for_location, format_func=lambda i: unique_streams[i]) + + print(f"location = {location}, stream_id = {stream_id}") + if not location: + st.error("Please select a location") + elif not stream_id: + st.error("Please select a stream") + else: + if top_cols[2].button("Re-check DB"): + st.cache_data.clear() + + # st.download_button(label, data, file_name=None, mime=None, key=None, help=None, on_click=None, args=None, kwargs=None, *, type="secondary", icon=None, disabled=False, use_container_width=False) + + print(f"getting bounds for stream = {stream_id}, location = {location}") + min_time, max_time = database_utils.get_min_max_time_for_single_stream(int(stream_id), location) + if min_time is None: + st_graph_area.error("No data for location+stream found") + else: + min_time = min_time.to_pydatetime() + max_time = max_time.to_pydatetime() + draw_graph(location, stream_id, min_time, max_time) + + + +if __name__ == "__main__": + waveform_data() diff --git a/monitoring/streamlit/validation.py b/monitoring/streamlit/validation.py new file mode 100644 index 000000000..5529ef749 --- /dev/null +++ b/monitoring/streamlit/validation.py @@ -0,0 +1,102 @@ +# --- +# jupyter: +# jupytext: +# cell_metadata_filter: -all +# notebook_metadata_filter: -jupytext.text_representation.jupytext_version,-kernelspec +# text_representation: +# extension: .py +# format_name: percent +# format_version: '1.3' +# --- + +# %% +import os +from functools import lru_cache + +# %% +import pytest +from pytest import approx +import pandas as pd +import sqlalchemy +import psycopg2 + +# %% +database_url = 'postgresql+psycopg2://inform_user:inform@localhost:5433/fakeuds' +schema = "uds_schema" +search_path_preamble = f"set search_path to 
{schema};" +engine = sqlalchemy.create_engine(database_url) + +# %% +# put in fixture +con = engine.connect() + +# %% +qry = open("gaps.sql").read() + +# %% +all_params = pd.read_sql_query(search_path_preamble + + """ + SELECT DISTINCT visit_observation_type_id, source_location + FROM WAVEFORM + """, con) +print(all_params) +print("!!!") + +# %% +@lru_cache +def run_with_params(visit_observation_type_id, source_location): + params = (visit_observation_type_id, source_location) + print(f"running with {params}") + waveform_df = pd.read_sql_query(search_path_preamble + qry, con, params=params) + return waveform_df + +# %% [markdown] +# --AND observation_datetime < %s + +# %% +def test_all_for_gaps(): + for ps in all_params.itertuples(): + waveform_df = run_with_params(ps.visit_observation_type_id, ps.source_location) + duration = pd.to_timedelta(waveform_df['values_array'].apply(len), "seconds") / waveform_df['sampling_rate'] + # duration = pd.Timedelta(seconds=len(waveform_df['values_array']) / waveform_df['sampling_rate']) + waveform_df['duration'] = duration + waveform_df['calc_end_date'] = waveform_df['observation_datetime'] + duration + waveform_df['gap_since_last'] = (waveform_df['observation_datetime'] + - waveform_df['calc_end_date'].shift(1)).fillna(pd.Timedelta(0)) + first = waveform_df.iloc[0] + last = waveform_df.iloc[-1] + total_samples = waveform_df['values_array'].apply(len).sum() + total_active_time = waveform_df['duration'].sum() + total_calendar_time = last['calc_end_date'] - first['observation_datetime'] + # if there are no gaps or overlaps, total_active_time and total_calendar_time should be the same + sampling_rate = waveform_df['sampling_rate'].unique().tolist() + print(f"Total samples = {total_samples} @{sampling_rate}Hz, Total active time = {total_active_time}, total calendar = {total_calendar_time}") + indexes_with_gap = waveform_df[waveform_df['gap_since_last'].apply(abs) > pd.Timedelta(milliseconds=1)].index + print(f"Indexes with gap: {indexes_with_gap}") + print(f"with gap: {waveform_df[indexes_with_gap]}") + assert indexes_with_gap.empty + assert abs(total_active_time - total_calendar_time) < pd.Timedelta(milliseconds=1) + + # Index(['waveform_id', 'stored_from', 'valid_from', 'observation_datetime', + # 'sampling_rate', 'source_location', 'unit', 'values_array', + # 'location_visit_id', 'visit_observation_type_id'], + # dtype='object') + + +# %% +def test_no_orphaned_data(): + orphaned_data = pd.read_sql_query(search_path_preamble + + """ + SELECT * + FROM WAVEFORM + WHERE location_visit_id IS NULL + """, con) + print(orphaned_data) + # all data is orphaned because the generator doesn't put any ADT messages in! 
+ assert orphaned_data.empty + + +# %% +test_all_for_gaps() + +# %% diff --git a/monitoring/streamlit/validation_output/.gitignore b/monitoring/streamlit/validation_output/.gitignore new file mode 100644 index 000000000..241e560df --- /dev/null +++ b/monitoring/streamlit/validation_output/.gitignore @@ -0,0 +1,2 @@ +* + diff --git a/monitoring/streamlit/waveform_utils.py b/monitoring/streamlit/waveform_utils.py new file mode 100644 index 000000000..e8c6f1e80 --- /dev/null +++ b/monitoring/streamlit/waveform_utils.py @@ -0,0 +1,2 @@ +import pandas as pd + diff --git a/portal-config-envs.EXAMPLE b/portal-config-envs.EXAMPLE new file mode 100644 index 000000000..4f24122e1 --- /dev/null +++ b/portal-config-envs.EXAMPLE @@ -0,0 +1,2 @@ +PORTAL_USERNAME= +PORTAL_PASSWORD= diff --git a/waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/Hl7Generator.java b/waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/Hl7Generator.java index ec55a42ec..155a3b432 100644 --- a/waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/Hl7Generator.java +++ b/waveform-generator/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform_generator/Hl7Generator.java @@ -1,5 +1,8 @@ package uk.ac.ucl.rits.inform.datasources.waveform_generator; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.text.StringSubstitutor; import org.slf4j.Logger; @@ -30,6 +33,7 @@ public class Hl7Generator { @Value("${waveform.synthetic.num_patients:30}") private int numPatients; + private final GeneratorContext generatorContext = new GeneratorContext(); /** * The generator can be run in "live" or "catch-up" mode. 
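The hunks below change the synthetic waveform from `sin(overallSampleIdx * 0.01)` to a sine at a configurable frequency whose sample counter persists per (location, stream) across calls, so the phase no longer resets at each generation chunk. A rough Python sketch of that idea (names are illustrative, not taken from the codebase):

```python
# Editorial sketch of the persistent-phase idea; not the actual Java implementation.
import math
from collections import defaultdict

sample_counters = defaultdict(int)  # keyed by (location_id, stream_id)

def synth_values(location_id: str, stream_id: str,
                 sampling_rate: int, signal_frequency_hz: float,
                 num_samples: int, max_value: float = 999.0) -> list[float]:
    """Generate the next chunk of a sine wave, continuing where the previous chunk stopped."""
    idx = sample_counters[(location_id, stream_id)]
    values = [max_value * math.sin(2 * math.pi * signal_frequency_hz * (idx + i) / sampling_rate)
              for i in range(num_samples)]
    sample_counters[(location_id, stream_id)] = idx + num_samples  # persist the phase
    return values

# Two consecutive calls produce a continuous waveform rather than restarting at phase 0:
chunk1 = synth_values("bed1", "27", 300, 1.2, 1500)  # 5 seconds at 300 Hz
chunk2 = synth_values("bed1", "27", 300, 1.2, 1500)  # continues seamlessly
```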
@@ -117,7 +121,7 @@ public void generateMessages() throws IOException { int numChunks = 0; Instant progressAtStart = progressDatetime; while (progressDatetime.isBefore(getExpectedProgressDatetime())) { - logger.info("Making HL7 messages"); + logger.info("Making HL7 messages starting at time {} for {} milliseconds", progressDatetime, millisPerChunk); List synthMsgs = makeSyntheticWaveformMsgsAllPatients(progressDatetime, numPatients, millisPerChunk); logger.info("Sending {} HL7 messages", synthMsgs.size()); // To avoid the worker threads in the reader being blocked trying to write to the @@ -226,6 +230,7 @@ private String applyHl7Template(long samplingRate, String locationId, Instant ob * @param locationId where the data originates from (machine/bed location) * @param streamId identifier for the stream * @param samplingRate in samples per second + * @param signalFrequencyHz the signal baseline frequency (Hz) * @param numMillis number of milliseconds to produce data for * @param startTime observation time of the beginning of the period that the messages are to cover * @param maxSamplesPerMessage max samples per message (will split into multiple messages if needed) @@ -234,25 +239,29 @@ private String applyHl7Template(long samplingRate, String locationId, Instant ob private List makeSyntheticWaveformMsgs(final String locationId, final String streamId, final long samplingRate, + final double signalFrequencyHz, final long numMillis, final Instant startTime, final long maxSamplesPerMessage ) { List allMessages = new ArrayList<>(); - final long numSamples = numMillis * samplingRate / 1000; + final long numSamplesThisCall = numMillis * samplingRate / 1000; final double maxValue = 999; - for (long overallSampleIdx = 0; overallSampleIdx < numSamples;) { - long microsAfterStart = overallSampleIdx * 1000_000 / samplingRate; + GeneratorContext.GeneratorContextRecord context = generatorContext.getContext(locationId, streamId); + // This counter persists over repeated calls to this function to avoid the input + // to sin being reset to zero every few seconds + long persistentSampleIdx = context.getCounter(); + for (int thisCallCounter = 0; thisCallCounter < numSamplesThisCall;) { + long microsAfterStart = thisCallCounter * 1000_000L / samplingRate; Instant messageStartTime = startTime.plus(microsAfterStart, ChronoUnit.MICROS); String timeStr = DateTimeFormatter.ofPattern("HHmmss").format(startTime.atOffset(ZoneOffset.UTC)); - String messageId = String.format("%s_s%s_t%s_msg%05d", locationId, streamId, timeStr, overallSampleIdx); + String messageId = String.format("%s_s%s_t%s_msg%05d", locationId, streamId, timeStr, persistentSampleIdx); var values = new ArrayList(); for (long valueIdx = 0; - valueIdx < maxSamplesPerMessage && overallSampleIdx < numSamples; - valueIdx++, overallSampleIdx++) { - // a sine wave between maxValue and -maxValue - values.add(2 * maxValue * Math.sin(overallSampleIdx * 0.01) - maxValue); + valueIdx < maxSamplesPerMessage && thisCallCounter < numSamplesThisCall; + valueIdx++, thisCallCounter++, persistentSampleIdx++) { + values.add(maxValue * Math.sin(2 * Math.PI * signalFrequencyHz * persistentSampleIdx / samplingRate)); } // Only one stream ID per HL7 message for the time being @@ -261,6 +270,7 @@ private List makeSyntheticWaveformMsgs(final String locationId, String fullHl7message = applyHl7Template(samplingRate, locationId, messageStartTime, messageId, valuesByStreamId); allMessages.add(fullHl7message); } + context.setCounter(persistentSampleIdx); return allMessages; } @@ 
-285,15 +295,27 @@ public List makeSyntheticWaveformMsgsAllPatients( Instant startTime, long numPatients, long numMillis) { List waveformMsgs = new ArrayList<>(); numPatients = Math.min(numPatients, possibleLocations.size()); + List syntheticStreams = List.of( + new SyntheticStream("52912", 50, 0.3, 5), // airway volume + new SyntheticStream("27", 300, 1.2, 10) // ECG + ); for (int p = 0; p < numPatients; p++) { var location = possibleLocations.get(p); - String streamId1 = "52912"; - String streamId2 = "27"; int sizeBefore = waveformMsgs.size(); - waveformMsgs.addAll(makeSyntheticWaveformMsgs( - location, streamId1, 50, numMillis, startTime, 5)); - waveformMsgs.addAll(makeSyntheticWaveformMsgs( - location, streamId2, 300, numMillis, startTime, 10)); + // each bed has a slightly different frequency + double frequencyFactor = 0.95 + 0.1 * p / possibleLocations.size(); + // don't turn on all streams for all patients to test more realistically + long streamsEnabledBitPattern = ~p; // p = 0 has all streams enabled, etc + for (int si = 0; si < syntheticStreams.size(); si++) { + boolean thisStreamEnabled = 0 != ((streamsEnabledBitPattern >> si) & 1); + if (!thisStreamEnabled) { + continue; + } + SyntheticStream stream = syntheticStreams.get(si); + waveformMsgs.addAll(makeSyntheticWaveformMsgs( + location, stream.streamId, stream.samplingRate, + stream.baselineSignalFrequency * frequencyFactor, numMillis, startTime, stream.maxSamplesPerMessage)); + } int sizeAfter = waveformMsgs.size(); logger.debug("Patient {} (location {}), generated {} messages", p, location, sizeAfter - sizeBefore); } @@ -301,4 +323,29 @@ public List makeSyntheticWaveformMsgsAllPatients( return waveformMsgs; } + record SyntheticStream(String streamId, int samplingRate, double baselineSignalFrequency, int maxSamplesPerMessage) { + } + + private class GeneratorContext { + private final Map, GeneratorContextRecord> allContexts = new HashMap<>(); + + @AllArgsConstructor + class GeneratorContextRecord { + @Getter @Setter + private long counter; + } + + public GeneratorContextRecord getContext(ImmutablePair contextKey) { + return allContexts.computeIfAbsent(contextKey, k -> new GeneratorContextRecord(0)); + } + + public GeneratorContextRecord getContext(String locationId, String streamId) { + return getContext(GeneratorContext.makeKey(locationId, streamId)); + } + + public static ImmutablePair makeKey(String locationId, String streamId) { + return new ImmutablePair<>(locationId, streamId); + } + } + } diff --git a/waveform-generator/waveform-generator-config-envs.EXAMPLE b/waveform-generator/waveform-generator-config-envs.EXAMPLE index 38df1a2d2..c75a80359 100644 --- a/waveform-generator/waveform-generator-config-envs.EXAMPLE +++ b/waveform-generator/waveform-generator-config-envs.EXAMPLE @@ -1,7 +1,7 @@ -WAVEFORM_SYNTHETIC_NUM_PATIENTS=30 -WAVEFORM_SYNTHETIC_WARP_FACTOR=6 -WAVEFORM_SYNTHETIC_START_DATETIME="2024-01-02T12:00:00Z" -WAVEFORM_SYNTHETIC_END_DATETIME="2024-01-03T12:00:00Z" +WAVEFORM_SYNTHETIC_NUM_PATIENTS= +WAVEFORM_SYNTHETIC_WARP_FACTOR= +WAVEFORM_SYNTHETIC_START_DATETIME= +WAVEFORM_SYNTHETIC_END_DATETIME= WAVEFORM_SYNTHETIC_TCP_CLIENT_POOL_SIZE=1 WAVEFORM_HL7_SEND_HOST="waveform-reader" WAVEFORM_HL7_SEND_PORT=7777 diff --git a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7FromFile.java b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7FromFile.java index d8d02be1f..1b31e65b8 100644 --- 
a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7FromFile.java +++ b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7FromFile.java @@ -19,16 +19,16 @@ public class Hl7FromFile { private final Logger logger = LoggerFactory.getLogger(Hl7FromFile.class); private final ThreadPoolTaskExecutor listenTaskExecutor; - private final Hl7ParseAndSend hl7ParseAndSend; + private final Hl7ParseAndQueue hl7ParseAndQueue; private final File hl7DumpFile; static final String MESSAGE_DELIMITER = "\u001c"; Hl7FromFile(ThreadPoolTaskExecutor listenTaskExecutor, - Hl7ParseAndSend hl7ParseAndSend, + Hl7ParseAndQueue hl7ParseAndQueue, @Value("${waveform.hl7.test_dump_file:#{null}}") File hl7DumpFile ) { this.listenTaskExecutor = listenTaskExecutor; - this.hl7ParseAndSend = hl7ParseAndSend; + this.hl7ParseAndQueue = hl7ParseAndQueue; this.hl7DumpFile = hl7DumpFile; } @@ -64,7 +64,7 @@ void readOnceAndQueue(File hl7DumpFile) throws Hl7ParseException, WaveformCollat List messages = readFromFile(hl7DumpFile); logger.info("Read {} HL7 messages from test dump file", messages.size()); for (int mi = 0; mi < messages.size(); mi++) { - hl7ParseAndSend.parseAndQueue(messages.get(mi)); + hl7ParseAndQueue.parseAndQueue(messages.get(mi)); if (mi % 100 == 0) { logger.info("handled {} messages out of {}", mi + 1, messages.size()); } diff --git a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ListenerConfig.java b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ListenerConfig.java index ffab64e1a..ae446941d 100644 --- a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ListenerConfig.java +++ b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ListenerConfig.java @@ -31,10 +31,10 @@ public class Hl7ListenerConfig { private final Logger logger = LoggerFactory.getLogger(Hl7ListenerConfig.class); - private final Hl7ParseAndSend hl7ParseAndSend; + private final Hl7ParseAndQueue hl7ParseAndQueue; - public Hl7ListenerConfig(Hl7ParseAndSend hl7ParseAndSend) { - this.hl7ParseAndSend = hl7ParseAndSend; + public Hl7ListenerConfig(Hl7ParseAndQueue hl7ParseAndQueue) { + this.hl7ParseAndQueue = hl7ParseAndQueue; } /** @@ -164,7 +164,7 @@ public void handler(Message msg) throws Hl7ParseException, WaveformColla byte[] asBytes = msg.getPayload(); String asStr = new String(asBytes, StandardCharsets.UTF_8); // parse message from HL7 to interchange message, send to internal queue - hl7ParseAndSend.parseAndQueue(asStr); + hl7ParseAndQueue.parseAndQueue(asStr); } } diff --git a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ParseAndSend.java b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ParseAndQueue.java similarity index 95% rename from waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ParseAndSend.java rename to waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ParseAndQueue.java index fb6aed15d..20f61246e 100644 --- a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ParseAndSend.java +++ b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/Hl7ParseAndQueue.java @@ -23,18 +23,23 @@ import java.util.Optional; import java.util.Set; +/** + * Receive HL7 messages, transform each to an interchange message, and + * store them in memory ready for collation into bigger interchange messages + * (see {@link WaveformCollator}). 
+ */ @Component -public class Hl7ParseAndSend { - private final Logger logger = LoggerFactory.getLogger(Hl7ParseAndSend.class); +public class Hl7ParseAndQueue { + private final Logger logger = LoggerFactory.getLogger(Hl7ParseAndQueue.class); private final WaveformOperations waveformOperations; private final WaveformCollator waveformCollator; private final SourceMetadata sourceMetadata; private final LocationMapping locationMapping; private long numHl7 = 0; - Hl7ParseAndSend(WaveformOperations waveformOperations, - WaveformCollator waveformCollator, - SourceMetadata sourceMetadata, LocationMapping locationMapping) { + Hl7ParseAndQueue(WaveformOperations waveformOperations, + WaveformCollator waveformCollator, + SourceMetadata sourceMetadata, LocationMapping locationMapping) { this.waveformOperations = waveformOperations; this.waveformCollator = waveformCollator; this.sourceMetadata = sourceMetadata; diff --git a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/SourceMetadata.java b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/SourceMetadata.java index c3335a16f..f7684e188 100644 --- a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/SourceMetadata.java +++ b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/SourceMetadata.java @@ -22,7 +22,7 @@ */ @Component public class SourceMetadata { - private final Logger logger = LoggerFactory.getLogger(Hl7ParseAndSend.class); + private final Logger logger = LoggerFactory.getLogger(Hl7ParseAndQueue.class); private static final Resource CSV_RESOURCE = new ClassPathResource("source-metadata/Device_Values_formatted.csv"); private Map metadataByStreamId = new HashMap<>(); diff --git a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/WaveformCollator.java b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/WaveformCollator.java index 31fe924f0..ebcf63d68 100644 --- a/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/WaveformCollator.java +++ b/waveform-reader/src/main/java/uk/ac/ucl/rits/inform/datasources/waveform/WaveformCollator.java @@ -19,6 +19,11 @@ import java.util.TreeMap; @Component +/** + * Read interchange messages produced by {@link Hl7ParseAndQueue}, identify + * contiguous data to turn them into bigger interchange messages for greater + * DB storage efficiency. 
+ */ public class WaveformCollator { private final Logger logger = LoggerFactory.getLogger(WaveformCollator.class); protected final Map, SortedMap> pendingMessages = new HashMap<>(); @@ -155,10 +160,12 @@ private WaveformMessage collateContiguousData(SortedMap newNumericValues = new ArrayList<>(); Iterator> perPatientMapIter = perPatientMap.entrySet().iterator(); + // keep track of incoming message sizes for general interest (does not affect collation algorithm) + Map uncollatedMessageSizes = new HashMap<>(); int messagesToCollate = 0; while (perPatientMapIter.hasNext()) { Map.Entry entry = perPatientMapIter.next(); @@ -168,27 +175,61 @@ private WaveformMessage collateContiguousData(SortedMap targetCollatedMessageSamples) { - logger.debug("Reached sample target ({} > {}), collated message span: {} -> {}", - sampleCount, targetCollatedMessageSamples, + logger.debug("Reached sample target ({} > {}), collated message size {}, collated message span: {} -> {}", + sampleCount, targetCollatedMessageSamples, sampleCount - thisMessageSampleCount, firstMsg.getObservationTime(), msg.getObservationTime()); break; } - if (expectedNextDatetime != null) { - Instant gapUpperBound = checkGap(msg, expectedNextDatetime, assumedRounding); - if (gapUpperBound != null) { - logger.info("Key {}, collated message span: {} -> {} ({} milliseconds, {} samples)", + if (previousMsg != null) { + Instant expectedNextDatetime = previousMsg.getExpectedNextObservationDatetime(); + try { + Instant gapUpperBound = checkGap(msg, expectedNextDatetime, assumedRounding); + if (gapUpperBound != null) { + logger.info("Key {} ({}Hz), collated message span: {} -> {} ({} milliseconds, {} messages, {} samples)", + makeKey(msg), + msg.getSamplingRate(), + firstMsg.getObservationTime(), + expectedNextDatetime, + firstMsg.getObservationTime().until(expectedNextDatetime, ChronoUnit.MILLIS), + messagesToCollate, + sampleCount); + // Found a gap, stop here, excluding `msg`. + // Collation may still happen if data is old enough that we don't want to wait for more. + break; + } + } catch (CollationOverlapException coe) { + logger.error(""" + Key {} ({}Hz), {}, between: + previous message ({} -> {}) {} samples + this message ({} -> {}) {} samples + """, makeKey(msg), - firstMsg.getObservationTime(), msg.getObservationTime(), - firstMsg.getObservationTime().until(msg.getObservationTime(), ChronoUnit.MILLIS), - sampleCount); - // Found a gap, stop here. Decide later whether data is old enough to make a message anyway. + msg.getSamplingRate(), + coe.getMessage(), + previousMsg.getObservationTime(), expectedNextDatetime, + previousMsg.getNumericValues().get().size(), + msg.getObservationTime(), msg.getExpectedNextObservationDatetime(), + msg.getNumericValues().get().size()); + // The data can't be corrected, but we can at least stop collating at this point. + // The overlapping message will be the first message of the next collation run, + // which at least exposes the overlap in the database rather than trying to obscure it. 
break; } } - expectedNextDatetime = msg.getExpectedNextObservationDatetime(); + previousMsg = msg; // don't modify yet, because we don't yet know if we will reach criteria to collate (num samples, time passed) messagesToCollate++; @@ -197,13 +238,18 @@ private WaveformMessage collateContiguousData(SortedMap> secondPassIter = perPatientMap.entrySet().iterator(); for (int i = 0; i < messagesToCollate; i++) { Map.Entry entry = secondPassIter.next(); @@ -220,7 +266,7 @@ private WaveformMessage collateContiguousData(SortedMap msgs = hl7ParseAndSend.parseHl7(hl7String); + List msgs = hl7ParseAndQueue.parseHl7(hl7String); assertEquals(5, msgs.size()); List actualSource = msgs.stream().map(WaveformMessage::getSourceLocationString).distinct().toList(); assertEquals(1, actualSource.size()); @@ -94,7 +91,7 @@ void checkMessage(String hl7String, String expectedSourceLocation, String expect void messageWithMoreThanOneRepeat() throws IOException, URISyntaxException { String hl7String = readHl7FromResource("hl7/test1.hl7"); String hl7WithReps = hl7String.replace("42.50^", "42.50~"); - Hl7ParseException e = assertThrows(Hl7ParseException.class, () -> hl7ParseAndSend.parseHl7(hl7WithReps)); + Hl7ParseException e = assertThrows(Hl7ParseException.class, () -> hl7ParseAndQueue.parseHl7(hl7WithReps)); assertTrue(e.getMessage().contains("only be 1 repeat")); } @@ -102,7 +99,7 @@ void messageWithMoreThanOneRepeat() throws IOException, URISyntaxException { void messageWithConflictingLocation() throws IOException, URISyntaxException { String hl7String = readHl7FromResource("hl7/test1.hl7"); String hl7WithReps = hl7String.replace("PV1||I|UCHT03ICURM08|", "PV1||I|UCHT03ICURM07|"); - Hl7ParseException e = assertThrows(Hl7ParseException.class, () -> hl7ParseAndSend.parseHl7(hl7WithReps)); + Hl7ParseException e = assertThrows(Hl7ParseException.class, () -> hl7ParseAndQueue.parseHl7(hl7WithReps)); assertTrue(e.getMessage().contains("Unexpected location")); } diff --git a/waveform-reader/src/test/java/uk/ac/ucl/rits/inform/datasources/waveform/TestWaveformCollation.java b/waveform-reader/src/test/java/uk/ac/ucl/rits/inform/datasources/waveform/TestWaveformCollation.java index 3f3345687..05f06d306 100644 --- a/waveform-reader/src/test/java/uk/ac/ucl/rits/inform/datasources/waveform/TestWaveformCollation.java +++ b/waveform-reader/src/test/java/uk/ac/ucl/rits/inform/datasources/waveform/TestWaveformCollation.java @@ -27,7 +27,7 @@ @ActiveProfiles("test") public class TestWaveformCollation { @Autowired - private Hl7ParseAndSend hl7ParseAndSend; + private Hl7ParseAndQueue hl7ParseAndQueue; @Autowired private WaveformCollator waveformCollator; @@ -77,9 +77,10 @@ private void makeAndAddTestMessages() throws WaveformCollator.CollationException } static Stream noGapsData() { - // We are adjusting the target number of samples config option rather than - // the actual number of samples supplied, which may be a bit unintuitive but - // is easier and amounts to the same thing. + // There is a fixed quantity of 3000 samples in messages containing 5 samples each. + // We are adjusting the *target* number of samples config option rather than + // the actual number of messages/samples supplied, which may be unintuitive but + // amounts to the same thing and means we can use the same test data each time. 
return Stream.of( // only just happened Arguments.of(3000, 10000, List.of(3000), 0), diff --git a/waveform-reader/waveform-reader-config-envs.EXAMPLE b/waveform-reader/waveform-reader-config-envs.EXAMPLE index f3877d6a3..772da9607 100644 --- a/waveform-reader/waveform-reader-config-envs.EXAMPLE +++ b/waveform-reader/waveform-reader-config-envs.EXAMPLE @@ -7,5 +7,5 @@ SPRING_RABBITMQ_PORT=5672 SPRING_RABBITMQ_USERNAME=my_name SPRING_RABBITMQ_PASSWORD=my_pw -WAVEFORM_HL7_SOURCE_ADDRESS_ALLOW_LIST=127.0.0.1 -WAVEFORM_HL7_TEST_DUMP_FILE="" +WAVEFORM_HL7_SOURCE_ADDRESS_ALLOW_LIST= +WAVEFORM_HL7_TEST_DUMP_FILE=
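As an editorial footnote to the `WaveformCollator` changes above: collation walks the per-stream messages in time order, derives each message's expected end time from its sample count and sampling rate, and stops when the next message starts noticeably later (a gap) or earlier (an overlap) than expected. A rough Python sketch of that check, using simplified stand-in message objects:

```python
# Editorial sketch only; the real logic is in WaveformCollator.collateContiguousData().
from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class Msg:
    observation_time: datetime
    sampling_rate: int
    values: list

    @property
    def expected_next_time(self) -> datetime:
        return self.observation_time + timedelta(seconds=len(self.values) / self.sampling_rate)

def count_contiguous(msgs: list, tolerance: timedelta = timedelta(milliseconds=1)) -> int:
    """Return how many leading messages can be collated before hitting a gap or overlap."""
    previous = None
    for i, msg in enumerate(msgs):
        if previous is not None:
            delta = msg.observation_time - previous.expected_next_time
            if delta > tolerance:    # gap: stop here; may still collate later if the data is old enough
                return i
            if delta < -tolerance:   # overlap: can't be repaired, stop so it stays visible in the DB
                return i
        previous = msg
    return len(msgs)
```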