diff --git a/.gitignore b/.gitignore index 7fc2eb883..50871c91b 100644 --- a/.gitignore +++ b/.gitignore @@ -48,3 +48,10 @@ hs_err_pid* # IntelliJ files .idea/ *.iml + +# python +__pycache__ + +# prevent notebooks from being checked in +*.ipynb +.ipynb_checkpoints diff --git a/core/Dockerfile.emap-portal b/core/Dockerfile.emap-portal new file mode 100644 index 000000000..0beed03ea --- /dev/null +++ b/core/Dockerfile.emap-portal @@ -0,0 +1,10 @@ +FROM nginx:otel +RUN apt update && \ + apt install -y apache2-utils && \ + apt clean +COPY core/emap-portal/nginx.conf /etc/nginx/ +COPY core/emap-portal/conf.d/ /etc/nginx/conf.d/ +COPY core/emap-portal/www/* /usr/share/nginx/html/ +RUN --mount=type=secret,id=portal-build-secrets \ + . /run/secrets/portal-build-secrets && \ + htpasswd -b -B -c /etc/nginx/conf.d/htpasswd "$PORTAL_USERNAME" "$PORTAL_PASSWORD" diff --git a/core/docker-compose.yml b/core/docker-compose.yml index 43346ba37..d86fa564e 100644 --- a/core/docker-compose.yml +++ b/core/docker-compose.yml @@ -54,4 +54,25 @@ services: restart: on-failure depends_on: - cassandra + emap-portal: + build: + context: .. + dockerfile: core/Dockerfile.emap-portal + args: + HTTP_PROXY: ${HTTP_PROXY} + http_proxy: ${http_proxy} + HTTPS_PROXY: ${HTTPS_PROXY} + https_proxy: ${https_proxy} + secrets: + - portal-build-secrets + env_file: + - ../../config/portal-config-envs + ports: + - "${PORTAL_PORT}:80" + + + +secrets: + portal-build-secrets: + file: ../../config/portal-config-envs diff --git a/core/emap-portal/conf.d/default.conf b/core/emap-portal/conf.d/default.conf new file mode 100644 index 000000000..145b9301f --- /dev/null +++ b/core/emap-portal/conf.d/default.conf @@ -0,0 +1,43 @@ +server { + listen 80; + listen [::]:80; + server_name localhost; + # nginx is behind docker, so the browser is using a different port number which nginx doesn't know about. + # Use relative redirects to avoid redirecting to port 80. 
(301s are used when trailing slashes are omitted) + absolute_redirect off; + auth_basic "Administrator’s Area"; + auth_basic_user_file conf.d/htpasswd; + + + access_log /var/log/nginx/host.access.log main; + + #error_page 404 /404.html; + + # redirect server error pages to the static page /50x.html + # + error_page 500 502 503 504 /50x.html; + location = /50x.html { + root /usr/share/nginx/html; + } + + location / { + root /usr/share/nginx/html; + include conf.d/shared/shared_location_config.conf; + } + + location /glowroot/ { + include conf.d/shared/shared_location_config.conf; + proxy_pass http://glowroot-central:4000/; + sub_filter 'href="/' 'href="/glowroot/'; + sub_filter 'src="/' 'src="/glowroot/'; + } + + location /streamlit/ { + include conf.d/shared/shared_location_config.conf; + proxy_pass http://streamlit:8501/streamlit/; + proxy_buffering off; + + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "upgrade"; + } +} diff --git a/core/emap-portal/conf.d/shared/shared_location_config.conf b/core/emap-portal/conf.d/shared/shared_location_config.conf new file mode 100644 index 000000000..287a632e8 --- /dev/null +++ b/core/emap-portal/conf.d/shared/shared_location_config.conf @@ -0,0 +1,11 @@ +sub_filter_once off; +proxy_redirect off; +proxy_set_header Host $host; +proxy_set_header X-Real-IP $remote_addr; +proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; +proxy_set_header X-Forwarded-Proto $scheme; +proxy_set_header Accept-Encoding ""; # turn off gzip for upstream so rewriting can work +# needed for websockets +proxy_http_version 1.1; +proxy_read_timeout 86400; +proxy_send_timeout 3600; diff --git a/core/emap-portal/nginx.conf b/core/emap-portal/nginx.conf new file mode 100644 index 000000000..a0a893518 --- /dev/null +++ b/core/emap-portal/nginx.conf @@ -0,0 +1,50 @@ + +user nginx; +worker_processes auto; + +error_log /var/log/nginx/error.log debug; +pid /var/run/nginx.pid; + + +events { + worker_connections 1024; +} + + +http { + include /etc/nginx/mime.types; + default_type application/octet-stream; + + log_format main '$remote_addr - $remote_user [$time_local] "$request" ' + '$status $body_bytes_sent "$http_referer" ' + '"$http_user_agent" "$http_x_forwarded_for"'; + + access_log /var/log/nginx/access.log main; + + sendfile on; + #tcp_nopush on; + + keepalive_timeout 65; + + #gzip on; + + include /etc/nginx/conf.d/*.conf; +} + +# pure TCP proxy? 
+# +# stream { +# upstream backend { +# server backend-server:12345; +# } +# +# server { +# listen 12345; +# proxy_pass backend; +# Allow specific IP addresses +# allow 192.168.1.1; # Replace with the allowed IP address +# allow 192.168.1.2; # Add more allowed IP addresses as needed +# deny all; # Deny all other IP addresses + +# } +# } \ No newline at end of file diff --git a/core/emap-portal/www/index.html b/core/emap-portal/www/index.html new file mode 100644 index 000000000..4c9beec5a --- /dev/null +++ b/core/emap-portal/www/index.html @@ -0,0 +1,17 @@ + + + + + Emap admin page + + +You can access various Emap admin/visualisation/monitoring services: + + + + + diff --git a/emap-setup/emap_runner/docker/docker_runner.py b/emap-setup/emap_runner/docker/docker_runner.py index 3668c907c..6c231867a 100644 --- a/emap-setup/emap_runner/docker/docker_runner.py +++ b/emap-setup/emap_runner/docker/docker_runner.py @@ -108,6 +108,8 @@ def docker_compose_paths(self) -> List[Path]: paths.append(Path(self.emap_dir, "waveform-reader", "docker-compose.yml")) if self.use_fake_waveform: paths.append(Path(self.emap_dir, "waveform-generator", "docker-compose.yml")) + if self.config.get("monitoring", "use_streamlit"): + paths.append(Path(self.emap_dir, "monitoring", "docker-compose.yml")) # allow for hoover and to be optional compose path if "hoover" in self.config["repositories"]: diff --git a/emap-setup/emap_runner/global_config.py b/emap-setup/emap_runner/global_config.py index 4a0beb1d8..b1ce8e770 100644 --- a/emap-setup/emap_runner/global_config.py +++ b/emap-setup/emap_runner/global_config.py @@ -22,7 +22,8 @@ class GlobalConfiguration(dict): "glowroot", "common", "fake_uds", - "waveform" + "waveform", + "monitoring", ) def __init__(self, filepath: Path): diff --git a/emap-setup/global-configuration-EXAMPLE.yaml b/emap-setup/global-configuration-EXAMPLE.yaml index 67b8ef2ac..732f66839 100644 --- a/emap-setup/global-configuration-EXAMPLE.yaml +++ b/emap-setup/global-configuration-EXAMPLE.yaml @@ -109,3 +109,11 @@ waveform: WAVEFORM_SYNTHETIC_WARP_FACTOR: 6 WAVEFORM_SYNTHETIC_START_DATETIME: "2024-01-02T12:00:00Z" WAVEFORM_SYNTHETIC_END_DATETIME: "2024-01-03T12:00:00Z" + +# The nginx portal and other monitoring/validation/visualisation services +monitoring: + SERVER_EXTERNAL_HOSTNAME: server.fqdn.example + PORTAL_PORT: 7100 + PORTAL_USERNAME: emap + PORTAL_PASSWORD: portal_password + use_streamlit: false diff --git a/emap-star/emap-star/src/main/java/uk/ac/ucl/rits/inform/informdb/visit_recordings/Waveform.java b/emap-star/emap-star/src/main/java/uk/ac/ucl/rits/inform/informdb/visit_recordings/Waveform.java index 4a0730948..c27f9fe2c 100644 --- a/emap-star/emap-star/src/main/java/uk/ac/ucl/rits/inform/informdb/visit_recordings/Waveform.java +++ b/emap-star/emap-star/src/main/java/uk/ac/ucl/rits/inform/informdb/visit_recordings/Waveform.java @@ -34,6 +34,7 @@ @Index(name = "waveform_datetime", columnList = "observationDatetime"), @Index(name = "waveform_location", columnList = "sourceLocation"), @Index(name = "waveform_location_visit", columnList = "locationVisitId"), + @Index(name = "waveform_observation_type", columnList = "visitObservationTypeId"), }) @Data @EqualsAndHashCode(callSuper = true) diff --git a/global-config-envs.EXAMPLE b/global-config-envs.EXAMPLE index e96471e55..ece509d24 100644 --- a/global-config-envs.EXAMPLE +++ b/global-config-envs.EXAMPLE @@ -4,3 +4,4 @@ RABBITMQ_ADMIN_PORT=5674 GLOWROOT_ADMIN_PORT=4000 FAKEUDS_PORT=5433 HL7_READER_PORT=9999 +PORTAL_PORT= diff --git 
a/monitoring/docker-compose.yml b/monitoring/docker-compose.yml new file mode 100644 index 000000000..bce28a089 --- /dev/null +++ b/monitoring/docker-compose.yml @@ -0,0 +1,15 @@ +services: + streamlit: + build: + context: .. + dockerfile: monitoring/streamlit/Dockerfile + args: + HTTP_PROXY: ${HTTP_PROXY} + http_proxy: ${http_proxy} + HTTPS_PROXY: ${HTTPS_PROXY} + https_proxy: ${https_proxy} + env_file: + - ../../config/streamlit-config-envs + logging: + driver: "json-file" + restart: "no" diff --git a/monitoring/requirements.txt b/monitoring/requirements.txt new file mode 100644 index 000000000..f83b25db6 --- /dev/null +++ b/monitoring/requirements.txt @@ -0,0 +1,11 @@ +jupyter +jupyterlab +jupytext +matplotlib +pandas +psycopg2-binary +pytest +scipy +soundfile +sqlalchemy +streamlit diff --git a/monitoring/streamlit-config-envs.EXAMPLE b/monitoring/streamlit-config-envs.EXAMPLE new file mode 100644 index 000000000..236eceb1f --- /dev/null +++ b/monitoring/streamlit-config-envs.EXAMPLE @@ -0,0 +1,6 @@ +UDS_JDBC_URL= +UDS_SCHEMA= +UDS_USERNAME= +UDS_PASSWORD= +SERVER_EXTERNAL_HOSTNAME= +PORTAL_PORT= diff --git a/monitoring/streamlit/Dockerfile b/monitoring/streamlit/Dockerfile new file mode 100644 index 000000000..4d54e6e72 --- /dev/null +++ b/monitoring/streamlit/Dockerfile @@ -0,0 +1,15 @@ +FROM python:3.12-slim-bullseye +WORKDIR /app/streamlit +COPY monitoring/requirements.txt /app/streamlit +RUN pip install -r requirements.txt +COPY monitoring/streamlit/ /app/streamlit +CMD streamlit run \ + --browser.gatherUsageStats=false \ + --server.enableWebsocketCompression=false \ + --server.enableXsrfProtection=false \ + # base URL to match where the proxy expects it to be - simpler than URL rewriting in the proxy + --server.baseUrlPath "streamlit" \ + # Without this, websocket calls don't work behind nginx + --browser.serverAddress ${SERVER_EXTERNAL_HOSTNAME} \ + --browser.serverPort ${PORTAL_PORT} \ + st_home.py diff --git a/monitoring/streamlit/__init__.py b/monitoring/streamlit/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/monitoring/streamlit/database_utils.py b/monitoring/streamlit/database_utils.py new file mode 100644 index 000000000..2a228dda4 --- /dev/null +++ b/monitoring/streamlit/database_utils.py @@ -0,0 +1,136 @@ +import math +import os +from datetime import timedelta + +import pandas as pd +import sqlalchemy +from sqlalchemy.engine.url import make_url +import streamlit as st +import psycopg2 + +# Perhaps we should move away from making the JDBC url primary, but +# for now we will have to accept this and make some edits so we can +# use it here. 
+database_jdbc_url = os.environ['UDS_JDBC_URL'] +database_user = os.environ['UDS_USERNAME'] +database_password = os.environ['UDS_PASSWORD'] +database_schema = os.environ['UDS_SCHEMA'] +database_url = make_url(database_jdbc_url.replace("jdbc:", "")) +# host, database, and port will be correct, but change the driver and user/pass +database_url = database_url.set(drivername='postgresql+psycopg2', username=database_user, password=database_password) + +SET_SEARCH_PATH = f"set search_path to {database_schema};" +engine = sqlalchemy.create_engine(database_url) + + +@st.cache_data(ttl=60) +def get_all_params(): + with engine.connect() as con: + return pd.read_sql_query(SET_SEARCH_PATH + + """ + SELECT DISTINCT + w.visit_observation_type_id, + w.source_location, + vot.name + FROM WAVEFORM w + INNER JOIN VISIT_OBSERVATION_TYPE vot + ON vot.visit_observation_type_id = w.visit_observation_type_id + """, con) + + +@st.cache_data(ttl=60) +def get_min_max_time_for_single_stream(visit_observation_type_id, source_location): + params = (visit_observation_type_id, source_location) + query = SET_SEARCH_PATH + """ + SELECT min(observation_datetime) as min_time, max(observation_datetime) as max_time + FROM WAVEFORM + WHERE visit_observation_type_id = %s AND source_location = %s + """ + with engine.connect() as con: + minmax = pd.read_sql_query(query, con, params=params) + if minmax.empty: + return None, None + else: + return minmax.iloc[0].min_time, minmax.iloc[0].max_time + + +def get_data_single_stream_rounded(visit_observation_type_id, source_location, graph_start_time, graph_end_time, max_time, max_row_length_seconds=30): + # Because a row's observation_datetime is the time of the *first* data point in the array, + # to get the data starting at time T, you have to query the DB for data a little earlier than T. + # Additionally, to aid caching, round down further so repeated calls with + # approximately similar values of min_time will result in exactly the + # same query being issued (which is hopefully already cached) + actual_min_time = graph_start_time - timedelta(seconds=max_row_length_seconds) + rounded_seconds = actual_min_time.second // 10 * 10 + rounded_min_time = actual_min_time.replace(second=rounded_seconds, microsecond=0) + # For the same reason, round the max value up to the nearest few seconds (5 is pretty arbitrary) + # (using +timedelta instead of replacing seconds value because you might hit 60 and have to wrap around) + # However, do not ask for data beyond what we know exists (max_time). We don't want + # the incomplete response to get cached. 
+ rounded_max_time = (graph_end_time.replace(second=0, microsecond=0) + + timedelta(seconds=math.ceil((graph_end_time.second + graph_end_time.microsecond/1_000_000) / 5) * 5)) + capped_at_max = False + if rounded_max_time > max_time: + capped_at_max = True + rounded_max_time = max_time + print(f"Adjusted min time {graph_start_time} -> {rounded_min_time}") + print(f"Adjusted max time {graph_end_time} -> {rounded_max_time} {'(capped)' if capped_at_max else ''}") + return get_data_single_stream(visit_observation_type_id, source_location, rounded_min_time, rounded_max_time) + + +@st.cache_data(ttl=1800) +def get_data_single_stream(visit_observation_type_id, source_location, min_time, max_time): + params = (visit_observation_type_id, source_location, min_time, max_time) + # Index(['waveform_id', 'stored_from', 'valid_from', 'observation_datetime', + # 'sampling_rate', 'source_location', 'unit', 'values_array', + # 'location_visit_id', 'visit_observation_type_id'], + # dtype='object') + # It's much quicker to do the array unpacking and date calculation here rather than in pandas later. + # This will still need a trim because the way the SQL arrays work you get more data than you need. + query = SET_SEARCH_PATH + """ + SELECT + w.waveform_id, + w.observation_datetime AS base_observation_datetime, + w.observation_datetime + make_interval(secs => (v.ordinality - 1)::float / w.sampling_rate) AS observation_datetime, + v.v as waveform_value, + v.ordinality, + w.sampling_rate, + w.source_location, + w.unit, + w.location_visit_id, + w.visit_observation_type_id + FROM WAVEFORM w, unnest(w.values_array) WITH ORDINALITY v + WHERE visit_observation_type_id = %s AND source_location = %s + AND observation_datetime >= %s + AND observation_datetime <= %s + ORDER BY observation_datetime + """ + # print(f"qry = {query}, params = {params}") + with engine.connect() as con: + data = pd.read_sql_query(query, con, params=params) + return data + + +def get_waveform_coverage(visit_observation_type_id, source_location, min_time, max_time): + query = SET_SEARCH_PATH + """ + SELECT + w.waveform_id, + w.observation_datetime AS base_observation_datetime, + w.observation_datetime + make_interval(secs => (v.ordinality - 1)::float / w.sampling_rate) AS observation_datetime, + cardinality(w.values_array), + w.sampling_rate, + w.source_location, + w.unit, + w.location_visit_id, + w.visit_observation_type_id + FROM WAVEFORM w + WHERE visit_observation_type_id = %s AND source_location = %s + AND observation_datetime >= %s + AND observation_datetime <= %s + ORDER BY observation_datetime + """ + # print(f"qry = {query}, params = {params}") + with engine.connect() as con: + data = pd.read_sql_query(query, con, + params=(visit_observation_type_id, source_location, min_time, max_time)) + return data diff --git a/monitoring/streamlit/gaps.sql b/monitoring/streamlit/gaps.sql new file mode 100644 index 000000000..a67924cd9 --- /dev/null +++ b/monitoring/streamlit/gaps.sql @@ -0,0 +1,7 @@ +SELECT * +FROM waveform +WHERE + visit_observation_type_id = %s + AND source_location = %s +ORDER BY observation_datetime +; diff --git a/monitoring/streamlit/jupytext.toml b/monitoring/streamlit/jupytext.toml new file mode 100644 index 000000000..ee448177c --- /dev/null +++ b/monitoring/streamlit/jupytext.toml @@ -0,0 +1,4 @@ +# Every notebook in this folder should be paired with the Python percent format + +formats = "ipynb,py:percent" +notebook_metadata_filter = "-jupytext.text_representation.jupytext_version,-kernelspec" diff --git 
a/monitoring/streamlit/presentation.py b/monitoring/streamlit/presentation.py new file mode 100644 index 000000000..8d46b09b5 --- /dev/null +++ b/monitoring/streamlit/presentation.py @@ -0,0 +1,146 @@ +# -*- coding: utf-8 -*- +# --- +# jupyter: +# jupytext: +# notebook_metadata_filter: -jupytext.text_representation.jupytext_version,-kernelspec +# text_representation: +# extension: .py +# format_name: percent +# format_version: '1.3' +# --- + +# %% + +# %% [markdown] +# Ad-hoc validation of waveform data: export streams to audio files and plot FFT and time-series views + +# %% +from datetime import datetime +from functools import lru_cache + +import pandas as pd +import sqlalchemy +import psycopg2 +import soundfile +import matplotlib.pyplot as plt +import numpy as np +from scipy.fft import fft + +import database_utils +import waveform_utils + +# %% +all_params = database_utils.get_all_params() + +# %% +@lru_cache +def get_data_single_stream(visit_observation_type_id, source_location): + params = (visit_observation_type_id, source_location) + con = database_utils.engine.connect() + data = pd.read_sql_query(database_utils.SET_SEARCH_PATH + + """ + SELECT * + FROM WAVEFORM + WHERE visit_observation_type_id = %s AND source_location = %s + ORDER BY observation_datetime + """, con, params=params) + return data + +# %% +# For keeping output files from different runs separate +date_str = datetime.now().strftime('%Y%m%dT%H%M%S') +print(date_str) + + + +# %% +def to_ogg(): + date_str = datetime.now().strftime('%Y%m%dT%H%M%S') + for par in all_params.itertuples(): + data = get_data_single_stream(par.visit_observation_type_id, par.source_location) + all_points = [] + data['values_array'].apply(lambda va: all_points.extend(va)) + print(f"PRE max={max(all_points)}, min={min(all_points)}") + print(data.shape[0]) + print(len(all_points)) + all_points = [a/1000 for a in all_points] + print(f"POST max={max(all_points)}, min={min(all_points)}") + for sampling_rate in [88200]: + outfile = f"validation_output/output_{date_str}_{par.visit_observation_type_id}_{par.source_location}_{sampling_rate}.ogg" + soundfile.write(outfile, all_points, sampling_rate, format='OGG') + + +# %% +def get_distinct_sampling_rate(data): + unique_sampling_rate = data['sampling_rate'].unique() + assert len(unique_sampling_rate) == 1 + return unique_sampling_rate[0] + +# %% +def do_fft(all_points, sampling_rate): + sample_spacing = 1 / sampling_rate + # fft + all_points_centered = all_points - np.mean(all_points) + fft_values = fft(all_points_centered) + frequencies = np.fft.fftfreq(len(fft_values), sample_spacing) + # use magnitude of complex fft values + return all_points_centered, np.abs(fft_values), frequencies + + +# %% +def plot_waveform(par, max_seconds=10): + # global plot_df, data, all_points_centered, abs_fft_values, frequencies + data = get_data_single_stream(par.visit_observation_type_id, par.source_location) + sampling_rate = get_distinct_sampling_rate(data) + all_points = [] + data['values_array'].apply(lambda va: all_points.extend(va)) + # use only first N seconds + all_points_trimmed = all_points[:sampling_rate * max_seconds] + print(f"{par.source_location} sampling rate {sampling_rate}, data {len(all_points)} -> {len(all_points_trimmed)}") + all_points_centered, abs_fft_values, frequencies = do_fft(all_points_trimmed, sampling_rate) + fig, ax = plt.subplots(1, 2, figsize=(10, 5)) + print(f"|points| = {len(all_points_centered)}, |fft_vals| = {len(abs_fft_values)}, |frequencies|/2 = {len(frequencies)/2}") + # sampling rate / 2 is the absolute upper limit, but + # it's unlikely the real
frequencies are anywhere near that + n = len(frequencies) // 8 + plot_df = pd.DataFrame(dict(freq=frequencies[:n], vals=abs_fft_values[:n])) + ax[0].set_xlabel('freq') + ax[0].set_ylabel('mag') + ax[0].plot(plot_df['freq'], plot_df['vals']) + idx_max = plot_df['vals'].idxmax() + max_row = plot_df.loc[idx_max] + print(max_row) + # make sure it's more than the message length *and* sql array cardinality + max_points_to_plot = 12000 + points_to_plot = min(max_points_to_plot, len(all_points_centered)) + ax[1].set_xlabel('sample num') + ax[1].set_ylabel('waveform value') + ax[1].plot(range(points_to_plot), all_points_centered[:points_to_plot]) + plt.show() + outfile = f"validation_output/graph_{date_str}_{par.visit_observation_type_id}_{par.source_location}.png" + plt.savefig(outfile) + + +# %% +# %matplotlib inline +for par in all_params.itertuples(): + if plot_waveform(par): + break + + +# %% +par = all_params.iloc[0] +data = get_data_single_stream(par.visit_observation_type_id, par.source_location) +one_per_row_reset_times = waveform_utils.explode_values_array(data) + + +# %% +one_per_row_reset_times.head() + +# %% +one_per_row_reset_times.iloc[0:100000:10000] + +# %% +one_per_row_reset_times.shape + +# %% diff --git a/monitoring/streamlit/st_home.py b/monitoring/streamlit/st_home.py new file mode 100644 index 000000000..e32024f0a --- /dev/null +++ b/monitoring/streamlit/st_home.py @@ -0,0 +1,22 @@ +import streamlit as st +from st_waveform import waveform_data +from st_integrity import data_integrity +import database_utils + +st.set_page_config(layout="wide") + +# All pages +pages = { + "Waveform Data": waveform_data, + "Data integrity": data_integrity, +} + +# sidebar +sb = st.sidebar +sb.title("Pages") +selection = sb.selectbox("Go to", list(pages.keys())) +sb.write(f"Schema: {database_utils.database_schema}") + +# Render the selected page +page = pages[selection] +page() diff --git a/monitoring/streamlit/st_integrity.py b/monitoring/streamlit/st_integrity.py new file mode 100644 index 000000000..f3ad0e457 --- /dev/null +++ b/monitoring/streamlit/st_integrity.py @@ -0,0 +1,7 @@ +import streamlit as st + + +def data_integrity(): + st.title("Data integrity") + st.write("Gaps, completeness etc.") + diff --git a/monitoring/streamlit/st_waveform.py b/monitoring/streamlit/st_waveform.py new file mode 100644 index 000000000..c36945008 --- /dev/null +++ b/monitoring/streamlit/st_waveform.py @@ -0,0 +1,119 @@ +from datetime import timedelta, datetime, timezone +import time + +import pandas as pd +import streamlit as st +import altair as alt +import database_utils + + +def draw_graph(location, stream_id, min_time, max_time): + # (re-)initialise slider value if not known or if the bounds have changed so that it is now outside them + if 'slider_value' not in st.session_state or not min_time <= st.session_state.slider_value <= max_time: + st.session_state.slider_value = max(min_time, max_time - timedelta(seconds=15)) + print(f"New bounds for stream {stream_id}, location {location}: min={min_time}, max={max_time}, value={st.session_state.slider_value}") + # BUG: error is given if there is exactly one point so min_time == max_time + graph_start_time = bottom_cols[0].slider("Start time", + min_value=min_time, max_value=max_time, + value=st.session_state.slider_value, + step=timedelta(seconds=10), format="") + st.session_state.slider_value = graph_start_time + + graph_width_seconds = top_cols[3].slider("Chart width (seconds)", min_value=1, max_value=30, value=30) + + graph_end_time = graph_start_time +
timedelta(seconds=graph_width_seconds) + data = database_utils.get_data_single_stream_rounded(int(stream_id), location, + graph_start_time=graph_start_time, + graph_end_time=graph_end_time, + max_time=max_time) + trimmed = data[data['observation_datetime'].between(graph_start_time, graph_end_time)] + waveform_units = trimmed['unit'].drop_duplicates().tolist() + if len(waveform_units) > 1: + st_graph_area.error(f"duplicate units: {waveform_units}") + waveform_unit = "n/a" + elif len(waveform_units) == 0: + st_graph_area.error(f"no data over the given time period, try selecting another time") + waveform_unit = "n/a" + else: + waveform_unit = waveform_units[0] + + stream_label = unique_streams[stream_id] + chart = ( + alt.Chart(trimmed, width=1100, height=600) + # unfortunately the line continues over gaps in the data, but points are too ugly so stick with this for now + .mark_line(opacity=0.9) + .encode( + x=alt.X("observation_datetime", + title="Observation datetime", + # timeUnit="hoursminutesseconds", # using this causes a weird data corruption problem + scale=alt.Scale(type="utc"), + axis=alt.Axis(tickCount="millisecond", + tickColor="red", + tickBand="center", + titleFontSize=24, + ticks=True), + ), + y=alt.Y("waveform_value", + title=f"{stream_label} ({waveform_unit})", + stack=None, + axis=alt.Axis( + titleFontSize=24, + )), + # color="Region:N", + ) + #.interactive() + # .add_params( + # alt.selection_interval(bind='scales') + # ) + ) + st_graph_area.altair_chart(chart, use_container_width=True) + +def waveform_data(): + global unique_streams, st_graph_area, bottom_cols, top_cols + + st_top_controls = st.container() + st_bottom_controls = st.container() + st_graph_area = st.container() + st_info_box = st.container() + st_info_box.write(f"Schema: {database_utils.database_schema}") + top_cols = st_top_controls.columns(4) + bottom_cols = st_bottom_controls.columns(1, gap='medium') + + all_params = database_utils.get_all_params() + print(f"all_params = ", all_params) + + unique_streams_list = all_params.apply(lambda r: (r['visit_observation_type_id'], r['name']), axis=1).drop_duplicates().tolist() + unique_streams = dict(unique_streams_list) + if len(unique_streams_list) != len(unique_streams): + # the DB schema should ensure this doesn't happen, but check + st_graph_area.error(f"WARNING: apparent ambiguous mapping in {unique_streams_list}") + + print(f"unique streams = ", unique_streams) + location = top_cols[0].selectbox("Choose location", sorted(set(all_params['source_location']))) + streams_for_location = all_params[all_params['source_location'] == location]['visit_observation_type_id'] + stream_id = top_cols[1].selectbox("Choose stream", streams_for_location, format_func=lambda i: unique_streams[i]) + + print(f"location = {location}, stream_id = {stream_id}") + if not location: + st.error("Please select a location") + elif not stream_id: + st.error("Please select a stream") + else: + if top_cols[2].button("Re-check DB"): + st.cache_data.clear() + + # st.download_button(label, data, file_name=None, mime=None, key=None, help=None, on_click=None, args=None, kwargs=None, *, type="secondary", icon=None, disabled=False, use_container_width=False) + + print(f"getting bounds for stream = {stream_id}, location = {location}") + min_time, max_time = database_utils.get_min_max_time_for_single_stream(int(stream_id), location) + if min_time is None: + st_graph_area.error("No data for location+stream found") + else: + min_time = min_time.to_pydatetime() + max_time = max_time.to_pydatetime() + 
draw_graph(location, stream_id, min_time, max_time) + + +if __name__ == "__main__": + waveform_data() diff --git a/monitoring/streamlit/validation.py b/monitoring/streamlit/validation.py new file mode 100644 index 000000000..5529ef749 --- /dev/null +++ b/monitoring/streamlit/validation.py @@ -0,0 +1,102 @@ +# --- +# jupyter: +# jupytext: +# cell_metadata_filter: -all +# notebook_metadata_filter: -jupytext.text_representation.jupytext_version,-kernelspec +# text_representation: +# extension: .py +# format_name: percent +# format_version: '1.3' +# --- + +# %% +import os +from functools import lru_cache + +# %% +import pytest +from pytest import approx +import pandas as pd +import sqlalchemy +import psycopg2 + +# %% +database_url = 'postgresql+psycopg2://inform_user:inform@localhost:5433/fakeuds' +schema = "uds_schema" +search_path_preamble = f"set search_path to {schema};" +engine = sqlalchemy.create_engine(database_url) + +# %% +# put in fixture +con = engine.connect() + +# %% +qry = open("gaps.sql").read() + +# %% +all_params = pd.read_sql_query(search_path_preamble + + """ + SELECT DISTINCT visit_observation_type_id, source_location + FROM WAVEFORM + """, con) +print(all_params) +print("!!!") + +# %% +@lru_cache +def run_with_params(visit_observation_type_id, source_location): + params = (visit_observation_type_id, source_location) + print(f"running with {params}") + waveform_df = pd.read_sql_query(search_path_preamble + qry, con, params=params) + return waveform_df + +# %% [markdown] +# --AND observation_datetime < %s + +# %% +def test_all_for_gaps(): + for ps in all_params.itertuples(): + waveform_df = run_with_params(ps.visit_observation_type_id, ps.source_location) + duration = pd.to_timedelta(waveform_df['values_array'].apply(len), "seconds") / waveform_df['sampling_rate'] + # duration = pd.Timedelta(seconds=len(waveform_df['values_array']) / waveform_df['sampling_rate']) + waveform_df['duration'] = duration + waveform_df['calc_end_date'] = waveform_df['observation_datetime'] + duration + waveform_df['gap_since_last'] = (waveform_df['observation_datetime'] + - waveform_df['calc_end_date'].shift(1)).fillna(pd.Timedelta(0)) + first = waveform_df.iloc[0] + last = waveform_df.iloc[-1] + total_samples = waveform_df['values_array'].apply(len).sum() + total_active_time = waveform_df['duration'].sum() + total_calendar_time = last['calc_end_date'] - first['observation_datetime'] + # if there are no gaps or overlaps, total_active_time and total_calendar_time should be the same + sampling_rate = waveform_df['sampling_rate'].unique().tolist() + print(f"Total samples = {total_samples} @{sampling_rate}Hz, Total active time = {total_active_time}, total calendar = {total_calendar_time}") + indexes_with_gap = waveform_df[waveform_df['gap_since_last'].apply(abs) > pd.Timedelta(milliseconds=1)].index + print(f"Indexes with gap: {indexes_with_gap}") + print(f"with gap: {waveform_df.loc[indexes_with_gap]}") + assert indexes_with_gap.empty + assert abs(total_active_time - total_calendar_time) < pd.Timedelta(milliseconds=1) + + # Index(['waveform_id', 'stored_from', 'valid_from', 'observation_datetime', + # 'sampling_rate', 'source_location', 'unit', 'values_array', + # 'location_visit_id', 'visit_observation_type_id'], + # dtype='object') + + +# %% +def test_no_orphaned_data(): + orphaned_data = pd.read_sql_query(search_path_preamble + + """ + SELECT * + FROM WAVEFORM + WHERE location_visit_id IS NULL + """, con) + print(orphaned_data) + # all data is orphaned because the generator doesn't put any ADT
messages in! + assert orphaned_data.empty + + +# %% +test_all_for_gaps() + +# %% diff --git a/monitoring/streamlit/validation_output/.gitignore b/monitoring/streamlit/validation_output/.gitignore new file mode 100644 index 000000000..241e560df --- /dev/null +++ b/monitoring/streamlit/validation_output/.gitignore @@ -0,0 +1,2 @@ +* + diff --git a/monitoring/streamlit/waveform_utils.py b/monitoring/streamlit/waveform_utils.py new file mode 100644 index 000000000..e8c6f1e80 --- /dev/null +++ b/monitoring/streamlit/waveform_utils.py @@ -0,0 +1,2 @@ +import pandas as pd + diff --git a/portal-config-envs.EXAMPLE b/portal-config-envs.EXAMPLE new file mode 100644 index 000000000..4f24122e1 --- /dev/null +++ b/portal-config-envs.EXAMPLE @@ -0,0 +1,2 @@ +PORTAL_USERNAME= +PORTAL_PASSWORD=
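Note (illustrative, not part of the diff): the comments in database_utils.get_data_single_stream_rounded describe rounding the requested window so that repeated calls with nearby slider positions produce identical query bounds and therefore hit the same cached result. A small standalone sketch of that arithmetic, using made-up timestamps and the 30-second max row length plus the 10-second/5-second granularities used in the code:

from datetime import datetime, timedelta
import math

graph_start_time = datetime(2024, 1, 2, 12, 0, 47, 300000)   # assumed example value
graph_end_time = graph_start_time + timedelta(seconds=30)    # 12:01:17.3

# round the start down: subtract the max row length, then floor to a 10s boundary
actual_min_time = graph_start_time - timedelta(seconds=30)   # 12:00:17.3
rounded_min_time = actual_min_time.replace(second=actual_min_time.second // 10 * 10, microsecond=0)  # 12:00:10

# round the end up to the next 5s boundary (before capping at the known max_time)
rounded_max_time = (graph_end_time.replace(second=0, microsecond=0)
                    + timedelta(seconds=math.ceil((graph_end_time.second + graph_end_time.microsecond / 1_000_000) / 5) * 5))  # 12:01:20

print(rounded_min_time, rounded_max_time)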
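Note (illustrative, not part of the diff): presentation.py calls waveform_utils.explode_values_array(data), but the new waveform_utils.py contains only the pandas import, so that cell will fail as committed. Based on how the result is used (one row per sample, with per-sample timestamps) and on the equivalent unnest ... WITH ORDINALITY logic in database_utils.get_data_single_stream, the helper presumably needs to do something like the sketch below; this is an assumed implementation, not the author's:

import pandas as pd


def explode_values_array(data: pd.DataFrame) -> pd.DataFrame:
    # one row per sample; values_array becomes a scalar waveform_value column
    exploded = data.explode('values_array').rename(columns={'values_array': 'waveform_value'})
    # position of each sample within its original row (0-based)
    exploded['ordinality'] = exploded.groupby('waveform_id').cumcount()
    # recompute the per-sample timestamp from the row's start time and sampling rate
    exploded['observation_datetime'] = (exploded['observation_datetime']
                                        + pd.to_timedelta(exploded['ordinality'] / exploded['sampling_rate'], unit='s'))
    return exploded.reset_index(drop=True)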