Skip to content

Commit

Permalink
rename PubSubInvalid -> CloudConnectionError
Browse files Browse the repository at this point in the history
  • Loading branch information
troyraen committed Jul 1, 2024
1 parent 4572e27 commit f7118ba
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 15 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)

## \[Unreleased\]

(none)
<!-- (none) -->

### Changed

- Renamed `exceptions.PubSubInvalid` -> `exceptions.CloudConnectionError`, repurposed for more general use.

## \[v0.3.5\] - 2024-07-01

Expand Down
7 changes: 4 additions & 3 deletions pittgoogle/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ class BadRequest(Exception):
"""Raised when a Flask request json envelope (e.g., from Cloud Run) is invalid."""


class CloudConnectionError(Exception):
"""Raised when a problem is encountered while trying to a Google Cloud resource."""


class OpenAlertError(Exception):
"""Raised when unable to deserialize a Pub/Sub message payload."""


class PubSubInvalid(Exception):
"""Raised when an invalid Pub/Sub configuration is encountered."""

class SchemaNotFoundError(Exception):
"""Raised when a schema with a given name is not found in the registry."""
27 changes: 16 additions & 11 deletions pittgoogle/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@
from attrs.validators import gt, instance_of, is_callable, optional
from google.api_core.exceptions import NotFound

# [FIXME] clean up these imports
from . import exceptions
from .alert import Alert
from .auth import Auth
from .exceptions import PubSubInvalid, SchemaNotFoundError

LOGGER = logging.getLogger(__name__)
PACKAGE_DIR = importlib.resources.files(__package__)
Expand Down Expand Up @@ -67,9 +68,13 @@ def pull_batch(
if isinstance(subscription, str):
subscription = Subscription(subscription, **subscription_kwargs)

response = subscription.client.pull(
{"subscription": subscription.path, "max_messages": max_messages}
)
try:
response = subscription.client.pull(
{"subscription": subscription.path, "max_messages": max_messages}
)
except NotFound:
msg = "Subscription not found. You may need to create one using `pittgoogle.Subscription.touch`."
raise exceptions.CloudConnectionError(msg)

alerts = [
Alert.from_msg(msg.message, schema_name=schema_name) for msg in response.received_messages
Expand Down Expand Up @@ -235,7 +240,7 @@ def publish(self, alert: "Alert") -> int:
# schema exists so we can be lenient and just fall back to json instead of raising an error.
try:
alert.schema
except SchemaNotFoundError:
except exceptions.SchemaNotFoundError:
avro_schema = None
else:
if alert.schema.survey in ["elasticc"]:
Expand Down Expand Up @@ -345,18 +350,18 @@ def touch(self) -> None:
`google.api_core.exceptions.NotFound`
if the subscription needs to be created but the topic does not exist in Google Cloud.
`pittgoogle.exceptions.PubSubInvalid`
`pittgoogle.exceptions.CloudConnectionError`
if the subscription exists but it is not attached to self.topic and self.topic is not None.
"""
try:
subscrip = self.client.get_subscription(subscription=self.path)
LOGGER.info(f"subscription exists: {self.path}")

except NotFound:
subscrip = self._create() # may raise TypeError or NotFound
subscrip = self._create() # may raise TypeError or (topic) NotFound
LOGGER.info(f"subscription created: {self.path}")

self._set_topic(subscrip.topic) # may raise PubSubInvalid
self._set_topic(subscrip.topic) # may raise CloudConnectionError

def _create(self) -> pubsub_v1.types.Subscription:
if self.topic is None:
Expand All @@ -377,10 +382,9 @@ def _set_topic(self, connected_topic_path) -> None:
"The subscription exists but is attached to a different topic.\n"
f"\tFound topic: {connected_topic_path}\n"
f"\tExpected topic: {self.topic.path}\n"
"Either point to the found topic using a keyword argument or"
"delete the existing subscription and try again."
"Either point to the found topic or delete the existing subscription and try again."
)
raise PubSubInvalid(msg)
raise exceptions.CloudConnectionError(msg)

# if the topic isn't already set, do it now
if self.topic is None:
Expand Down Expand Up @@ -410,6 +414,7 @@ def pull_batch(self, max_messages: int = 1) -> List["Alert"]:
max_messages : `int`
Maximum number of messages to be pulled.
"""
# Wrapping the module-level function
return pull_batch(self, max_messages=max_messages, schema_name=self.schema_name)

def purge(self):
Expand Down

0 comments on commit f7118ba

Please sign in to comment.