From 0f8f5038e9fafb10b12f783fa44c3134ed99c432 Mon Sep 17 00:00:00 2001 From: Helmy Giacoman Date: Thu, 16 Jan 2025 23:35:59 +0100 Subject: [PATCH] Add async support to ingest handlers SDESK-7472 --- server/features/steps/steps.py | 5 +++-- .../planning/feed_parsers/events_ml_test.py | 12 +++++----- .../superdesk_planning_xml_test.py | 12 +++++----- server/planning/io/ingest_rule_handler.py | 4 ++-- .../planning/io/ingest_rule_handler_test.py | 22 ++++++++++--------- 5 files changed, 29 insertions(+), 26 deletions(-) diff --git a/server/features/steps/steps.py b/server/features/steps/steps.py index b3c3448e2..ac0d2a3c9 100644 --- a/server/features/steps/steps.py +++ b/server/features/steps/steps.py @@ -361,7 +361,8 @@ def then_get_transmitted_item(context, path): @when('we fetch events from "{provider_name}" ingest "{guid}"') -def step_impl_fetch_from_provider_ingest(context, provider_name, guid): +@async_run_until_complete +async def step_impl_fetch_from_provider_ingest(context, provider_name, guid): with context.app.test_request_context(context.app.config["URL_PREFIX"]): ingest_provider_service = get_resource_service("ingest_providers") provider = ingest_provider_service.find_one(name=provider_name, req=None) @@ -382,7 +383,7 @@ def step_impl_fetch_from_provider_ingest(context, provider_name, guid): item["versioncreated"] = utcnow() item["expiry"] = utcnow() + timedelta(minutes=20) - failed = context.ingest_items(items, provider, provider_service) + failed = await context.ingest_items(items, provider, provider_service) assert len(failed) == 0, failed provider = ingest_provider_service.find_one(name=provider_name, req=None) diff --git a/server/planning/feed_parsers/events_ml_test.py b/server/planning/feed_parsers/events_ml_test.py index 04f035d61..c786d69ca 100644 --- a/server/planning/feed_parsers/events_ml_test.py +++ b/server/planning/feed_parsers/events_ml_test.py @@ -268,7 +268,7 @@ async def test_update_event(self): } # Ingest first version - ingested, ids = ingest_item(source, provider=provider, feeding_service={}) + ingested, ids = await ingest_item(source, provider=provider, feeding_service={}) self.assertTrue(ingested) self.assertIn(source["guid"], ids) dest = list(service.get_from_mongo(req=None, lookup={"guid": source["guid"]}))[0] @@ -279,12 +279,12 @@ async def test_update_event(self): source["versioncreated"] = source["ingest_versioncreated"] source["name"] = "Test name" provider["disable_item_updates"] = True - ingested, ids = ingest_item(source, provider=provider, feeding_service={}) + ingested, ids = await ingest_item(source, provider=provider, feeding_service={}) self.assertFalse(ingested) # Attempt to update with a new version provider.pop("disable_item_updates") - ingested, ids = ingest_item(source, provider=provider, feeding_service={}) + ingested, ids = await ingest_item(source, provider=provider, feeding_service={}) self.assertTrue(ingested) self.assertIn(source["guid"], ids) dest = list(service.get_from_mongo(req=None, lookup={"guid": source["guid"]}))[0] @@ -306,7 +306,7 @@ async def test_update_published_event(self): } # Ingest first version - ingest_item(source, provider=provider, feeding_service={}) + await ingest_item(source, provider=provider, feeding_service={}) # Publish the Event service.patch( @@ -326,7 +326,7 @@ async def test_update_published_event(self): # Ingest a new version of the item, and make sure the item is re-published source = deepcopy(original_source) source["versioncreated"] += timedelta(hours=1) - ingest_item(source, provider=provider, feeding_service={}) + await ingest_item(source, provider=provider, feeding_service={}) self.assertEqual(published_service.get(req=None, lookup={"item_id": source["guid"]}).count(), 2) dest = list(service.get_from_mongo(req=None, lookup={"guid": source["guid"]}))[0] @@ -338,7 +338,7 @@ async def test_update_published_event(self): source = deepcopy(original_source) source["versioncreated"] += timedelta(hours=2) source["pubstatus"] = POST_STATE.CANCELLED - ingest_item(source, provider=provider, feeding_service={}) + await ingest_item(source, provider=provider, feeding_service={}) self.assertEqual(published_service.get(req=None, lookup={"item_id": source["guid"]}).count(), 3) dest = list(service.get_from_mongo(req=None, lookup={"guid": source["guid"]}))[0] diff --git a/server/planning/feed_parsers/superdesk_planning_xml_test.py b/server/planning/feed_parsers/superdesk_planning_xml_test.py index 698aedee2..ff6dbe717 100644 --- a/server/planning/feed_parsers/superdesk_planning_xml_test.py +++ b/server/planning/feed_parsers/superdesk_planning_xml_test.py @@ -152,7 +152,7 @@ async def test_update_planning(self): } # Ingest first version - ingested, ids = ingest_item(source, provider=provider, feeding_service={}) + ingested, ids = await ingest_item(source, provider=provider, feeding_service={}) self.assertTrue(ingested) self.assertIn(source["guid"], ids) dest = list(service.get_from_mongo(req=None, lookup={"guid": source["guid"]}))[0] @@ -166,12 +166,12 @@ async def test_update_planning(self): source["versioncreated"] = source["ingest_versioncreated"] source["slugline"] = "Test slugline" provider["disable_item_updates"] = True - ingested, ids = ingest_item(source, provider=provider, feeding_service={}) + ingested, ids = await ingest_item(source, provider=provider, feeding_service={}) self.assertFalse(ingested) # Attempt to update with a new version provider.pop("disable_item_updates") - ingested, ids = ingest_item(source, provider=provider, feeding_service={}) + ingested, ids = await ingest_item(source, provider=provider, feeding_service={}) self.assertTrue(ingested) self.assertIn(source["guid"], ids) dest = list(service.get_from_mongo(req=None, lookup={"guid": source["guid"]}))[0] @@ -194,7 +194,7 @@ async def test_update_published_planning(self): } # Ingest first version - ingested, ids = ingest_item(source, provider=provider, feeding_service={}) + ingested, ids = await ingest_item(source, provider=provider, feeding_service={}) self.assertTrue(ingested) self.assertIn(source["guid"], ids) @@ -216,7 +216,7 @@ async def test_update_published_planning(self): # Ingest a new version of the item, and make sure the item is re-published source = deepcopy(original_source) source["versioncreated"] += timedelta(hours=1) - ingest_item(source, provider=provider, feeding_service={}) + await ingest_item(source, provider=provider, feeding_service={}) self.assertEqual(published_service.get(req=None, lookup={"item_id": source["guid"]}).count(), 2) dest = list(service.get_from_mongo(req=None, lookup={"guid": source["guid"]}))[0] @@ -228,7 +228,7 @@ async def test_update_published_planning(self): source = deepcopy(original_source) source["versioncreated"] += timedelta(hours=2) source["pubstatus"] = POST_STATE.CANCELLED - ingest_item(source, provider=provider, feeding_service={}) + await ingest_item(source, provider=provider, feeding_service={}) self.assertEqual(published_service.get(req=None, lookup={"item_id": source["guid"]}).count(), 3) dest = list(service.get_from_mongo(req=None, lookup={"guid": source["guid"]}))[0] diff --git a/server/planning/io/ingest_rule_handler.py b/server/planning/io/ingest_rule_handler.py index 698a774db..9e3c85a19 100644 --- a/server/planning/io/ingest_rule_handler.py +++ b/server/planning/io/ingest_rule_handler.py @@ -57,10 +57,10 @@ class PlanningRoutingRuleHandler(RoutingRuleHandler): }, } - def can_handle(self, rule: Dict[str, Any], ingest_item: Dict[str, Any], routing_scheme: Dict[str, Any]): + async def can_handle(self, rule: Dict[str, Any], ingest_item: Dict[str, Any], routing_scheme: Dict[str, Any]): return ingest_item.get(ITEM_TYPE) in [CONTENT_TYPE.EVENT, CONTENT_TYPE.PLANNING] - def apply_rule(self, rule: Dict[str, Any], ingest_item: Dict[str, Any], routing_scheme: Dict[str, Any]): + async def apply_rule(self, rule: Dict[str, Any], ingest_item: Dict[str, Any], routing_scheme: Dict[str, Any]): attributes = (rule.get("actions") or {}).get("extra") or {} if not attributes: # No need to continue if none of the action attributes are set diff --git a/server/planning/io/ingest_rule_handler_test.py b/server/planning/io/ingest_rule_handler_test.py index fd7755922..a58b7f07e 100644 --- a/server/planning/io/ingest_rule_handler_test.py +++ b/server/planning/io/ingest_rule_handler_test.py @@ -73,14 +73,14 @@ class IngestRuleHandlerTestCase(TestCase): }, ] - def setUp(self): - super(IngestRuleHandlerTestCase, self).setUp() + async def asyncSetUp(self): + await super().asyncSetUp() self.handler = PlanningRoutingRuleHandler() - def test_can_handle_content(self): - self.assertTrue(self.handler.can_handle({}, {ITEM_TYPE: CONTENT_TYPE.EVENT}, {})) - self.assertTrue(self.handler.can_handle({}, {ITEM_TYPE: CONTENT_TYPE.PLANNING}, {})) - self.assertFalse(self.handler.can_handle({}, {ITEM_TYPE: CONTENT_TYPE.TEXT}, {})) + async def test_can_handle_content(self): + self.assertTrue(await self.handler.can_handle({}, {ITEM_TYPE: CONTENT_TYPE.EVENT}, {})) + self.assertTrue(await self.handler.can_handle({}, {ITEM_TYPE: CONTENT_TYPE.PLANNING}, {})) + self.assertFalse(await self.handler.can_handle({}, {ITEM_TYPE: CONTENT_TYPE.TEXT}, {})) async def test_adds_event_calendars(self): async with self.app.app_context(): @@ -97,7 +97,9 @@ async def test_adds_event_calendars(self): self.app.data.insert("events", [event]) original = self.app.data.find_one("events", req=None, _id=event["_id"]) - self.handler.apply_rule({"actions": {"extra": {"calendars": [self.calendars[0]["qcode"]]}}}, event, {}) + await self.handler.apply_rule( + {"actions": {"extra": {"calendars": [self.calendars[0]["qcode"]]}}}, event, {} + ) updated = self.app.data.find_one("events", req=None, _id=event["_id"]) self.assertNotEqual(original["_etag"], updated["_etag"]) @@ -121,7 +123,7 @@ async def test_skips_disabled_and_existing_calendars(self): self.app.data.insert("events", [event]) original = self.app.data.find_one("events", req=None, _id=event["_id"]) - self.handler.apply_rule( + await self.handler.apply_rule( {"actions": {"extra": {"calendars": [self.calendars[0]["qcode"], self.calendars[1]["qcode"]]}}}, event, {}, @@ -141,7 +143,7 @@ async def test_adds_planning_agendas(self): self.app.data.insert("planning", [plan]) original = self.app.data.find_one("planning", req=None, _id=plan["_id"]) - self.handler.apply_rule({"actions": {"extra": {"agendas": [self.agendas[0]["_id"]]}}}, plan, {}) + await self.handler.apply_rule({"actions": {"extra": {"agendas": [self.agendas[0]["_id"]]}}}, plan, {}) updated = self.app.data.find_one("planning", req=None, _id=plan["_id"]) self.assertNotEqual(original["_etag"], updated["_etag"]) @@ -156,7 +158,7 @@ async def test_skips_disabled_and_existing_agendas(self): self.app.data.insert("planning", [plan]) original = self.app.data.find_one("planning", req=None, _id=plan["_id"]) - self.handler.apply_rule( + await self.handler.apply_rule( {"actions": {"extra": {"agendas": [self.agendas[0]["_id"], self.agendas[1]["_id"]]}}}, plan, {} )