Skip to content

Commit

Permalink
Add async support to ingest handlers
Browse files Browse the repository at this point in the history
SDESK-7472
  • Loading branch information
eos87 committed Jan 16, 2025
1 parent 5e5463a commit 0f8f503
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 26 deletions.
5 changes: 3 additions & 2 deletions server/features/steps/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,8 @@ def then_get_transmitted_item(context, path):


@when('we fetch events from "{provider_name}" ingest "{guid}"')
def step_impl_fetch_from_provider_ingest(context, provider_name, guid):
@async_run_until_complete
async def step_impl_fetch_from_provider_ingest(context, provider_name, guid):
with context.app.test_request_context(context.app.config["URL_PREFIX"]):
ingest_provider_service = get_resource_service("ingest_providers")
provider = ingest_provider_service.find_one(name=provider_name, req=None)
Expand All @@ -382,7 +383,7 @@ def step_impl_fetch_from_provider_ingest(context, provider_name, guid):
item["versioncreated"] = utcnow()
item["expiry"] = utcnow() + timedelta(minutes=20)

failed = context.ingest_items(items, provider, provider_service)
failed = await context.ingest_items(items, provider, provider_service)
assert len(failed) == 0, failed

provider = ingest_provider_service.find_one(name=provider_name, req=None)
Expand Down
12 changes: 6 additions & 6 deletions server/planning/feed_parsers/events_ml_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ async def test_update_event(self):
}

# Ingest first version
ingested, ids = ingest_item(source, provider=provider, feeding_service={})
ingested, ids = await ingest_item(source, provider=provider, feeding_service={})
self.assertTrue(ingested)
self.assertIn(source["guid"], ids)
dest = list(service.get_from_mongo(req=None, lookup={"guid": source["guid"]}))[0]
Expand All @@ -279,12 +279,12 @@ async def test_update_event(self):
source["versioncreated"] = source["ingest_versioncreated"]
source["name"] = "Test name"
provider["disable_item_updates"] = True
ingested, ids = ingest_item(source, provider=provider, feeding_service={})
ingested, ids = await ingest_item(source, provider=provider, feeding_service={})
self.assertFalse(ingested)

# Attempt to update with a new version
provider.pop("disable_item_updates")
ingested, ids = ingest_item(source, provider=provider, feeding_service={})
ingested, ids = await ingest_item(source, provider=provider, feeding_service={})
self.assertTrue(ingested)
self.assertIn(source["guid"], ids)
dest = list(service.get_from_mongo(req=None, lookup={"guid": source["guid"]}))[0]
Expand All @@ -306,7 +306,7 @@ async def test_update_published_event(self):
}

# Ingest first version
ingest_item(source, provider=provider, feeding_service={})
await ingest_item(source, provider=provider, feeding_service={})

# Publish the Event
service.patch(
Expand All @@ -326,7 +326,7 @@ async def test_update_published_event(self):
# Ingest a new version of the item, and make sure the item is re-published
source = deepcopy(original_source)
source["versioncreated"] += timedelta(hours=1)
ingest_item(source, provider=provider, feeding_service={})
await ingest_item(source, provider=provider, feeding_service={})
self.assertEqual(published_service.get(req=None, lookup={"item_id": source["guid"]}).count(), 2)
dest = list(service.get_from_mongo(req=None, lookup={"guid": source["guid"]}))[0]

Expand All @@ -338,7 +338,7 @@ async def test_update_published_event(self):
source = deepcopy(original_source)
source["versioncreated"] += timedelta(hours=2)
source["pubstatus"] = POST_STATE.CANCELLED
ingest_item(source, provider=provider, feeding_service={})
await ingest_item(source, provider=provider, feeding_service={})
self.assertEqual(published_service.get(req=None, lookup={"item_id": source["guid"]}).count(), 3)
dest = list(service.get_from_mongo(req=None, lookup={"guid": source["guid"]}))[0]

Expand Down
12 changes: 6 additions & 6 deletions server/planning/feed_parsers/superdesk_planning_xml_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ async def test_update_planning(self):
}

# Ingest first version
ingested, ids = ingest_item(source, provider=provider, feeding_service={})
ingested, ids = await ingest_item(source, provider=provider, feeding_service={})
self.assertTrue(ingested)
self.assertIn(source["guid"], ids)
dest = list(service.get_from_mongo(req=None, lookup={"guid": source["guid"]}))[0]
Expand All @@ -166,12 +166,12 @@ async def test_update_planning(self):
source["versioncreated"] = source["ingest_versioncreated"]
source["slugline"] = "Test slugline"
provider["disable_item_updates"] = True
ingested, ids = ingest_item(source, provider=provider, feeding_service={})
ingested, ids = await ingest_item(source, provider=provider, feeding_service={})
self.assertFalse(ingested)

# Attempt to update with a new version
provider.pop("disable_item_updates")
ingested, ids = ingest_item(source, provider=provider, feeding_service={})
ingested, ids = await ingest_item(source, provider=provider, feeding_service={})
self.assertTrue(ingested)
self.assertIn(source["guid"], ids)
dest = list(service.get_from_mongo(req=None, lookup={"guid": source["guid"]}))[0]
Expand All @@ -194,7 +194,7 @@ async def test_update_published_planning(self):
}

# Ingest first version
ingested, ids = ingest_item(source, provider=provider, feeding_service={})
ingested, ids = await ingest_item(source, provider=provider, feeding_service={})
self.assertTrue(ingested)
self.assertIn(source["guid"], ids)

