Create ps_to_storage Cloud Run service for LSST #250

Open
wants to merge 39 commits into base: develop
Changes from all commits
39 commits
e529c1e
create deployment script
hernandezc1 Jan 27, 2025
7fb5987
update deploy.sh
hernandezc1 Jan 30, 2025
1bffc52
move deploy.sh to the correct directory
hernandezc1 Jan 30, 2025
7704f40
create requirements.txt
hernandezc1 Jan 30, 2025
510b1ea
create exceptions.py (to be updated later)
hernandezc1 Jan 30, 2025
e339ef5
create main.py (to be updated; copied from what's in production)
hernandezc1 Jan 30, 2025
25d71ba
rename /setup_broker/rubin directory to lsst
hernandezc1 Jan 30, 2025
4e5fe78
update README to reflect rubin -> lsst name change
hernandezc1 Jan 30, 2025
54d3a4e
add functions to setup_broker script
hernandezc1 Jan 30, 2025
4227e52
rename rubin directory to lsst
hernandezc1 Feb 7, 2025
893ecf2
update scripts
hernandezc1 Feb 7, 2025
0cfacac
update main.py and create_vm script
hernandezc1 Feb 10, 2025
a8d6216
update deployment of cloud function
hernandezc1 Feb 11, 2025
d9de71d
fixes issue preventing alerts being published to a Pub/Sub topic
hernandezc1 Feb 13, 2025
c0aaf8a
update comments to main.py
hernandezc1 Feb 13, 2025
6cb6fcd
Merge branch 'develop' into u/ch/lsst/pstogcs
hernandezc1 Feb 17, 2025
6bbc1e2
remove unused variable
hernandezc1 Feb 17, 2025
3f4013a
significant updates to main.py
hernandezc1 Feb 17, 2025
4227307
address codacy issues
hernandezc1 Feb 17, 2025
98af2ed
update comment
hernandezc1 Feb 17, 2025
bdd7822
update attributes
hernandezc1 Feb 18, 2025
7471e94
Merge branch 'develop' into u/ch/lsst/pstogcs
hernandezc1 Feb 19, 2025
aaa52c7
update comments in deploy.sh
hernandezc1 Feb 20, 2025
baec6e5
specify consumer/lsst dir in upload_broker_bucket.sh
hernandezc1 Feb 20, 2025
c70b38d
apply suggested changes to main.py
hernandezc1 Feb 20, 2025
3e5f3c8
apply suggested changes to vm_startup
hernandezc1 Feb 20, 2025
4799852
create a single "alerts" bucket with a top-level dir for each schema …
hernandezc1 Feb 20, 2025
1d1c5a1
remove 'counter' subscriptions
hernandezc1 Feb 20, 2025
e75ef3a
update README in consumer/lsst
hernandezc1 Feb 20, 2025
f2a6769
add BigQuery table schema for LSST alerts
hernandezc1 Feb 20, 2025
19f1742
update files in setup_broker/lsst
hernandezc1 Feb 20, 2025
78a8951
change ps_to_gcs to ps_to_storage
hernandezc1 Feb 20, 2025
6bb9a7c
update scripts to accomodate BQ subscriptions
hernandezc1 Feb 21, 2025
bc3b283
fix bugs
hernandezc1 Feb 21, 2025
5c8ae67
update misnamed parameter
hernandezc1 Feb 21, 2025
5f18d29
update startupscript for vm_install
hernandezc1 Feb 21, 2025
d008aea
delete bq table in setup_broker
hernandezc1 Feb 21, 2025
dae6ae0
add if block to setup_broker
hernandezc1 Feb 21, 2025
ab2a937
update startup/shutdown scripts in create_vm
hernandezc1 Feb 21, 2025
79 changes: 79 additions & 0 deletions broker/cloud_functions/lsst/ps_to_storage/deploy.sh
@@ -0,0 +1,79 @@
#! /bin/bash
# Deploys or deletes broker Cloud Functions
# This script will not delete Cloud Functions that are in production

# "False" uses production resources
# any other string will be appended to the names of all resources
testid="${1:-test}"
# "True" tearsdown/deletes resources, else setup
teardown="${2:-False}"
# name of the survey this broker instance will ingest
survey="${3:-lsst}"
# schema version
versiontag="${4:-v7_3}"
region="${5:-us-central1}"
PROJECT_ID=$GOOGLE_CLOUD_PROJECT # get the environment variable

# function used to define GCP resources; appends testid if needed
define_GCP_resources() {
local base_name="$1"
local testid_suffix=""

if [ "$testid" != "False" ]; then
testid_suffix="-${testid}"
fi

echo "${base_name}${testid_suffix}"
}

#--- GCP resources used in this script
avro_bucket=$(define_GCP_resources "${PROJECT_ID}-${survey}_alerts")
avro_topic=$(define_GCP_resources "projects/${PROJECT_ID}/topics/${survey}-alert_avros")
avro_subscription=$(define_GCP_resources "${survey}-alert_avros-counter")
ps_to_storage_trigger_topic=$(define_GCP_resources "${survey}-alerts_raw")
ps_to_storage_CF_name=$(define_GCP_resources "${survey}-alerts_to_storage")

if [ "${teardown}" = "True" ]; then
# ensure that we do not teardown production resources
if [ "${testid}" != "False" ]; then
gsutil rm -r "gs://${avro_bucket}"
gcloud pubsub topics delete "${avro_topic}"
gcloud pubsub subscriptions delete "${avro_subscription}"
gcloud functions delete "${ps_to_storage_CF_name}"
fi

else # Deploy the Cloud Functions

#--- Create the bucket that will store the alerts
gsutil mb -l "${region}" "gs://${avro_bucket}"
gsutil uniformbucketlevelaccess set on "gs://${avro_bucket}"
gsutil requesterpays set on "gs://${avro_bucket}"
gcloud storage buckets add-iam-policy-binding "gs://${avro_bucket}" \
--member="allUsers" \
--role="roles/storage.objectViewer"

#--- Setup the Pub/Sub notifications on the Avro storage bucket
echo
echo "Configuring Pub/Sub notifications on GCS bucket..."
trigger_event=OBJECT_FINALIZE
format=json # json or none; if json, file metadata sent in message body
gsutil notification create \
-t "$avro_topic" \
-e "$trigger_event" \
-f "$format" \
"gs://${avro_bucket}"
gcloud pubsub subscriptions create "${avro_subscription}" --topic="${avro_topic}"


#--- Pub/Sub -> Cloud Storage Avro cloud function
echo "Deploying Cloud Function: ${ps_to_storage_CF_name}"
ps_to_storage_entry_point="run"
memory=512MB # standard 256MB is too small here

gcloud functions deploy "${ps_to_storage_CF_name}" \
--entry-point "${ps_to_storage_entry_point}" \
--runtime python312 \
--memory "${memory}" \
--trigger-topic "${ps_to_storage_trigger_topic}" \
--set-env-vars TESTID="${testid}",SURVEY="${survey}",VERSIONTAG="${versiontag}",GCP_PROJECT="${PROJECT_ID}"
fi
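
For reference, the script's positional arguments are `testid`, `teardown`, `survey`, `versiontag`, and `region`; for example, `./deploy.sh mytest False lsst v7_3 us-central1` deploys a test instance and `./deploy.sh mytest True` tears it down. Because the bucket notification above is created with `-f json`, each OBJECT_FINALIZE message carries the new object's metadata in its body. Below is a minimal sketch of pulling a few of those messages from the counter subscription this script creates — the project ID and subscription name are placeholders, not values from this PR:

```python
import json
from google.cloud import pubsub_v1

# Placeholder names; substitute your project and the subscription created by deploy.sh
# (e.g. "lsst-alert_avros-counter-mytest" for testid="mytest").
PROJECT_ID = "my-gcp-project"
SUBSCRIPTION = "lsst-alert_avros-counter-mytest"

subscriber = pubsub_v1.SubscriberClient()
sub_path = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION)

# Each message body is the JSON metadata of the Avro file just written to the bucket,
# because the notification was created with `-f json`.
response = subscriber.pull(request={"subscription": sub_path, "max_messages": 5}, timeout=10)
for received in response.received_messages:
    obj = json.loads(received.message.data.decode("utf-8"))
    print(obj["bucket"], obj["name"], obj.get("size"))

if response.received_messages:
    subscriber.acknowledge(
        request={
            "subscription": sub_path,
            "ack_ids": [m.ack_id for m in response.received_messages],
        }
    )
```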
229 changes: 229 additions & 0 deletions broker/cloud_functions/lsst/ps_to_storage/main.py
@@ -0,0 +1,229 @@
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

"""This module stores LSST alert data as an Avro file in Cloud Storage."""

import base64
import io
import json
import math
import os
import struct
from typing import Optional
from astropy.time import Time

import fastavro
import pittgoogle
from confluent_kafka.schema_registry import SchemaRegistryClient
from google.cloud import functions_v1, logging, storage, pubsub_v1
from google.cloud.exceptions import PreconditionFailed


PROJECT_ID = os.getenv("GCP_PROJECT")
TESTID = os.getenv("TESTID")
SURVEY = os.getenv("SURVEY")
VERSIONTAG = os.getenv("VERSIONTAG")

# connect to the cloud logger and publisher
logging_client = logging.Client()
log_name = "ps-to-storage-cloudfnc"
logger = logging_client.logger(log_name)
publisher = pubsub_v1.PublisherClient()

# GCP resources used in this module
ALERTS_TOPIC = pittgoogle.Topic.from_cloud(
"alerts", survey=SURVEY, testid=TESTID, projectid=PROJECT_ID
)
TOPIC_BIGQUERY_IMPORT = pittgoogle.Topic.from_cloud(
"bigquery-import", survey=SURVEY, testid=TESTID, projectid=PROJECT_ID
)
bucket_name = f"{PROJECT_ID}-{SURVEY}_alerts"
if TESTID != "False":
bucket_name = f"{bucket_name}-{TESTID}"

client = storage.Client()
bucket = client.get_bucket(client.bucket(bucket_name, user_project=PROJECT_ID))

# define a binary data structure for packing and unpacking bytes
_ConfluentWireFormatHeader = struct.Struct(">bi")


def run(event: dict, context: functions_v1.context.Context) -> None:
"""Entry point for the Cloud Function

For args descriptions, see:
https://cloud.google.com/functions/docs/writing/background#function_parameters

Args:
event: Pub/Sub message data and attributes.
`data` field contains the message data in a base64-encoded string.
`attributes` field contains the message's custom attributes in a dict.

context: The Cloud Function's event metadata.
It has the following attributes:
`event_id`: the Pub/Sub message ID.
`timestamp`: the Pub/Sub message publish time.
`event_type`: for example: "google.pubsub.topic.publish".
`resource`: the resource that emitted the event.
"""
try:
store_alert_data(event, context)
# this is raised by blob.upload_from_file if the object already exists in the bucket
except PreconditionFailed:
# we'll simply pass, and the duplicate alert will go no further in our pipeline
pass
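
For orientation, a Pub/Sub-triggered background function receives arguments shaped like the sketch below. All values, including the attribute key, are purely illustrative — a real `data` field holds the Confluent-framed Avro alert packet:

```python
import base64
from types import SimpleNamespace

# Illustrative only; not part of the deployed function.
event = {
    "data": base64.b64encode(b"<5-byte header + Avro payload>").decode(),
    "attributes": {"kafka.topic": "alerts-simulated"},
}
context = SimpleNamespace(
    event_id="1234567890123456",
    timestamp="2025-02-21T00:00:00.000Z",
    event_type="google.pubsub.topic.publish",
    resource={"name": "projects/<project>/topics/lsst-alerts_raw"},
)
# run(event, context) base64-decodes `data` and hands it to store_alert_data().
```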


def store_alert_data(event: dict, context: functions_v1.context.Context) -> None:
"""Uploads the msg data bytes to a GCP storage bucket."""

alert_bytes = base64.b64decode(event["data"]) # alert packet, bytes
attributes = event.get("attributes", {})

# unpack the alert and read schema ID
header_bytes = alert_bytes[:5]
schema_id = deserialize_confluent_wire_header(header_bytes)

# get and load schema
sr_client = SchemaRegistryClient({"url": "https://usdf-alert-schemas-dev.slac.stanford.edu"})
schema = sr_client.get_schema(schema_id=schema_id)
parse_schema = json.loads(schema.schema_str)
schema_version = parse_schema["namespace"].split(".")[1]
content_bytes = io.BytesIO(alert_bytes[5:])

# deserialize the alert and create Alert object
alert_dict = fastavro.schemaless_reader(content_bytes, parse_schema)
filename = generate_alert_filename(
{
"schema_version": schema_version,
"objectId": alert_dict["diaObject"]["diaObjectId"],
"sourceId": alert_dict["diaSource"]["diaSourceId"],
"alert_date": alert_dict["diaSource"]["midpointMjdTai"],
"format": "avro",
}
)

blob = bucket.blob(filename)
blob.metadata = create_file_metadata(alert_dict, context)

# raise a PreconditionFailed exception if filename already exists in the bucket using "if_generation_match=0"
# let it raise. the main function will catch it and then drop the message.
blob.upload_from_file(io.BytesIO(alert_bytes), if_generation_match=0)

# Cloud Storage says this is not a duplicate, so now we publish the broker's main "alerts" stream
publish_alerts_stream(
topic_name=ALERTS_TOPIC.name,
message=alert_bytes,
attributes={
"diaObjectId": str(alert_dict["diaObject"]["diaObjectId"]),
"diaSourceId": str(alert_dict["diaSource"]["diaSourceId"]),
"schema_version": schema_version,
**attributes,
},
)

# publish the alert as a JSON message to the bigquery-import topic
TOPIC_BIGQUERY_IMPORT.publish(_create_valid_json(alert_dict, attributes))
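
The `if_generation_match=0` precondition is what provides the deduplication here: Cloud Storage accepts the write only if no live object already exists at that name, and a second attempt raises the `PreconditionFailed` that `run` catches. A standalone sketch of that behavior — the bucket and object names are hypothetical:

```python
import io

from google.cloud import storage
from google.cloud.exceptions import PreconditionFailed

client = storage.Client()
blob = client.bucket("my-test-bucket").blob("v7_3/2025-01-30/123/456.avro")  # hypothetical names

blob.upload_from_file(io.BytesIO(b"payload"), if_generation_match=0)      # first write succeeds
try:
    blob.upload_from_file(io.BytesIO(b"payload"), if_generation_match=0)  # same object name
except PreconditionFailed:
    print("object already exists; duplicate alert dropped")
```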


def deserialize_confluent_wire_header(raw):
"""Parses the byte prefix for Confluent Wire Format-style Kafka messages.
Parameters
----------
raw : `bytes`
The 5-byte encoded message prefix.
Returns
-------
schema_id : `int`
    The Confluent Schema Registry ID of the Avro schema used to encode
    the message that follows this header.
"""
_, version = _ConfluentWireFormatHeader.unpack(raw)

return version
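
The prefix parsed here is the standard Confluent wire format: one magic byte (0) followed by a 4-byte big-endian schema ID. A minimal round trip with the same `>bi` layout:

```python
import struct

# Confluent wire format prefix: magic byte 0, then the 4-byte big-endian schema ID.
header = struct.Struct(">bi")

raw = header.pack(0, 12345)      # what a producer prepends to the Avro payload
assert len(raw) == 5
magic, schema_id = header.unpack(raw)
assert (magic, schema_id) == (0, 12345)
# schema_id is then used to fetch the writer schema from the registry.
```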


def generate_alert_filename(aname: dict) -> str:
"""
Generate the filename of an alert stored to a Cloud Storage bucket.

Args:
aname:
Components used to build the filename. Required keys: "schema_version", "objectId",
"sourceId", "alert_date" (an MJD timestamp), and "format". Extra keys are ignored.

Returns:
str: The formatted filename as "{schema_version}/{YYYY-MM-DD}/{objectId}/{sourceId}.{format}".
"""

schema_version = aname["schema_version"]
alert_date = aname["alert_date"]
object_id = aname["objectId"]
source_id = aname["sourceId"]
file_format = aname["format"]

# convert the MJD timestamp to "YYYY-MM-DD"
time_obj = Time(alert_date, format="mjd")
date_string = time_obj.datetime.strftime("%Y-%m-%d")

return f"{schema_version}/{date_string}/{object_id}/{source_id}.{file_format}"


def create_file_metadata(alert_dict: dict, context):
"""Return key/value pairs to be attached to the file as metadata."""

metadata = {"file_origin_message_id": context.event_id}
metadata["diaObjectId"] = alert_dict["diaObject"]["diaObjectId"]
metadata["diaSourceId"] = alert_dict["diaSource"]["diaSourceId"]
metadata["ra"] = alert_dict["diaSource"]["ra"]
metadata["dec"] = alert_dict["diaSource"]["dec"]

return metadata


def publish_alerts_stream(
topic_name: str, message: bytes, attributes: Optional[dict] = None
) -> str:
"""Publish original alert bytes to a Pub/Sub topic."""

# enforce bytes type for message
if not isinstance(message, bytes):
raise TypeError("`message` must be bytes.")

topic_path = publisher.topic_path(PROJECT_ID, topic_name)
# guard against attributes=None so the keyword expansion does not fail
future = publisher.publish(topic_path, data=message, **(attributes or {}))

return future.result()


def _create_valid_json(alert_dict: dict, attributes: dict) -> pittgoogle.alert.Alert:
"""Transforms alert data to a valid JSON message."""

# define and remove cutouts from message
cutouts = [
"cutoutTemplate",
"cutoutScience",
"cutoutDifference",
]
for key in cutouts:
alert_dict.pop(key, None)

# replace NaN values with None
valid_json = _transform_nan_to_none(alert_dict)

return pittgoogle.Alert.from_dict(payload=valid_json, attributes=attributes)


def _transform_nan_to_none(alert_dict: dict) -> dict:
"""Recursively replace NaN values with None in a dictionary."""

# convert NaN to None
if isinstance(alert_dict, dict):
return {k: _transform_nan_to_none(v) for k, v in alert_dict.items()}
if isinstance(alert_dict, list):
return [_transform_nan_to_none(v) for v in alert_dict]
if isinstance(alert_dict, float) and math.isnan(alert_dict):
return None

return alert_dict
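
Standard JSON has no NaN literal, so this recursive scrub is what keeps the bigquery-import payload serializable. A quick illustration using the function above (field names are made up):

```python
sample = {
    "diaSource": {"psfFlux": float("nan"), "ra": 215.1},
    "prvDiaSources": [{"snr": float("nan")}, {"snr": 12.3}],
}
print(_transform_nan_to_none(sample))
# -> {'diaSource': {'psfFlux': None, 'ra': 215.1}, 'prvDiaSources': [{'snr': None}, {'snr': 12.3}]}
```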
13 changes: 13 additions & 0 deletions broker/cloud_functions/lsst/ps_to_storage/requirements.txt
@@ -0,0 +1,13 @@
# As explained here
# https://cloud.google.com/functions/docs/writing/specifying-dependencies-python
# dependencies for a Cloud Function must be specified in a `requirements.txt`
# file (or packaged with the function) in the same directory as `main.py`

astropy  # used directly in main.py (astropy.time); may also be pulled in by pittgoogle-client
confluent-kafka>=2.6.0
fastavro
google-cloud-functions
google-cloud-logging
google-cloud-pubsub
google-cloud-storage
httpx # used by confluent-kafka
pittgoogle-client>=0.3.11
@@ -1,18 +1,18 @@
-# Start the Rubin consumer VM
+# Start the LSST consumer VM

-See `Pitt-Google-Broker/broker/setup_broker/rubin/README.md` for setup instructions.
+See `Pitt-Google-Broker/broker/setup_broker/lsst/README.md` for setup instructions.

To start the consumer VM:

```bash
survey="rubin"
survey="lsst"
testid="mytest"
consumerVM="${survey}-consumer-${testid}"
zone="us-central1-a"

# Set the VM metadata
KAFKA_TOPIC="alerts-simulated"
PS_TOPIC="${survey}-alerts-${testid}"
PS_TOPIC="${survey}-alerts_raw-${testid}"
gcloud compute instances add-metadata "${consumerVM}" --zone "${zone}" \
--metadata="PS_TOPIC_FORCE=${PS_TOPIC},KAFKA_TOPIC_FORCE=${KAFKA_TOPIC}"

@@ -25,7 +25,7 @@ gcloud compute instances start ${consumerVM} --zone ${zone}
To stop the consumer VM:

```bash
survey="rubin"
survey="lsst"
testid="mytest"
consumerVM="${survey}-consumer-${testid}"
zone="us-central1-a"
@@ -36,7 +36,7 @@ sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule require

# settings with `consumer.` prefixes are passed through to the Kafka consumer
# for Rubin, consumer.group.id must begin with our username: pittgoogle-idfint
-consumer.group.id=pittgoogle-idfint-kafka-pubsub-connector
+consumer.group.id=GROUP_ID
consumer.auto.offset.reset=earliest
consumer.sasl.mechanism=SCRAM-SHA-512
consumer.sasl.kerberos.service.name=kafka
@@ -68,8 +68,8 @@ wget https://github.com/GoogleCloudPlatform/pubsub/releases/download/${CONNECTOR
echo "Done installing the Kafka -> Pub/Sub connector"

#--- Set the startup script and shutdown
startupscript="gs://${broker_bucket}/consumer/${survey}/vm_startup.sh"
startupscript="gs://${broker_bucket}/${survey}/vm_startup.sh"
gcloud compute instances add-metadata "$consumerVM" --zone "$zone" \
--metadata startup-script-url="$startupscript"
echo "vm_install.sh is complete. Shutting down."
shutdown -h now
shutdown -h now