diff --git a/git_hg_sync/application.py b/git_hg_sync/application.py index 6a744e9..8125567 100644 --- a/git_hg_sync/application.py +++ b/git_hg_sync/application.py @@ -53,7 +53,7 @@ def _handle_push_event(self, push_event: Push) -> None: def _handle_event(self, event: Push | Tag) -> None: if event.repo_url not in self._repo_synchronizers: - logger.info("Ignoring event for untracked repository: %()s", event.repo_url) + logger.info(f"Ignoring event for untracked repository: {event.repo_url}") return match event: case Push(): diff --git a/git_hg_sync/pulse_worker.py b/git_hg_sync/pulse_worker.py index 1297c6e..5f13958 100644 --- a/git_hg_sync/pulse_worker.py +++ b/git_hg_sync/pulse_worker.py @@ -1,3 +1,4 @@ +import json from typing import Any, Protocol import kombu @@ -34,7 +35,7 @@ def __init__( self.one_shot = one_shot @staticmethod - def parse_entity(raw_entity: Any) -> Push | Tag: + def parse_entity(raw_entity: dict) -> Push | Tag: logger.debug(f"parse_entity: {raw_entity}") message_type = raw_entity.pop("type") match message_type: @@ -53,12 +54,46 @@ def get_consumers( consumer = consumer_class( self.task_queue, auto_declare=False, callbacks=[self.on_task] ) + logger.info(f"{consumer=}") return [consumer] def on_task(self, body: Any, message: kombu.Message) -> None: logger.info(f"Received message: {body}") - raw_entity = body["payload"] - event = PulseWorker.parse_entity(raw_entity) + + if isinstance(body, str): + logger.debug("Message is a string. Trying to parse as JSON ...") + try: + body = json.loads(body) + except json.JSONDecodeError: + pass # We'll deal with the incorrect type next. + if not isinstance(body, dict): + logger.warning(f"Invalid message, rejecting ... `{body}`") + message.reject() + return + + if not (raw_entity := body.get("payload")): + logger.warning(f"Missing or empty payload, rejecting ... `{body}`") + message.reject() + return + + if not isinstance(raw_entity, dict): + logger.warning(f"Invalid payload, rejecting ... `{raw_entity}`") + message.reject() + return + + try: + event = PulseWorker.parse_entity(raw_entity) + except KeyError as e: + logger.warning( + f"Invalid payload: missing {e}, rejecting ... `{raw_entity}`" + ) + message.reject() + return + except (EntityTypeError, TypeError) as e: + logger.warning(f"Invalid payload: {e}, rejecting ... `{raw_entity}`") + message.reject() + return + if self.event_handler: self.event_handler(event) message.ack()