diff --git a/docs/source/for-developers/setup-environment.md b/docs/source/for-developers/setup-environment.md
index aa9a125..b2b4da3 100644
--- a/docs/source/for-developers/setup-environment.md
+++ b/docs/source/for-developers/setup-environment.md
@@ -16,8 +16,8 @@ cd pittgoogle-client
 
 # recommended to create a new conda env
 # use the latest python version unless you have a specific reason not to
-conda create --name pittgoogle python=3.12
-conda activate pittgoogle
+conda create --name pittgoogle-dev python=3.12
+conda activate pittgoogle-dev
 
 # install pittgoogle-client in editable mode. use pwd so that the absolute path is registered.
 pip install -e $(pwd)
diff --git a/pittgoogle/alert.py b/pittgoogle/alert.py
index cd1d95f..d805b78 100644
--- a/pittgoogle/alert.py
+++ b/pittgoogle/alert.py
@@ -10,17 +10,21 @@
 import base64
 import datetime
 import importlib.resources
+import io
 import logging
+import random
 from pathlib import Path
 from typing import TYPE_CHECKING, Any, Mapping, Union
 
 import attrs
 import google.cloud.pubsub_v1
 
-from . import registry, types_, exceptions
+from . import exceptions, registry, types_
 from .schema import Schema  # so 'schema' module doesn't clobber 'Alert.schema' attribute
 
 if TYPE_CHECKING:
+    import astropy.table
+    import google.cloud.functions_v1
     import pandas as pd  # always lazy-load pandas. it hogs memory on cloud functions and run
 
 LOGGER = logging.getLogger(__name__)
@@ -60,10 +64,50 @@ class Alert:
     )
     path: Path | None = attrs.field(default=None)
     # Use "Union" because " | " is throwing an error when combined with forward references.
+    context: Union[
+        "google.cloud.functions_v1.context.Context", types_._FunctionsContextLike, None
+    ] = attrs.field(default=None)
     _dataframe: Union["pd.DataFrame", None] = attrs.field(default=None)
+    _skymap: Union["astropy.table.QTable", None] = attrs.field(default=None)
     _schema: Schema | None = attrs.field(default=None, init=False)
 
     # ---- class methods ---- #
+    @classmethod
+    def from_cloud_functions(
+        cls,
+        event: Mapping,
+        context: "google.cloud.functions_v1.context.Context",
+        schema_name: str | None = None,
+    ) -> "Alert":
+        """Create an `Alert` from an 'event' and 'context', as received by a Cloud Functions module.
+
+        Argument definitions copied from https://cloud.google.com/functions/1stgendocs/tutorials/pubsub-1st-gen
+
+        Args:
+            event (dict):
+                The dictionary with data specific to this type of event. The `@type` field maps to
+                `type.googleapis.com/google.pubsub.v1.PubsubMessage`. The `data` field maps to the
+                PubsubMessage data in a base64-encoded string. The `attributes` field maps to the
+                PubsubMessage attributes if any is present.
+            context (google.cloud.functions.Context):
+                Metadata of triggering event including `event_id` which maps to the PubsubMessage
+                messageId, `timestamp` which maps to the PubsubMessage publishTime, `event_type` which
+                maps to `google.pubsub.topic.publish`, and `resource` which is a dictionary that
+                describes the service API endpoint pubsub.googleapis.com, the triggering topic's name,
+                and the triggering event type `type.googleapis.com/google.pubsub.v1.PubsubMessage`.
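+
+        Example:
+
+            A minimal sketch of a 1st-gen Cloud Functions entry point that uses this method.
+            The entry-point name and the schema name are illustrative, not prescribed.
+
+            .. code-block:: python
+
+                import pittgoogle
+
+                def run(event, context):
+                    # called once for each Pub/Sub message that triggers the function
+                    alert = pittgoogle.Alert.from_cloud_functions(event, context, schema_name="ztf")
+                    # ... process the alert, e.g., via alert.dict, alert.dataframe, alert.attributes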
+        """
+        return cls(
+            msg=types_.PubsubMessageLike(
+                # data is required. the rest should be present in the message, but use get to be lenient
+                data=base64.b64decode(event["data"]),
+                attributes=event.get("attributes", {}),
+                message_id=context.event_id,
+                publish_time=cls._str_to_datetime(context.timestamp),
+            ),
+            context=context,
+            schema_name=schema_name,
+        )
+
     @classmethod
     def from_cloud_run(cls, envelope: Mapping, schema_name: str | None = None) -> "Alert":
         """Create an `Alert` from an HTTP request envelope containing a Pub/Sub message, as received by a Cloud Run module.
@@ -118,21 +162,13 @@ def index():
         if not isinstance(envelope, dict) or "message" not in envelope:
             raise exceptions.BadRequest("Bad Request: invalid Pub/Sub message format")
 
-        # convert the message publish_time string -> datetime
-        # occasionally the string doesn't include microseconds so we need a try/except
-        publish_time = envelope["message"]["publish_time"].replace("Z", "+00:00")
-        try:
-            publish_time = datetime.datetime.strptime(publish_time, "%Y-%m-%dT%H:%M:%S.%f%z")
-        except ValueError:
-            publish_time = datetime.datetime.strptime(publish_time, "%Y-%m-%dT%H:%M:%S%z")
-
         return cls(
             msg=types_.PubsubMessageLike(
                 # data is required. the rest should be present in the message, but use get to be lenient
                 data=base64.b64decode(envelope["message"]["data"].encode("utf-8")),
-                attributes=envelope["message"].get("attributes"),
+                attributes=envelope["message"].get("attributes", {}),
                 message_id=envelope["message"].get("message_id"),
-                publish_time=publish_time,
+                publish_time=cls._str_to_datetime(envelope["message"]["publish_time"]),
                 ordering_key=envelope["message"].get("ordering_key"),
             ),
             schema_name=schema_name,
@@ -181,7 +217,7 @@ def from_msg(
 
     @classmethod
     def from_path(cls, path: str | Path, schema_name: str | None = None) -> "Alert":
-        """Creates an `Alert` object from the file at the specified `path`.
+        """Create an `Alert` object from the file at the specified `path`.
 
         Args:
             path (str or Path):
@@ -205,6 +241,11 @@ def from_path(cls, path: str | Path, schema_name: str | None = None) -> "Alert":
             msg=types_.PubsubMessageLike(data=bytes_), schema_name=schema_name, path=Path(path)
         )
 
+    def to_mock_input(self, cloud_functions: bool = False) -> tuple[dict, types_._FunctionsContextLike]:
+        """Package this alert as mock input to a pipeline module, e.g., for local testing.
+
+        Args:
+            cloud_functions (bool):
+                If True, return an (event, context) tuple mimicking a Cloud Functions trigger.
+                This is currently the only implemented option, but it must be requested explicitly.
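+
+        Example:
+
+            A sketch of a local test for a Cloud Functions module, where ``run`` stands in for
+            the module's entry point (the file path and schema name are illustrative):
+
+            .. code-block:: python
+
+                alert = pittgoogle.Alert.from_path("path/to/alert.avro", schema_name="ztf")
+                event, context = alert.to_mock_input(cloud_functions=True)
+                run(event, context)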
+        """
+        if not cloud_functions:
+            raise NotImplementedError("Only Cloud Functions mock input has been implemented.")
+        return MockInput(alert=self).to_cloud_functions()
+
     # ---- properties ---- #
     @property
     def attributes(self) -> Mapping:
@@ -219,6 +260,7 @@ def attributes(self) -> Mapping:
         """
         if self._attributes is None:
             self._attributes = dict(self.msg.attributes)
+            self._add_id_attributes()
         return self._attributes
 
     @property
@@ -253,9 +295,9 @@ def dataframe(self) -> "pd.DataFrame":
         import pandas as pd  # always lazy-load pandas. it hogs memory on cloud functions and run
 
         # sources and previous sources are expected to have the same fields
-        sources_df = pd.DataFrame([self.get("source")] + self.get("prv_sources"))
+        sources_df = pd.DataFrame([self.get("source")] + self.get("prv_sources", []))
         # sources and forced sources may have different fields
-        forced_df = pd.DataFrame(self.get("prv_forced_sources"))
+        forced_df = pd.DataFrame(self.get("prv_forced_sources", []))
 
         # use nullable integer data type to avoid converting ints to floats
         # for columns in one dataframe but not the other
@@ -307,6 +349,63 @@ def schema(self) -> Schema:
             self._schema = registry.Schemas.get(self.schema_name)
         return self._schema
 
+    @property
+    def skymap(self) -> Union["astropy.table.QTable", None]:
+        """Alert skymap as an astropy Table. Currently implemented for LVK schemas only.
+
+        The skymap is loaded from the alert into an astropy table and extra columns are added, following
+        https://emfollow.docs.ligo.org/userguide/tutorial/multiorder_skymaps.html.
+        The table is sorted by PROBDENSITY and then UNIQ, in descending order, so that the most likely
+        location is first. Columns:
+
+        - UNIQ: HEALPix pixel index in the NUNIQ indexing scheme.
+        - PROBDENSITY: Probability density in the pixel (per steradian).
+        - nside: HEALPix nside parameter defining the pixel resolution.
+        - ipix: HEALPix pixel index at resolution nside.
+        - ra: Right ascension of the pixel center (radians).
+        - dec: Declination of the pixel center (radians).
+        - pixel_area: Area of the pixel (steradians).
+        - prob: Probability contained in the pixel (pixel_area * PROBDENSITY).
+        - cumprob: Cumulative probability of this pixel plus all pixels sorted above it.
+
+        Examples:
+
+        .. code-block:: python
+
+            # most likely location
+            alert.skymap[0]
+
+            # 90% credible region
+            alert.skymap[:alert.skymap['cumprob'].searchsorted(0.9)]
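+
+            # area of the 90% credible region, in steradians (sums the slice shown above)
+            alert.skymap['pixel_area'][:alert.skymap['cumprob'].searchsorted(0.9)].sum()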
+        """
+        if self._skymap is None and self.schema_name.startswith("lvk"):
+            # lazy-load these dependencies. they are not needed elsewhere in this module
+            import astropy.table
+            import astropy.units
+            import hpgeom
+            import numpy as np
+
+            skymap = astropy.table.QTable.read(io.BytesIO(base64.b64decode(self.get("skymap"))))
+            skymap.sort(["PROBDENSITY", "UNIQ"], reverse=True)
+
+            # decode the NUNIQ indexing scheme: UNIQ = ipix + 4 * nside**2
+            skymap["nside"] = (2 ** (np.log2(skymap["UNIQ"] // 4) // 2)).astype(int)
+            skymap["ipix"] = skymap["UNIQ"] - 4 * skymap["nside"] ** 2
+
+            skymap["ra"], skymap["dec"] = hpgeom.pixel_to_angle(
+                skymap["nside"], skymap["ipix"], degrees=False
+            )
+            skymap["ra"].unit = astropy.units.rad
+            skymap["dec"].unit = astropy.units.rad
+
+            skymap["pixel_area"] = hpgeom.nside_to_pixel_area(skymap["nside"], degrees=False)
+            skymap["pixel_area"].unit = astropy.units.sr
+
+            skymap["prob"] = skymap["pixel_area"] * skymap["PROBDENSITY"]
+            skymap["cumprob"] = skymap["prob"].cumsum()
+
+            self._skymap = skymap
+
+        return self._skymap
+
     # ---- methods ---- #
     def _add_id_attributes(self) -> None:
         """Add the IDs ("alertid", "objectid", "sourceid") to :attr:`Alert.attributes`."""
@@ -319,10 +418,10 @@ def _add_id_attributes(self) -> None:
         # but pubsub message attributes must be strings. join to avoid a future error on publish
         names = [".".join(id) if isinstance(id, list) else id for id in survey_names]
 
-        # only add to attributes if the survey has defined this field
+        # only add to attributes if the survey has defined this field and it's not already in the attributes
        for idname, idvalue in zip(names, values):
-            if idname is not None:
-                self.attributes[idname] = idvalue
+            if idname is not None and idname not in self._attributes:
+                self._attributes[idname] = idvalue
 
     def get(self, field: str, default: Any = None) -> Any:
         """Return the value of a field from the alert data.
@@ -401,3 +500,64 @@ def get_key(
             return survey_field[-1]
 
         return survey_field
+
+    def _prep_for_publish(self) -> tuple[bytes, Mapping[str, str]]:
+        """Serialize the alert dict and convert all attribute keys and values to strings."""
+        message = self.schema.serialize(self.dict)
+        # Pub/Sub requires attribute keys and values to be strings. Sort the keys while we're at it.
+        attributes = {str(key): str(self.attributes[key]) for key in sorted(self.attributes)}
+        return message, attributes
+
+    @staticmethod
+    def _str_to_datetime(str_time: str) -> datetime.datetime:
+        """Convert an ISO-format timestamp string to a datetime object."""
+        # occasionally the string doesn't include microseconds so we need a try/except.
+        # note: '%z' matches both '+00:00' and 'Z' UTC suffixes (Python 3.7+).
+        try:
+            return datetime.datetime.strptime(str_time, "%Y-%m-%dT%H:%M:%S.%f%z")
+        except ValueError:
+            return datetime.datetime.strptime(str_time, "%Y-%m-%dT%H:%M:%S%z")
+
+
+@attrs.define
+class MockInput:
+    """Mock input to a pipeline module, constructed from an :class:`Alert`."""
+
+    alert: Alert = attrs.field()
+
+    def to_cloud_functions(self) -> tuple[dict, types_._FunctionsContextLike]:
+        """Return a mock (event, context) tuple like the one Cloud Functions passes to a module it triggers.
+
+        Parameter definitions copied from https://cloud.google.com/functions/1stgendocs/tutorials/pubsub-1st-gen
+
+        Returns:
+            event (dict):
+                The dictionary with data specific to this type of event. The `@type` field maps to
+                `type.googleapis.com/google.pubsub.v1.PubsubMessage`. The `data` field maps to the
+                PubsubMessage data in a base64-encoded string. The `attributes` field maps to the
+                PubsubMessage attributes if any is present.
+            context (google.cloud.functions.Context):
+                Metadata of triggering event including `event_id` which maps to the PubsubMessage
+                messageId, `timestamp` which maps to the PubsubMessage publishTime, `event_type` which
+                maps to `google.pubsub.topic.publish`, and `resource` which is a dictionary that
+                describes the service API endpoint pubsub.googleapis.com, the triggering topic's name,
+                and the triggering event type `type.googleapis.com/google.pubsub.v1.PubsubMessage`.
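+
+        Example:
+
+            A sketch of a round trip through the mock (assumes ``alert`` is an existing :class:`Alert`):
+
+            .. code-block:: python
+
+                event, context = MockInput(alert=alert).to_cloud_functions()
+                alert_out = Alert.from_cloud_functions(event, context, schema_name=alert.schema_name)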
+        """
+        message, attributes = self.alert._prep_for_publish()
+        event_type = "type.googleapis.com/google.pubsub.v1.PubsubMessage"
+        now = (
+            datetime.datetime.now(datetime.timezone.utc)
+            .isoformat(timespec="milliseconds")
+            .replace("+00:00", "Z")
+        )
+
+        # Cloud Functions delivers the message data as a base64-encoded string, so decode the bytes
+        event = {
+            "@type": event_type,
+            "data": base64.b64encode(message).decode("utf-8"),
+            "attributes": attributes,
+        }
+
+        context = types_._FunctionsContextLike(
+            # random integer in [0, 1e12) standing in for a Pub/Sub messageId
+            event_id=str(int(1e12 * random.random())),
+            timestamp=now,
+            event_type="google.pubsub.topic.publish",
+            resource={
+                "name": "projects/NONE/topics/NONE",
+                "service": "pubsub.googleapis.com",
+                "type": event_type,
+            },
+        )
+
+        return event, context
diff --git a/pittgoogle/pubsub.py b/pittgoogle/pubsub.py
index ee585c7..8aabf40 100644
--- a/pittgoogle/pubsub.py
+++ b/pittgoogle/pubsub.py
@@ -276,10 +276,7 @@ def delete(self) -> None:
 
     def publish(self, alert: "Alert") -> int:
         """Publish a message with :attr:`pittgoogle.Alert.dict` as the payload and :attr:`pittgoogle.Alert.attributes` as the attributes."""
-        # Pub/Sub requires attribute keys and values to be strings. Sort the keys while we're at it.
-        attributes = {str(key): str(alert.attributes[key]) for key in sorted(alert.attributes)}
-        message = alert.schema.serialize(alert.dict)
-
+        message, attributes = alert._prep_for_publish()
         future = self.client.publish(self.path, data=message, **attributes)
         return future.result()
diff --git a/pittgoogle/registry_manifests/schemas.yml b/pittgoogle/registry_manifests/schemas.yml
index 0534132..05134a8 100644
--- a/pittgoogle/registry_manifests/schemas.yml
+++ b/pittgoogle/registry_manifests/schemas.yml
@@ -50,6 +50,13 @@
       5: ''
       6: ''
 #
+# LVK alerts
+- name: 'lvk'
+  description: 'Schema for LIGO-Virgo-KAGRA (LVK) alerts. JSON format.'
+  origin: 'https://emfollow.docs.ligo.org/userguide/content.html#kafka-notice-gcn-scimma'
+  helper: 'default_schema_helper'
+  path: null
+#
 # ZTF alerts
 - name: 'ztf'
   description: 'ZTF schema. The ZTF survey publishes alerts in Avro format with the schema attached in the header.
     Pitt-Google publishes ZTF alerts in json format. This schema covers both cases.'
diff --git a/pittgoogle/schemas/maps/TEMPLATE.yml b/pittgoogle/schemas/maps/TEMPLATE.yml
index a9df1e4..026d87f 100644
--- a/pittgoogle/schemas/maps/TEMPLATE.yml
+++ b/pittgoogle/schemas/maps/TEMPLATE.yml
@@ -36,4 +36,5 @@ mag_zp: [diaSource, magzpsci] # float (magnitude zero point)
 mjd: [diaSource, midPointTai] # float
 ra: [diaSource, ra] # float
 ra_err: [diaSource, raErr] # float
+skymap: [event, skymap] # str
 snr: [diaSource, snr] # float
diff --git a/pittgoogle/schemas/maps/lvk.yml b/pittgoogle/schemas/maps/lvk.yml
new file mode 100644
index 0000000..8bb13ed
--- /dev/null
+++ b/pittgoogle/schemas/maps/lvk.yml
@@ -0,0 +1,13 @@
+SURVEY: lvk
+SCHEMA_ORIGIN: 'https://emfollow.docs.ligo.org/userguide/content.html#kafka-notice-gcn-scimma'
+#
+# IDs.
+alertid: superevent_id # str
+sourceid: superevent_id # str
+#
+# Sources and Objects
+source: event # record
+#
+# Everything else.
+skymap: [event, skymap]
+# [TODO] What else would be useful?
diff --git a/pittgoogle/types_.py b/pittgoogle/types_.py
index 2a7baf2..f07ec20 100644
--- a/pittgoogle/types_.py
+++ b/pittgoogle/types_.py
@@ -45,3 +45,34 @@ class PubsubMessageLike:
     """Timestamp of the published message."""
     ordering_key: str | None = attrs.field(default=None)
     """Pub/Sub ordering key of the published message."""
+
+
+@attrs.define(frozen=True)
+class _FunctionsContextLike:
+    """Container that mimics the "context" metadata of a Cloud Functions triggering event.
+
+    Parameter definitions copied from https://cloud.google.com/functions/1stgendocs/tutorials/pubsub-1st-gen
+
+    Parameters:
+        event_id (str):
+            Maps to PubsubMessage 'messageId'.
+        timestamp (str):
+            Maps to PubsubMessage 'publishTime'.
+        event_type (str):
+            Maps to `google.pubsub.topic.publish`.
+        resource (dict):
+            Describes the service API endpoint pubsub.googleapis.com, the triggering
+            topic's name, and the triggering event type
+            `type.googleapis.com/google.pubsub.v1.PubsubMessage`.
+    """
+
+    event_id: str
+    timestamp: str
+    event_type: str = attrs.field(default="google.pubsub.topic.publish")
+    # use a factory so instances don't share a single mutable default dict
+    resource: dict = attrs.field(
+        factory=lambda: {
+            "name": "projects/mock-project/topics/mock-topic",
+            "service": "pubsub.googleapis.com",
+            "type": "type.googleapis.com/google.pubsub.v1.PubsubMessage",
+        }
+    )
diff --git a/pittgoogle/utils.py b/pittgoogle/utils.py
index 0ada4ca..e306623 100644
--- a/pittgoogle/utils.py
+++ b/pittgoogle/utils.py
@@ -8,16 +8,17 @@
 ----
 """
 import base64
-import collections
 import io
 import json
 import logging
+from typing import TYPE_CHECKING
 
-import astropy.table
-import astropy.time
 import attrs
 import fastavro
 
+if TYPE_CHECKING:
+    import astropy.table
+
 LOGGER = logging.getLogger(__name__)
@@ -105,7 +106,7 @@ def b64avro_to_dict(bytes_data):
 
     # --- Work with alert dictionaries
     @staticmethod
-    def alert_dict_to_table(alert_dict: dict) -> astropy.table.Table:
+    def alert_dict_to_table(alert_dict: dict) -> "astropy.table.Table":
         """Package a ZTF alert dictionary into an Astropy Table.
 
         Args:
@@ -117,6 +118,9 @@ def alert_dict_to_table(alert_dict: dict) -> "astropy.table.Table":
 
            An Astropy Table containing the alert information.
         """
+        import astropy.table
+        import collections
+
         # collect rows for the table
         candidate = collections.OrderedDict(alert_dict["candidate"])
         rows = [candidate]
@@ -159,6 +163,8 @@ def jd_to_readable_date(jd) -> str:
-            str: The ``jd`` in the format 'day mon year hour:min'.
+            str: The ``jd`` in the format 'day mon year - hour:min:sec'.
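+
+        Example:
+
+            A quick spot check (JD 2460000.5 is 2023-02-25 00:00:00 UTC):
+
+            .. code-block:: python
+
+                jd_to_readable_date(2460000.5)  # '25 Feb 2023 - 00:00:00'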
""" + import astropy.time + return astropy.time.Time(jd, format="jd").strftime("%d %b %Y - %H:%M:%S")