Commit 60de15f
feat(curated-recommendations): [MC-1270] handle graphql errors and refactor retry logic (#621)

* [MC-1270] feat: Handle curated-corpus-api GraphQL errors

* Fix linting errors in tests

* Update merino/curated_recommendations/corpus_backends/corpus_api_backend.py

Co-authored-by: Mathijs Miermans <[email protected]>

* Update the fetch() function to throw an exception if a GraphQL error occurs

* Replace integration test for GraphQL error with the one Mathijs outlined

* Update new test name

* Fix test

* replace the integration test rather than create a new, very similar one

* Fix linting errors

* feat(curated-recommendations) [MC-1270]: handle GraphQL errors and refactor retries

- tenacity is used to retry corpus API requests, replacing custom retry code.
- `CorpusGraphQLError` is raised when a GraphQL error is received.

---------

Co-authored-by: Nina Pypchenko <[email protected]>
mmiermans and nina-py authored Sep 9, 2024
1 parent 6e67609 commit 60de15f
Showing 8 changed files with 197 additions and 83 deletions.
2 changes: 2 additions & 0 deletions docs/data.md
@@ -163,6 +163,8 @@ The following additional metrics are recorded when curated recommendations are requested:
A timer to measure the duration (in ms) of looking up a list of scheduled corpus items.
- `corpus_api.request.status_codes.{res.status_code}` -
A counter to measure the status codes of an HTTP request to the curated-corpus-api.
- `corpus_api.request.graphql_error` -
A counter to measure the number of GraphQL errors from the curated-corpus-api.
- `recommendation.engagement.update.timing` -
A timer to measure the duration (in ms) of updating the engagement data from GCS.
- `recommendation.engagement.size` - A gauge to track the size of the blob on GCS.
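For reference, a minimal sketch of how a counter such as the new `corpus_api.request.graphql_error` is emitted with aiodogstatsd (a sketch only: the client setup shown here is assumed; in Merino the client is created and connected at app startup, and the actual call site appears in the backend diff below):

import aiodogstatsd

# Assumed setup; Merino wires up and connects its client elsewhere.
metrics_client = aiodogstatsd.Client()

def record_graphql_error() -> None:
    # Bump the counter documented above each time the
    # curated-corpus-api returns a GraphQL "errors" payload.
    metrics_client.increment("corpus_api.request.graphql_error")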
15 changes: 15 additions & 0 deletions merino/configs/default.toml
@@ -490,6 +490,21 @@ gcp_project = ""
cron_interval_seconds = 30


[default.curated_recommendations.corpus_api]
# MERINO__CURATED_RECOMMENDATIONS__CORPUS_API__RETRY_COUNT
# The maximum number of times to retry corpus api requests on failure before giving up.
retry_count = 3

# MERINO__CURATED_RECOMMENDATIONS__CORPUS_API__RETRY_WAIT_INITIAL_SECONDS
# Initial time in seconds to wait before retrying corpus api requests.
# Gets doubled on each failure, until the maximum number of retries is met.
retry_wait_initial_seconds = 0.5

# MERINO__CURATED_RECOMMENDATIONS__CORPUS_API__RETRY_WAIT_JITTER_SECONDS
# Uniformly random time in seconds to add to the wait time before retrying corpus api requests.
retry_wait_jitter_seconds = 0.2


[default.jobs.wikipedia_indexer]
# MERINO_JOBS__WIKIPEDIA_INDEXER__ES_URL
# The URL of the Elasticsearch cluster for indexing job.
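A rough sketch of how the three retry settings above combine at runtime (this mirrors the arithmetic of tenacity's wait_exponential_jitter, which the backend uses with these values; it does not call tenacity itself):

import random

retry_count = 3                   # maximum attempts, per stop_after_attempt
retry_wait_initial_seconds = 0.5
retry_wait_jitter_seconds = 0.2

# Waits occur between attempts: the base wait doubles on each failure,
# plus a uniformly random jitter.
for attempt in range(1, retry_count):
    wait = retry_wait_initial_seconds * 2 ** (attempt - 1) + random.uniform(
        0, retry_wait_jitter_seconds
    )
    print(f"wait before attempt {attempt + 1}: ~{wait:.2f}s")
# Typically prints ~0.5-0.7s before attempt 2 and ~1.0-1.2s before attempt 3.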
7 changes: 7 additions & 0 deletions merino/configs/testing.toml
@@ -67,3 +67,10 @@ backend = "test"
[testing.curated_recommendations.gcs_engagement]
bucket_name = "test-bucket"
gcp_project = "test-project"


[testing.curated_recommendations.corpus_api]

# Tests sleep for a shorter duration than the default to make them execute more quickly.
retry_wait_initial_seconds = 0.05
retry_wait_jitter_seconds = 0.02
merino/curated_recommendations/corpus_backends/corpus_api_backend.py
@@ -1,26 +1,39 @@
"""Corpus API backend for making GRAPHQL requests"""

from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
import asyncio
import logging
import random
from datetime import datetime, timedelta
import asyncio
from urllib.parse import urlparse, urlencode, parse_qsl
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError

import aiodogstatsd
from httpx import AsyncClient, HTTPError
from urllib.parse import urlparse, urlencode, parse_qsl
from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_exponential_jitter,
before_sleep_log,
)

from merino.config import settings
from merino.curated_recommendations.corpus_backends.protocol import (
CorpusBackend,
CorpusItem,
Topic,
ScheduledSurfaceId,
)
from merino.exceptions import BackendError
from merino.utils.version import fetch_app_version_from_file

logger = logging.getLogger(__name__)


class CorpusGraphQLError(BackendError):
"""Error during interaction with the corpus GraphQL API."""


class CorpusApiGraphConfig:
"""Corpus API Graph Config."""

@@ -97,9 +110,6 @@ class CorpusApiBackend(CorpusBackend):
# rate to the scheduledSurface query stays close to the historic rate of ~100 requests/minute.
cache_time_to_live_min = timedelta(seconds=50)
cache_time_to_live_max = timedelta(seconds=70)
# The backoff time is the time that is waited before retrying.
# fetch() makes a single retry attempt, so there's no exponential backoff (for now).
_backoff_time = timedelta(seconds=0.5)
_cache: dict[ScheduledSurfaceId, list[CorpusItem]]
_expirations: dict[ScheduledSurfaceId, datetime]
_locks: dict[ScheduledSurfaceId, asyncio.Lock]
@@ -126,7 +136,8 @@ def map_corpus_topic_to_serp_topic(topic: str) -> Topic | None:

@staticmethod
def get_utm_source(scheduled_surface_id: ScheduledSurfaceId) -> str | None:
"""Return utm_source value to attribute curated recommendations to, based on the scheduled_surface_id.
"""Return utm_source value to attribute curated recommendations to, based on the
scheduled_surface_id.
https://github.com/Pocket/recommendation-api/blob/main/app/data_providers/slate_providers/slate_provider.py#L95C5-L100C46
"""
return SCHEDULED_SURFACE_ID_TO_UTM_SOURCE.get(scheduled_surface_id)
@@ -166,10 +177,12 @@ def get_surface_timezone(scheduled_surface_id: ScheduledSurfaceId) -> ZoneInfo:
try:
return ZoneInfo(zones[scheduled_surface_id])
except (KeyError, ZoneInfoNotFoundError) as e:
# Graceful degradation: continue to serve recommendations if timezone cannot be obtained for the surface.
# Graceful degradation: continue to serve recommendations if timezone cannot be obtained
# for the surface.
default_tz = ZoneInfo("UTC")
logging.error(
f"Failed to get timezone for {scheduled_surface_id}, so defaulting to {default_tz}: {e}"
f"Failed to get timezone for {scheduled_surface_id}, "
f"so defaulting to {default_tz}: {e}"
)
return default_tz

@@ -186,18 +199,28 @@ async def fetch(self, surface_id: ScheduledSurfaceId) -> list[CorpusItem]:
# If we have expired cached data, revalidate asynchronously without waiting for the result.
if cache_key in self._cache:
if now >= self._expirations[cache_key]:
task = asyncio.create_task(self._revalidate_cache(surface_id)) # noqa: should not 'await'
# Save a reference to the result of this function, to avoid a task disappearing
# mid-execution. The event loop only keeps weak references to tasks. A task that
# isn’t referenced elsewhere may get garbage collected, even before it’s done.
# https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task
task = asyncio.create_task(self._revalidate_cache(surface_id))
self._background_tasks.add(task)
task.add_done_callback(self._background_tasks.discard)
return self._cache[cache_key]

# If no cache value exists, fetch new data and await the result.
return await self._revalidate_cache(surface_id)

@retry(
wait=wait_exponential_jitter(
initial=settings.curated_recommendations.corpus_api.retry_wait_initial_seconds,
jitter=settings.curated_recommendations.corpus_api.retry_wait_jitter_seconds,
),
stop=stop_after_attempt(settings.curated_recommendations.corpus_api.retry_count),
retry=retry_if_exception_type((CorpusGraphQLError, HTTPError, ValueError)),
reraise=True, # Raise the exception our code encountered, instead of a RetryError
before_sleep=before_sleep_log(logger, logging.WARNING),
)
async def _fetch_from_backend(self, surface_id: ScheduledSurfaceId) -> list[CorpusItem]:
"""Issue a scheduledSurface query"""
query = """
@@ -243,6 +266,12 @@ async def _fetch_from_backend(self, surface_id: ScheduledSurfaceId) -> list[CorpusItem]:
res.raise_for_status()
data = res.json()

if res.status_code == 200 and "errors" in data:
self.metrics_client.increment("corpus_api.request.graphql_error")
raise CorpusGraphQLError(
f"curated-corpus-api returned GraphQL errors {data['errors']}"
)

# get the utm_source based on scheduled surface id
utm_source = self.get_utm_source(surface_id)

@@ -260,12 +289,13 @@ async def _fetch_from_backend(self, surface_id: ScheduledSurfaceId) -> list[CorpusItem]:
CorpusItem(**item["corpusItem"], scheduledCorpusItemId=item["id"])
for item in data["data"]["scheduledSurface"]["items"]
]

return curated_recommendations

async def _revalidate_cache(self, surface_id: ScheduledSurfaceId) -> list[CorpusItem]:
"""Purge and update the cache for a specific surface. If the API responds with an error code,
re-try the request again. If there is still an error response, don't bust cache, return the latest
valid data from the cache.
"""Update the cache for a specific surface and return the corpus items.
If the API fails to respond successfully even after retries, return the latest cached data.
Only a single "coalesced" request will be made to the backend per surface id.
"""
cache_key = surface_id

@@ -279,29 +309,18 @@ async def _revalidate_cache(self, surface_id: ScheduledSurfaceId) -> list[CorpusItem]:

# Attempt to fetch new data from the backend
try:
data = await self.retry_fetch(surface_id, cache_key)
data = await self._fetch_from_backend(surface_id)
self._cache[cache_key] = data
self._expirations[cache_key] = self.get_expiration_time()
return data
except (HTTPError, ValueError) as e:
logger.warning(
f"Exception occurred on first attempt to fetch: "
f"Retrying CorpusApiBackend._fetch_from_backend once after {e}"
)

# Retry fetching data
try:
data = await self.retry_fetch(surface_id, cache_key)
return data
except (HTTPError, ValueError) as e:
logger.warning(
f"Retrying CorpusApiBackend._fetch_from_backend failed: {e}. "
f"Returning latest valid cached data."
)
# Provide the latest valid cached data if available
return self._cache.get(cache_key, [])
except Exception as e:
# Backoff prevents high API rate when an unforeseen error occurs.
await asyncio.sleep(self._backoff_time.total_seconds())
raise e
if cache_key in self._cache:
logger.error(
f"Failed to update corpus cache: {e}. Returning stale cached data."
)
return self._cache[cache_key]
else:
raise e

@staticmethod
def get_expiration_time() -> datetime:
@@ -312,15 +331,3 @@ def get_expiration_time() -> datetime:
CorpusApiBackend.cache_time_to_live_max.total_seconds(),
)
return datetime.now() + timedelta(seconds=time_to_live_seconds)

async def retry_fetch(
self, surface_id: ScheduledSurfaceId, cache_key: ScheduledSurfaceId
) -> list[CorpusItem]:
"""Retry fetching data & update cache with valid data."""
data = await self._fetch_from_backend(surface_id)
if not data: # Check if the fetched data is valid
raise ValueError("retry_fetch: Response is invalid or empty")
# Update the cache with valid data
self._cache[cache_key] = data
self._expirations[cache_key] = self.get_expiration_time()
return data
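The "coalesced" request behavior described in the `_revalidate_cache` docstring comes from the per-surface `asyncio.Lock` (that part of the function is collapsed in this diff). A minimal sketch of the pattern, with hypothetical names:

import asyncio
from datetime import datetime, timedelta

_locks: dict[str, asyncio.Lock] = {}
_cache: dict[str, list[str]] = {}
_expirations: dict[str, datetime] = {}

async def fetch_from_backend(key: str) -> list[str]:
    # Stand-in for the GraphQL request.
    await asyncio.sleep(0.1)
    return [f"item-for-{key}"]

async def revalidate(key: str) -> list[str]:
    # One lock per key: concurrent callers for the same surface queue here,
    # so only the first one actually hits the backend.
    async with _locks.setdefault(key, asyncio.Lock()):
        # Callers that were queued behind the first one find fresh data
        # and return it without issuing a duplicate request.
        if key in _cache and datetime.now() < _expirations[key]:
            return _cache[key]
        _cache[key] = await fetch_from_backend(key)
        _expirations[key] = datetime.now() + timedelta(seconds=60)
        return _cache[key]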
19 changes: 17 additions & 2 deletions poetry.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
@@ -93,6 +93,7 @@ types-python-dateutil = "^2.8.19.13"
pydantic = "^2.1.0"
scipy = "^1.14.1"
orjson = "^3.10.7"
tenacity = "^9.0.0"

[tool.poetry.group.dev.dependencies]
mypy = "^1.5"
20 changes: 20 additions & 0 deletions tests/data/graphql_error.json
@@ -0,0 +1,20 @@
{
"data": null,
"errors": [
{
"message": "Could not find Scheduled Surface with id of \"NEW_TAB_EN_UX\".",
"locations": [
{
"line": 1,
"column": 79
}
],
"path": [
"scheduledSurface"
],
"extensions": {
"code": "BAD_USER_INPUT"
}
}
]
}
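One way a fixture like this might drive an integration-style test (a sketch only: it assumes the respx library for mocking httpx, a placeholder endpoint URL, and a hypothetical fetch_scheduled_surface helper with a plain RuntimeError standing in for the real backend and its CorpusGraphQLError):

import json

import httpx
import pytest
import respx

GRAPH_URL = "https://corpus.example/graphql"  # placeholder endpoint

async def fetch_scheduled_surface(client: httpx.AsyncClient) -> dict:
    # Mirrors the backend's error handling: a 200 response that carries
    # an "errors" key is still treated as a failure.
    res = await client.post(GRAPH_URL, json={"query": "..."})
    res.raise_for_status()
    data = res.json()
    if "errors" in data:
        raise RuntimeError(f"GraphQL errors: {data['errors']}")
    return data

@pytest.mark.asyncio
@respx.mock
async def test_graphql_error_is_raised():
    with open("tests/data/graphql_error.json") as f:
        fixture = json.load(f)
    respx.post(GRAPH_URL).mock(return_value=httpx.Response(200, json=fixture))
    async with httpx.AsyncClient() as client:
        with pytest.raises(RuntimeError, match="GraphQL errors"):
            await fetch_scheduled_surface(client)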