Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Add support for LVK and Cloud Functions #67

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
4 changes: 2 additions & 2 deletions docs/source/for-developers/setup-environment.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ cd pittgoogle-client

# recommended to create a new conda env
# use the latest python version unless you have a specific reason not to
conda create --name pittgoogle python=3.12
conda activate pittgoogle
conda create --name pittgoogle-dev python=3.12
conda activate pittgoogle-dev

# install pittgoogle-client in editable mode. use pwd so that the absolute path is registered.
pip install -e $(pwd)
Expand Down
194 changes: 177 additions & 17 deletions pittgoogle/alert.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,21 @@
import base64
import datetime
import importlib.resources
import io
import logging
import random
from pathlib import Path
from typing import TYPE_CHECKING, Any, Mapping, Union

import attrs
import google.cloud.pubsub_v1

from . import registry, types_, exceptions
from . import exceptions, registry, types_
from .schema import Schema # so 'schema' module doesn't clobber 'Alert.schema' attribute

if TYPE_CHECKING:
import astropy.table
import google.cloud.functions_v1
import pandas as pd # always lazy-load pandas. it hogs memory on cloud functions and run

LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -60,10 +64,50 @@ class Alert:
)
path: Path | None = attrs.field(default=None)
# Use "Union" because " | " is throwing an error when combined with forward references.
context: Union[
"google.cloud.functions_v1.context.Context", types_._FunctionsContextLike, None
] = attrs.field(default=None)
_dataframe: Union["pd.DataFrame", None] = attrs.field(default=None)
_skymap: Union["astropy.table.Qtable", None] = attrs.field(default=None)
_schema: Schema | None = attrs.field(default=None, init=False)

# ---- class methods ---- #
@classmethod
def from_cloud_functions(
    cls,
    event: Mapping,
    context: "google.cloud.functions_v1.context.Context",
    schema_name: str | None = None,
):
    """Create an `Alert` from the 'event' and 'context' delivered to a Cloud Functions module.

    Argument definitions copied from https://cloud.google.com/functions/1stgendocs/tutorials/pubsub-1st-gen

    Args:
        event (dict):
            The dictionary with data specific to this type of event. The `@type` field maps to
            `type.googleapis.com/google.pubsub.v1.PubsubMessage`. The `data` field maps to the
            PubsubMessage data in a base64-encoded string. The `attributes` field maps to the
            PubsubMessage attributes if any is present.
        context (google.cloud.functions.Context):
            Metadata of triggering event including `event_id` which maps to the PubsubMessage
            messageId, `timestamp` which maps to the PubsubMessage publishTime, `event_type` which
            maps to `google.pubsub.topic.publish`, and `resource` which is a dictionary that
            describes the service API endpoint pubsub.googleapis.com, the triggering topic's name,
            and the triggering event type `type.googleapis.com/google.pubsub.v1.PubsubMessage`.
    """
    # Rebuild a Pub/Sub-like message from the event payload plus the context metadata.
    message = types_.PubsubMessageLike(
        # 'data' is required; it arrives base64-encoded. Use .get() elsewhere to be lenient.
        data=base64.b64decode(event["data"]),
        attributes=event.get("attributes", {}),
        message_id=context.event_id,
        publish_time=cls._str_to_datetime(context.timestamp),
    )
    return cls(msg=message, context=context, schema_name=schema_name)

