Skip to content

Commit

Permalink
Make sending messages async and more robust
Browse files Browse the repository at this point in the history
Signed-off-by: Aurélien Bompard <[email protected]>
  • Loading branch information
abompard committed Aug 16, 2024
1 parent 6512427 commit 99d07b5
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 15 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,12 @@
Example output
```
Usage: w2fm [OPTIONS] COMMAND [ARGS]...
Options:
-c, --conf PATH Read configuration from the specified module
--version Show the version and exit.
--help Show this message and exit.
Commands:
setup Setup the database schema in the specified environment
```
Expand Down
13 changes: 12 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ authlib = "^1.3.1"
itsdangerous = "^2.2.0"
httpx = "^0.27.0"
pyyaml = "^6.0.2"
backoff = "^2.2.1"


[tool.poetry.group.dev.dependencies]
Expand Down Expand Up @@ -109,7 +110,7 @@ source = ["webhook_to_fedora_messaging"]
source = ["webhook_to_fedora_messaging"]

[tool.coverage.report]
fail_under = 78
fail_under = 77
exclude_lines = [
"pragma: no cover",
"if __name__ == .__main__.:",
Expand Down
33 changes: 25 additions & 8 deletions tests/test_message/test_message_create.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from unittest import mock

import pytest
from fedora_messaging.testing import mock_sends
from twisted.internet import defer
from webhook_to_fedora_messaging_messages.github import GithubMessageV1


Expand Down Expand Up @@ -47,16 +47,33 @@ def fasjson_client():
yield client


async def test_message_create(client, db_service, request_data, request_headers, fasjson_client):
@pytest.fixture
def sent_messages():
sent = []

def _add_and_return(message, exchange=None):
sent.append(message)
return defer.succeed(None)

with mock.patch(
"webhook_to_fedora_messaging.publishing.api.twisted_publish", side_effect=_add_and_return
):
yield sent


async def test_message_create(
client, db_service, request_data, request_headers, fasjson_client, sent_messages
):
fasjson_client.search.return_value = SimpleNamespace(
result=[{"username": "dummy-fas-username"}]
)
with mock_sends(GithubMessageV1) as sent_msgs:
response = await client.post(
f"/api/v1/messages/{db_service.uuid}", content=request_data, headers=request_headers
)
assert response.status_code == 202, response.text
sent_msg = sent_msgs[0]
response = await client.post(
f"/api/v1/messages/{db_service.uuid}", content=request_data, headers=request_headers
)
assert response.status_code == 202, response.text
assert len(sent_messages) == 1
sent_msg = sent_messages[0]
assert isinstance(sent_msg, GithubMessageV1)
assert sent_msg.topic == "github.push"
assert sent_msg.agent_name == "dummy-fas-username"
assert sent_msg.body["body"] == json.loads(request_data)
Expand Down
17 changes: 14 additions & 3 deletions webhook_to_fedora_messaging/endpoints/message.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
import logging

from fastapi import APIRouter, Depends, HTTPException, Request
from fedora_messaging import api
from starlette.status import HTTP_202_ACCEPTED, HTTP_400_BAD_REQUEST
from fedora_messaging import exceptions as fm_exceptions
from starlette.status import HTTP_202_ACCEPTED, HTTP_400_BAD_REQUEST, HTTP_502_BAD_GATEWAY

from webhook_to_fedora_messaging.endpoints.models.message import MessageResult
from webhook_to_fedora_messaging.endpoints.util import return_service_from_uuid
from webhook_to_fedora_messaging.exceptions import SignatureMatchError
from webhook_to_fedora_messaging.models import Service
from webhook_to_fedora_messaging.publishing import publish

from .parser import parser


logger = logging.getLogger(__name__)

router = APIRouter(prefix="/messages")


Expand All @@ -34,6 +39,12 @@ async def create_message(
HTTP_400_BAD_REQUEST, f"Message could not be dispatched - {expt}"
) from expt

api.publish(message)
try:
await publish(message)
except (fm_exceptions.ConnectionException, fm_exceptions.PublishException) as expt:
logger.exception(
"Could not send message %s for service %s (%s)", message.id, service.name, service.id
)
raise HTTPException(HTTP_502_BAD_GATEWAY, f"Message could not be sent: {expt}") from expt
service.sent += 1
return {"data": {"message_id": message.id}}
32 changes: 32 additions & 0 deletions webhook_to_fedora_messaging/publishing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import asyncio
import logging
import sys
import traceback

import backoff
from fedora_messaging import api
from fedora_messaging import exceptions as fm_exceptions


log = logging.getLogger(__name__)


def backoff_hdlr(details):
log.warning("Publishing message failed. Retrying. %s", traceback.format_tb(sys.exc_info()[2]))


def giveup_hdlr(details):
log.error("Publishing message failed. Giving up. %s", traceback.format_tb(sys.exc_info()[2]))


@backoff.on_exception(
backoff.expo,
(fm_exceptions.ConnectionException, fm_exceptions.PublishException),
max_tries=3,
on_backoff=backoff_hdlr,
on_giveup=giveup_hdlr,
)
async def publish(message):
deferred = api.twisted_publish(message)
loop = asyncio.get_running_loop()
await deferred.asFuture(loop)

0 comments on commit 99d07b5

Please sign in to comment.