Skip to content

Commit

Permalink
pulse_worker: error handling for payload
Browse files Browse the repository at this point in the history
  • Loading branch information
shtrom committed Feb 4, 2025
1 parent faf69c6 commit 779e040
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 4 deletions.
2 changes: 1 addition & 1 deletion git_hg_sync/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def _handle_push_event(self, push_event: Push) -> None:

def _handle_event(self, event: Push | Tag) -> None:
if event.repo_url not in self._repo_synchronizers:
logger.info("Ignoring event for untracked repository: %()s", event.repo_url)
logger.info(f"Ignoring event for untracked repository: {event.repo_url}")
return
match event:
case Push():
Expand Down
41 changes: 38 additions & 3 deletions git_hg_sync/pulse_worker.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
from typing import Any, Protocol

import kombu
Expand Down Expand Up @@ -34,7 +35,7 @@ def __init__(
self.one_shot = one_shot

@staticmethod
def parse_entity(raw_entity: Any) -> Push | Tag:
def parse_entity(raw_entity: dict) -> Push | Tag:
logger.debug(f"parse_entity: {raw_entity}")
message_type = raw_entity.pop("type")
match message_type:
Expand All @@ -53,12 +54,46 @@ def get_consumers(
consumer = consumer_class(
self.task_queue, auto_declare=False, callbacks=[self.on_task]
)
logger.info(f"{consumer=}")
return [consumer]

def on_task(self, body: Any, message: kombu.Message) -> None:
logger.info(f"Received message: {body}")
raw_entity = body["payload"]
event = PulseWorker.parse_entity(raw_entity)

if isinstance(body, str):
logger.debug("Message is a string. Trying to parse as JSON ...")
try:
body = json.loads(body)
except json.JSONDecodeError:
pass # We'll deal with the incorrect type next.
if not isinstance(body, dict):
logger.warning(f"Invalid message, rejecting ... `{body}`")
message.reject()
return

if not (raw_entity := body.get("payload")):
logger.warning(f"Missing or empty payload, rejecting ... `{body}`")
message.reject()
return

if not isinstance(raw_entity, dict):
logger.warning(f"Invalid payload, rejecting ... `{raw_entity}`")
message.reject()
return

try:
event = PulseWorker.parse_entity(raw_entity)
except KeyError as e:
logger.warning(
f"Invalid payload: missing {e}, rejecting ... `{raw_entity}`"
)
message.reject()
return
except (EntityTypeError, TypeError) as e:
logger.warning(f"Invalid payload: {e}, rejecting ... `{raw_entity}`")
message.reject()
return

if self.event_handler:
self.event_handler(event)
message.ack()
Expand Down

0 comments on commit 779e040

Please sign in to comment.