@classmethod
def from_cloud_run(cls, envelope: Mapping, schema_name: str | None = None) -> "Alert":
"""Create an `Alert` from an HTTP request envelope containing a Pub/Sub message, as received by a Cloud Run module.
Expand Down Expand Up @@ -118,21 +162,13 @@ def index():
if not isinstance(envelope, dict) or "message" not in envelope:
raise exceptions.BadRequest("Bad Request: invalid Pub/Sub message format")

# convert the message publish_time string -> datetime
# occasionally the string doesn't include microseconds so we need a try/except
publish_time = envelope["message"]["publish_time"].replace("Z", "+00:00")
try:
publish_time = datetime.datetime.strptime(publish_time, "%Y-%m-%dT%H:%M:%S.%f%z")
except ValueError:
publish_time = datetime.datetime.strptime(publish_time, "%Y-%m-%dT%H:%M:%S%z")

return cls(
msg=types_.PubsubMessageLike(
# data is required. the rest should be present in the message, but use get to be lenient
data=base64.b64decode(envelope["message"]["data"].encode("utf-8")),
attributes=envelope["message"].get("attributes"),
attributes=envelope["message"].get("attributes", {}),
message_id=envelope["message"].get("message_id"),
publish_time=publish_time,
publish_time=cls._str_to_datetime(envelope["message"]["publish_time"]),
ordering_key=envelope["message"].get("ordering_key"),
),
schema_name=schema_name,
Expand Down Expand Up @@ -181,7 +217,7 @@ def from_msg(

@classmethod
def from_path(cls, path: str | Path, schema_name: str | None = None) -> "Alert":
"""Creates an `Alert` object from the file at the specified `path`.
"""Create an `Alert` object from the file at the specified `path`.

Args:
path (str or Path):
Expand All @@ -205,6 +241,11 @@ def from_path(cls, path: str | Path, schema_name: str | None = None) -> "Alert":
msg=types_.PubsubMessageLike(data=bytes_), schema_name=schema_name, path=Path(path)
)

def to_mock_input(self, cloud_functions: bool = False):
    """Return this alert repackaged as mock input for a cloud service trigger.

    Args:
        cloud_functions (bool):
            Whether to produce Cloud Functions style input. This is currently the
            only supported target, so it must be True.

    Raises:
        NotImplementedError: If `cloud_functions` is False.
    """
    if cloud_functions:
        return MockInput(alert=self).to_cloud_functions()
    raise NotImplementedError("Only cloud functions has been implemented.")

# ---- properties ---- #
@property
def attributes(self) -> Mapping:
Expand All @@ -219,6 +260,7 @@ def attributes(self) -> Mapping:
"""
if self._attributes is None:
self._attributes = dict(self.msg.attributes)
self._add_id_attributes()
return self._attributes

@property
Expand Down Expand Up @@ -253,9 +295,9 @@ def dataframe(self) -> "pd.DataFrame":
import pandas as pd # always lazy-load pandas. it hogs memory on cloud functions and run

# sources and previous sources are expected to have the same fields
sources_df = pd.DataFrame([self.get("source")] + self.get("prv_sources"))
sources_df = pd.DataFrame([self.get("source")] + self.get("prv_sources", []))
# sources and forced sources may have different fields
forced_df = pd.DataFrame(self.get("prv_forced_sources"))
forced_df = pd.DataFrame(self.get("prv_forced_sources", []))

# use nullable integer data type to avoid converting ints to floats
# for columns in one dataframe but not the other
Expand Down Expand Up @@ -307,6 +349,63 @@ def schema(self) -> Schema:
self._schema = registry.Schemas.get(self.schema_name)
return self._schema

