From 99d07b519bbfd499b61d06d91020d0668fcb365f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aur=C3=A9lien=20Bompard?= Date: Fri, 16 Aug 2024 21:58:09 +0200 Subject: [PATCH] Make sending messages async and more robust MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Aurélien Bompard --- README.md | 4 +-- poetry.lock | 13 +++++++- pyproject.toml | 3 +- tests/test_message/test_message_create.py | 33 ++++++++++++++----- .../endpoints/message.py | 17 ++++++++-- webhook_to_fedora_messaging/publishing.py | 32 ++++++++++++++++++ 6 files changed, 87 insertions(+), 15 deletions(-) create mode 100644 webhook_to_fedora_messaging/publishing.py diff --git a/README.md b/README.md index 521e241..dc66075 100644 --- a/README.md +++ b/README.md @@ -74,12 +74,12 @@ Example output ``` Usage: w2fm [OPTIONS] COMMAND [ARGS]... - + Options: -c, --conf PATH Read configuration from the specified module --version Show the version and exit. --help Show this message and exit. - + Commands: setup Setup the database schema in the specified environment ``` diff --git a/poetry.lock b/poetry.lock index d60d9e0..4edb3f1 100644 --- a/poetry.lock +++ b/poetry.lock @@ -231,6 +231,17 @@ files = [ [package.extras] dev = ["freezegun (>=1.0,<2.0)", "pytest (>=6.0)", "pytest-cov"] +[[package]] +name = "backoff" +version = "2.2.1" +description = "Function decoration for backoff and retry" +optional = false +python-versions = ">=3.7,<4.0" +files = [ + {file = "backoff-2.2.1-py3-none-any.whl", hash = "sha256:63579f9a0628e06278f7e47b7d7d5b6ce20dc65c5e96a6f3ca99a6adca0396e8"}, + {file = "backoff-2.2.1.tar.gz", hash = "sha256:03f829f5bb1923180821643f8753b0502c3b682293992485b0eef2807afa5cba"}, +] + [[package]] name = "black" version = "24.8.0" @@ -3262,4 +3273,4 @@ deploy = ["asyncpg", "uvicorn"] [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = "b48d8ad780db87a31efa2851d4bc9e70fd5bd4e735149efc852b73fee8a3d55a" +content-hash = "3e8a2ed921dc7b355863c5ce9ab048874790625a4e1bc461dbf26b4b7fc96d3a" diff --git a/pyproject.toml b/pyproject.toml index 21daa3f..c1082b6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -55,6 +55,7 @@ authlib = "^1.3.1" itsdangerous = "^2.2.0" httpx = "^0.27.0" pyyaml = "^6.0.2" +backoff = "^2.2.1" [tool.poetry.group.dev.dependencies] @@ -109,7 +110,7 @@ source = ["webhook_to_fedora_messaging"] source = ["webhook_to_fedora_messaging"] [tool.coverage.report] -fail_under = 78 +fail_under = 77 exclude_lines = [ "pragma: no cover", "if __name__ == .__main__.:", diff --git a/tests/test_message/test_message_create.py b/tests/test_message/test_message_create.py index 471a488..ea1d2dc 100644 --- a/tests/test_message/test_message_create.py +++ b/tests/test_message/test_message_create.py @@ -6,7 +6,7 @@ from unittest import mock import pytest -from fedora_messaging.testing import mock_sends +from twisted.internet import defer from webhook_to_fedora_messaging_messages.github import GithubMessageV1 @@ -47,16 +47,33 @@ def fasjson_client(): yield client -async def test_message_create(client, db_service, request_data, request_headers, fasjson_client): +@pytest.fixture +def sent_messages(): + sent = [] + + def _add_and_return(message, exchange=None): + sent.append(message) + return defer.succeed(None) + + with mock.patch( + "webhook_to_fedora_messaging.publishing.api.twisted_publish", side_effect=_add_and_return + ): + yield sent + + +async def test_message_create( + client, db_service, request_data, request_headers, fasjson_client, sent_messages +): fasjson_client.search.return_value = SimpleNamespace( result=[{"username": "dummy-fas-username"}] ) - with mock_sends(GithubMessageV1) as sent_msgs: - response = await client.post( - f"/api/v1/messages/{db_service.uuid}", content=request_data, headers=request_headers - ) - assert response.status_code == 202, response.text - sent_msg = sent_msgs[0] + response = await client.post( + f"/api/v1/messages/{db_service.uuid}", content=request_data, headers=request_headers + ) + assert response.status_code == 202, response.text + assert len(sent_messages) == 1 + sent_msg = sent_messages[0] + assert isinstance(sent_msg, GithubMessageV1) assert sent_msg.topic == "github.push" assert sent_msg.agent_name == "dummy-fas-username" assert sent_msg.body["body"] == json.loads(request_data) diff --git a/webhook_to_fedora_messaging/endpoints/message.py b/webhook_to_fedora_messaging/endpoints/message.py index 2019ec8..f7d3bb1 100644 --- a/webhook_to_fedora_messaging/endpoints/message.py +++ b/webhook_to_fedora_messaging/endpoints/message.py @@ -1,15 +1,20 @@ +import logging + from fastapi import APIRouter, Depends, HTTPException, Request -from fedora_messaging import api -from starlette.status import HTTP_202_ACCEPTED, HTTP_400_BAD_REQUEST +from fedora_messaging import exceptions as fm_exceptions +from starlette.status import HTTP_202_ACCEPTED, HTTP_400_BAD_REQUEST, HTTP_502_BAD_GATEWAY from webhook_to_fedora_messaging.endpoints.models.message import MessageResult from webhook_to_fedora_messaging.endpoints.util import return_service_from_uuid from webhook_to_fedora_messaging.exceptions import SignatureMatchError from webhook_to_fedora_messaging.models import Service +from webhook_to_fedora_messaging.publishing import publish from .parser import parser +logger = logging.getLogger(__name__) + router = APIRouter(prefix="/messages") @@ -34,6 +39,12 @@ async def create_message( HTTP_400_BAD_REQUEST, f"Message could not be dispatched - {expt}" ) from expt - api.publish(message) + try: + await publish(message) + except (fm_exceptions.ConnectionException, fm_exceptions.PublishException) as expt: + logger.exception( + "Could not send message %s for service %s (%s)", message.id, service.name, service.id + ) + raise HTTPException(HTTP_502_BAD_GATEWAY, f"Message could not be sent: {expt}") from expt service.sent += 1 return {"data": {"message_id": message.id}} diff --git a/webhook_to_fedora_messaging/publishing.py b/webhook_to_fedora_messaging/publishing.py new file mode 100644 index 0000000..f64aca9 --- /dev/null +++ b/webhook_to_fedora_messaging/publishing.py @@ -0,0 +1,32 @@ +import asyncio +import logging +import sys +import traceback + +import backoff +from fedora_messaging import api +from fedora_messaging import exceptions as fm_exceptions + + +log = logging.getLogger(__name__) + + +def backoff_hdlr(details): + log.warning("Publishing message failed. Retrying. %s", traceback.format_tb(sys.exc_info()[2])) + + +def giveup_hdlr(details): + log.error("Publishing message failed. Giving up. %s", traceback.format_tb(sys.exc_info()[2])) + + +@backoff.on_exception( + backoff.expo, + (fm_exceptions.ConnectionException, fm_exceptions.PublishException), + max_tries=3, + on_backoff=backoff_hdlr, + on_giveup=giveup_hdlr, +) +async def publish(message): + deferred = api.twisted_publish(message) + loop = asyncio.get_running_loop() + await deferred.asFuture(loop)