Expand All @@ -216,7 +216,7 @@ async def test_update_published_planning(self):
# Ingest a new version of the item, and make sure the item is re-published
source = deepcopy(original_source)
source["versioncreated"] += timedelta(hours=1)
ingest_item(source, provider=provider, feeding_service={})
await ingest_item(source, provider=provider, feeding_service={})
self.assertEqual(published_service.get(req=None, lookup={"item_id": source["guid"]}).count(), 2)
dest = list(service.get_from_mongo(req=None, lookup={"guid": source["guid"]}))[0]

Expand All @@ -228,7 +228,7 @@ async def test_update_published_planning(self):
source = deepcopy(original_source)
source["versioncreated"] += timedelta(hours=2)
source["pubstatus"] = POST_STATE.CANCELLED
ingest_item(source, provider=provider, feeding_service={})
await ingest_item(source, provider=provider, feeding_service={})
self.assertEqual(published_service.get(req=None, lookup={"item_id": source["guid"]}).count(), 3)
dest = list(service.get_from_mongo(req=None, lookup={"guid": source["guid"]}))[0]

Expand Down
4 changes: 2 additions & 2 deletions server/planning/io/ingest_rule_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ class PlanningRoutingRuleHandler(RoutingRuleHandler):
},
}

def can_handle(self, rule: Dict[str, Any], ingest_item: Dict[str, Any], routing_scheme: Dict[str, Any]):
async def can_handle(self, rule: Dict[str, Any], ingest_item: Dict[str, Any], routing_scheme: Dict[str, Any]):
return ingest_item.get(ITEM_TYPE) in [CONTENT_TYPE.EVENT, CONTENT_TYPE.PLANNING]

def apply_rule(self, rule: Dict[str, Any], ingest_item: Dict[str, Any], routing_scheme: Dict[str, Any]):
async def apply_rule(self, rule: Dict[str, Any], ingest_item: Dict[str, Any], routing_scheme: Dict[str, Any]):
attributes = (rule.get("actions") or {}).get("extra") or {}
if not attributes:
# No need to continue if none of the action attributes are set
Expand Down
22 changes: 12 additions & 10 deletions server/planning/io/ingest_rule_handler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,14 @@ class IngestRuleHandlerTestCase(TestCase):
},
]

def setUp(self):
super(IngestRuleHandlerTestCase, self).setUp()
async def asyncSetUp(self):
await super().asyncSetUp()
self.handler = PlanningRoutingRuleHandler()

def test_can_handle_content(self):
self.assertTrue(self.handler.can_handle({}, {ITEM_TYPE: CONTENT_TYPE.EVENT}, {}))
self.assertTrue(self.handler.can_handle({}, {ITEM_TYPE: CONTENT_TYPE.PLANNING}, {}))
self.assertFalse(self.handler.can_handle({}, {ITEM_TYPE: CONTENT_TYPE.TEXT}, {}))
async def test_can_handle_content(self):
self.assertTrue(await self.handler.can_handle({}, {ITEM_TYPE: CONTENT_TYPE.EVENT}, {}))
self.assertTrue(await self.handler.can_handle({}, {ITEM_TYPE: CONTENT_TYPE.PLANNING}, {}))
self.assertFalse(await self.handler.can_handle({}, {ITEM_TYPE: CONTENT_TYPE.TEXT}, {}))

async def test_adds_event_calendars(self):
async with self.app.app_context():
Expand All @@ -97,7 +97,9 @@ async def test_adds_event_calendars(self):
self.app.data.insert("events", [event])
original = self.app.data.find_one("events", req=None, _id=event["_id"])

self.handler.apply_rule({"actions": {"extra": {"calendars": [self.calendars[0]["qcode"]]}}}, event, {})
await self.handler.apply_rule(
{"actions": {"extra": {"calendars": [self.calendars[0]["qcode"]]}}}, event, {}
)

updated = self.app.data.find_one("events", req=None, _id=event["_id"])
self.assertNotEqual(original["_etag"], updated["_etag"])
Expand All @@ -121,7 +123,7 @@ async def test_skips_disabled_and_existing_calendars(self):
self.app.data.insert("events", [event])
original = self.app.data.find_one("events", req=None, _id=event["_id"])

self.handler.apply_rule(
await self.handler.apply_rule(
{"actions": {"extra": {"calendars": [self.calendars[0]["qcode"], self.calendars[1]["qcode"]]}}},
event,
{},
Expand All @@ -141,7 +143,7 @@ async def test_adds_planning_agendas(self):
self.app.data.insert("planning", [plan])
original = self.app.data.find_one("planning", req=None, _id=plan["_id"])

self.handler.apply_rule({"actions": {"extra": {"agendas": [self.agendas[0]["_id"]]}}}, plan, {})
await self.handler.apply_rule({"actions": {"extra": {"agendas": [self.agendas[0]["_id"]]}}}, plan, {})

updated = self.app.data.find_one("planning", req=None, _id=plan["_id"])
self.assertNotEqual(original["_etag"], updated["_etag"])
Expand All @@ -156,7 +158,7 @@ async def test_skips_disabled_and_existing_agendas(self):
self.app.data.insert("planning", [plan])
original = self.app.data.find_one("planning", req=None, _id=plan["_id"])

self.handler.apply_rule(
await self.handler.apply_rule(
{"actions": {"extra": {"agendas": [self.agendas[0]["_id"], self.agendas[1]["_id"]]}}}, plan, {}
)

Expand Down

0 comments on commit 0f8f503

Please sign in to comment.