diff --git a/CHANGELOG.md b/CHANGELOG.md index aa98751..0808c09 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) ## \[Unreleased\] -(none) + + +### Changed + +- Renamed `exceptions.PubSubInvalid` -> `exceptions.CloudConnectionError`, repurposed for more general use. ## \[v0.3.5\] - 2024-07-01 diff --git a/pittgoogle/exceptions.py b/pittgoogle/exceptions.py index f60b710..e8a427a 100644 --- a/pittgoogle/exceptions.py +++ b/pittgoogle/exceptions.py @@ -3,12 +3,13 @@ class BadRequest(Exception): """Raised when a Flask request json envelope (e.g., from Cloud Run) is invalid.""" +class CloudConnectionError(Exception): + """Raised when a problem is encountered while trying to a Google Cloud resource.""" + + class OpenAlertError(Exception): """Raised when unable to deserialize a Pub/Sub message payload.""" -class PubSubInvalid(Exception): - """Raised when an invalid Pub/Sub configuration is encountered.""" - class SchemaNotFoundError(Exception): """Raised when a schema with a given name is not found in the registry.""" diff --git a/pittgoogle/pubsub.py b/pittgoogle/pubsub.py index 1bec5b0..48c4638 100644 --- a/pittgoogle/pubsub.py +++ b/pittgoogle/pubsub.py @@ -23,9 +23,10 @@ from attrs.validators import gt, instance_of, is_callable, optional from google.api_core.exceptions import NotFound +# [FIXME] clean up these imports +from . import exceptions from .alert import Alert from .auth import Auth -from .exceptions import PubSubInvalid, SchemaNotFoundError LOGGER = logging.getLogger(__name__) PACKAGE_DIR = importlib.resources.files(__package__) @@ -67,9 +68,13 @@ def pull_batch( if isinstance(subscription, str): subscription = Subscription(subscription, **subscription_kwargs) - response = subscription.client.pull( - {"subscription": subscription.path, "max_messages": max_messages} - ) + try: + response = subscription.client.pull( + {"subscription": subscription.path, "max_messages": max_messages} + ) + except NotFound: + msg = "Subscription not found. You may need to create one using `pittgoogle.Subscription.touch`." + raise exceptions.CloudConnectionError(msg) alerts = [ Alert.from_msg(msg.message, schema_name=schema_name) for msg in response.received_messages @@ -235,7 +240,7 @@ def publish(self, alert: "Alert") -> int: # schema exists so we can be lenient and just fall back to json instead of raising an error. try: alert.schema - except SchemaNotFoundError: + except exceptions.SchemaNotFoundError: avro_schema = None else: if alert.schema.survey in ["elasticc"]: @@ -345,7 +350,7 @@ def touch(self) -> None: `google.api_core.exceptions.NotFound` if the subscription needs to be created but the topic does not exist in Google Cloud. - `pittgoogle.exceptions.PubSubInvalid` + `pittgoogle.exceptions.CloudConnectionError` if the subscription exists but it is not attached to self.topic and self.topic is not None. """ try: @@ -353,10 +358,10 @@ def touch(self) -> None: LOGGER.info(f"subscription exists: {self.path}") except NotFound: - subscrip = self._create() # may raise TypeError or NotFound + subscrip = self._create() # may raise TypeError or (topic) NotFound LOGGER.info(f"subscription created: {self.path}") - self._set_topic(subscrip.topic) # may raise PubSubInvalid + self._set_topic(subscrip.topic) # may raise CloudConnectionError def _create(self) -> pubsub_v1.types.Subscription: if self.topic is None: @@ -377,10 +382,9 @@ def _set_topic(self, connected_topic_path) -> None: "The subscription exists but is attached to a different topic.\n" f"\tFound topic: {connected_topic_path}\n" f"\tExpected topic: {self.topic.path}\n" - "Either point to the found topic using a keyword argument or" - "delete the existing subscription and try again." + "Either point to the found topic or delete the existing subscription and try again." ) - raise PubSubInvalid(msg) + raise exceptions.CloudConnectionError(msg) # if the topic isn't already set, do it now if self.topic is None: @@ -410,6 +414,7 @@ def pull_batch(self, max_messages: int = 1) -> List["Alert"]: max_messages : `int` Maximum number of messages to be pulled. """ + # Wrapping the module-level function return pull_batch(self, max_messages=max_messages, schema_name=self.schema_name) def purge(self):