@property
def skymap(self) -> Union["astropy.table.QTable", None]:
    """Alert skymap as an astropy Table. Currently implemented for LVK schemas only.

    This skymap is loaded from the alert to an astropy table and extra columns are added, following
    https://emfollow.docs.ligo.org/userguide/tutorial/multiorder_skymaps.html.
    The table is sorted by PROBDENSITY and then UNIQ, in descending order, so that the most likely
    location is first. Columns:

    - UNIQ: HEALPix pixel index in the NUNIQ indexing scheme.
    - PROBDENSITY: Probability density in the pixel (per steradian).
    - nside: HEALPix nside parameter defining the pixel resolution.
    - ipix: HEALPix pixel index at resolution nside.
    - ra: Right ascension of the pixel center (radians).
    - dec: Declination of the pixel center (radians).
    - pixel_area: Area of the pixel (steradians).
    - prob: Probability contained in the pixel (pixel_area * PROBDENSITY).
    - cumprob: Cumulative probability of this pixel and all preceding (more probable) pixels.

    Examples:

    .. code-block:: python

        # most likely location
        alert.skymap[0]

        # 90% credible region
        alert.skymap[:alert.skymap['cumprob'].searchsorted(0.9)]
    """
    # Lazy-load and cache: only LVK alerts carry a skymap; other schemas return None.
    if self._skymap is None and self.schema_name.startswith("lvk"):
        # Lazy imports; these packages are only needed for LVK alerts.
        import astropy.table
        import astropy.units
        import hpgeom
        import numpy as np

        # The alert carries the multi-order skymap as base64-encoded bytes.
        skymap = astropy.table.QTable.read(io.BytesIO(base64.b64decode(self.get("skymap"))))
        # Sort descending so the most probable pixel is first; this ordering is what
        # makes the cumprob column (below) usable for credible-region lookups.
        skymap.sort(["PROBDENSITY", "UNIQ"], reverse=True)

        # NUNIQ scheme: UNIQ = 4 * nside**2 + ipix. Recover nside, then ipix.
        skymap["nside"] = (2 ** (np.log2(skymap["UNIQ"] // 4) // 2)).astype(int)
        skymap["ipix"] = skymap["UNIQ"] - 4 * skymap["nside"] ** 2

        # Pixel-center sky coordinates, in radians (degrees=False).
        skymap["ra"], skymap["dec"] = hpgeom.pixel_to_angle(
            skymap["nside"], skymap["ipix"], degrees=False
        )
        skymap["ra"].unit = astropy.units.rad
        skymap["dec"].unit = astropy.units.rad

        skymap["pixel_area"] = hpgeom.nside_to_pixel_area(skymap["nside"], degrees=False)
        skymap["pixel_area"].unit = astropy.units.sr

        # Per-pixel probability and its running total over the descending-probability order.
        skymap["prob"] = skymap["pixel_area"] * skymap["PROBDENSITY"]
        skymap["cumprob"] = skymap["prob"].cumsum()

        self._skymap = skymap

    return self._skymap

# ---- methods ---- #
def _add_id_attributes(self) -> None:
"""Add the IDs ("alertid", "objectid", "sourceid") to :attr:`Alert.attributes`."""
Expand All @@ -319,10 +418,10 @@ def _add_id_attributes(self) -> None:
# but pubsub message attributes must be strings. join to avoid a future error on publish
names = [".".join(id) if isinstance(id, list) else id for id in survey_names]

# only add to attributes if the survey has defined this field
# only add to attributes if the survey has defined this field and it's not already in the attributes
for idname, idvalue in zip(names, values):
if idname is not None:
self.attributes[idname] = idvalue
if idname is not None and idname not in self._attributes:
self._attributes[idname] = idvalue

def get(self, field: str, default: Any = None) -> Any:
"""Return the value of a field from the alert data.
Expand Down Expand Up @@ -401,3 +500,64 @@ def get_key(
return survey_field[-1]

return survey_field

def _prep_for_publish(self) -> tuple[bytes, Mapping[str, str]]:
    """Return the serialized alert payload and its attributes as a string-to-string dict."""
    payload = self.schema.serialize(self.dict)
    # Pub/Sub requires attribute keys and values to be strings.
    # Iterating sorted items gives a deterministic key order as a bonus.
    attributes = {str(key): str(value) for key, value in sorted(self.attributes.items())}
    return payload, attributes

@staticmethod
def _str_to_datetime(str_time: str) -> datetime.datetime:
    """Convert a Pub/Sub publish-time string to a timezone-aware `datetime`.

    The string sometimes omits the fractional-seconds field, so fall back to a
    second format when the first fails to parse.
    """
    with_microseconds = "%Y-%m-%dT%H:%M:%S.%f%z"
    without_microseconds = "%Y-%m-%dT%H:%M:%S%z"
    try:
        parsed = datetime.datetime.strptime(str_time, with_microseconds)
    except ValueError:
        parsed = datetime.datetime.strptime(str_time, without_microseconds)
    return parsed


@attrs.define
class MockInput:
    """Wraps an `Alert` and renders it as mock input for a cloud service trigger."""

    alert: Alert = attrs.field()

    def to_cloud_functions(self) -> tuple[dict, types_._FunctionsContextLike]:
        """Return an (event, context) pair mimicking a Cloud Functions Pub/Sub trigger.

        Parameter definitions copied from https://cloud.google.com/functions/1stgendocs/tutorials/pubsub-1st-gen

        Returns:
            event (dict):
                The dictionary with data specific to this type of event. The `@type` field maps to
                `type.googleapis.com/google.pubsub.v1.PubsubMessage`. The `data` field maps to the
                PubsubMessage data in a base64-encoded string. The `attributes` field maps to the
                PubsubMessage attributes if any is present.
            context (google.cloud.functions.Context):
                Metadata of triggering event including `event_id` which maps to the PubsubMessage
                messageId, `timestamp` which maps to the PubsubMessage publishTime, `event_type` which
                maps to `google.pubsub.topic.publish`, and `resource` which is a dictionary that
                describes the service API endpoint pubsub.googleapis.com, the triggering topic's name,
                and the triggering event type `type.googleapis.com/google.pubsub.v1.PubsubMessage`.
        """
        payload, attributes = self.alert._prep_for_publish()
        pubsub_message_type = "type.googleapis.com/google.pubsub.v1.PubsubMessage"

        # Publish time in RFC-3339 style with a trailing 'Z', like a real Pub/Sub message.
        timestamp = (
            datetime.datetime.now(datetime.timezone.utc)
            .isoformat(timespec="milliseconds")
            .replace("+00:00", "Z")
        )

        event = {
            "@type": pubsub_message_type,
            "data": base64.b64encode(payload),
            "attributes": attributes,
        }
        context = types_._FunctionsContextLike(
            # random 12-digit id standing in for the PubsubMessage messageId
            event_id=str(int(1e12 * random.random())),
            timestamp=timestamp,
            event_type="google.pubsub.topic.publish",
            resource={
                "name": "projects/NONE/topics/NONE",
                "service": "pubsub.googleapis.com",
                "type": pubsub_message_type,
            },
        )
        return event, context
5 changes: 1 addition & 4 deletions pittgoogle/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,10 +276,7 @@ def delete(self) -> None:
def publish(self, alert: "Alert") -> int:
"""Publish a message with :attr:`pittgoogle.Alert.dict` as the payload and
:attr:`pittgoogle.Alert.attributes` as the attributes."""
# Pub/Sub requires attribute keys and values to be strings. Sort the keys while we're at it.
attributes = {str(key): str(alert.attributes[key]) for key in sorted(alert.attributes)}
message = alert.schema.serialize(alert.dict)

message, attributes = alert._prep_for_publish()
future = self.client.publish(self.path, data=message, **attributes)
return future.result()

Expand Down
7 changes: 7 additions & 0 deletions pittgoogle/registry_manifests/schemas.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@
5: ''
6: ''
#
# LVK alerts
- name: 'lvk'
description: 'Schema for LIGO-Virgo-KAGRA (LVK) alerts. JSON format.'
origin: 'https://emfollow.docs.ligo.org/userguide/content.html#kafka-notice-gcn-scimma'
helper: 'default_schema_helper'
path: null
#
# ZTF alerts
- name: 'ztf'
description: 'ZTF schema. The ZTF survey publishes alerts in Avro format with the schema attached in the header. Pitt-Google publishes ZTF alerts in json format. This schema covers both cases.'
Expand Down
1 change: 1 addition & 0 deletions pittgoogle/schemas/maps/TEMPLATE.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,5 @@ mag_zp: [diaSource, magzpsci] # float (magnitude zero point)
mjd: [diaSource, midPointTai] # float
ra: [diaSource, ra] # float
ra_err: [diaSource, raErr] # float
skymap: [event, skymap] # str
snr: [diaSource, snr] # float
13 changes: 13 additions & 0 deletions pittgoogle/schemas/maps/lvk.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
SURVEY: lvk
SCHEMA_ORIGIN: 'https://emfollow.docs.ligo.org/userguide/content.html#kafka-notice-gcn-scimma'
#
# IDs.
alertid: superevent_id # str
sourceid: superevent_id # str
#
# Sources and Objects
source: event # record
#
# Everything else.
skymap: [event, skymap]
# [TODO] What else would be useful?
31 changes: 31 additions & 0 deletions pittgoogle/types_.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,34 @@ class PubsubMessageLike:
"""Timestamp of the published message."""
ordering_key: str | None = attrs.field(default=None)
"""Pub/Sub ordering key of the published message."""


@attrs.define(frozen=True)
class _FunctionsContextLike:
    """Container that mimics the "context" metadata of a Cloud Functions triggering event.

    Parameter definitions copied from https://cloud.google.com/functions/1stgendocs/tutorials/pubsub-1st-gen

    Parameters:
        event_id (str):
            Maps to PubsubMessage 'messageId'.
        timestamp (str):
            Maps to PubsubMessage 'publishTime'.
        event_type (str):
            Maps to `google.pubsub.topic.publish`.
        resource (dict):
            Describes the service API endpoint pubsub.googleapis.com, the triggering
            topic's name, and the triggering event type
            `type.googleapis.com/google.pubsub.v1.PubsubMessage`.
    """

    event_id: str
    timestamp: str
    event_type: str = attrs.field(default="google.pubsub.topic.publish")
    # Use a factory so each instance gets its own dict. A plain `default={...}` would
    # share one mutable dict across every instance (the mutable-default pitfall).
    resource: dict = attrs.field(
        factory=lambda: {
            "name": "projects/mock-project/topics/mock-topic",
            "service": "pubsub.googleapis.com",
            "type": "type.googleapis.com/google.pubsub.v1.PubsubMessage",
        }
    )
Loading