diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 3046be99..0c10d0c8 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -7,6 +7,7 @@ TBD - Add support to change the Exchange Type for RabbitMQ. Default is 'topic', but options like 'fanout' can now be supported +- Better handling of Pika errors 3.22.0 ------ diff --git a/brewtils/pika.py b/brewtils/pika.py index 2a6b6b7b..a8b59fad 100644 --- a/brewtils/pika.py +++ b/brewtils/pika.py @@ -14,7 +14,7 @@ SSLOptions, URLParameters, ) -from pika.exceptions import AMQPError +from pika.exceptions import AMQPError, ConnectionWrongStateError from pika.spec import PERSISTENT_DELIVERY_MODE from brewtils.errors import DiscardMessageException, RepublishRequestException @@ -305,26 +305,29 @@ def run(self): None """ while not self._panic_event.is_set(): - self._connection = self.open_connection() - self._connection.ioloop.start() - - if not self._panic_event.is_set(): - if 0 <= self._max_reconnect_attempts <= self._reconnect_attempt: - self.logger.warning("Max connection failures, shutting down") - self._panic_event.set() - return - - self.logger.warning( - "%s consumer has died, waiting %i seconds before reconnecting", - self._queue_name, - self._reconnect_timeout, - ) - self._panic_event.wait(self._reconnect_timeout) + try: + self._connection = self.open_connection() + self._connection.ioloop.start() + + if not self._panic_event.is_set(): + if 0 <= self._max_reconnect_attempts <= self._reconnect_attempt: + self.logger.warning("Max connection failures, shutting down") + self._panic_event.set() + return + + self.logger.warning( + "%s consumer has died, waiting %i seconds before reconnecting", + self._queue_name, + self._reconnect_timeout, + ) + self._panic_event.wait(self._reconnect_timeout) - self._reconnect_attempt += 1 - self._reconnect_timeout = min( - self._reconnect_timeout * 2, self._max_reconnect_timeout - ) + self._reconnect_attempt += 1 + self._reconnect_timeout = min( + self._reconnect_timeout * 2, self._max_reconnect_timeout + ) + except ConnectionWrongStateError as ex: + self.logger.error("Error running consumer IO-loop: %s", ex) def stop(self): """Cleanly shutdown @@ -566,7 +569,10 @@ def on_connection_closed(self, connection, *args): def open_channel(self): """Open a channel""" self.logger.debug("Opening a new channel") - self._connection.channel(on_open_callback=self.on_channel_open) + try: + self._connection.channel(on_open_callback=self.on_channel_open) + except ConnectionWrongStateError as ex: + self.logger.error("Failure opening channel to consume messages: %s", ex) def on_channel_open(self, channel): """Channel open success callback