Merge pull request #1341 from GSA/shared_memory
use shared memory instead of expiring dict for jobs cache
terrazoon authored Sep 27, 2024
2 parents b265186 + 8e6c079 commit d3768e3
Showing 7 changed files with 58 additions and 59 deletions.
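
For context, the core of the change is replacing the per-process ExpiringDict with a multiprocessing.Manager dict whose entries carry their own expiry timestamp, swept by a new scheduled task. Below is a minimal, self-contained sketch of that pattern, condensed from the s3.py hunks further down; the explicit cache parameter on the helpers and the demo values are illustrative additions, not the app's actual code.

import time
from multiprocessing import Manager


def set_job_cache(cache, key, value):
    # Each entry stores a (value, expiry_timestamp) tuple; the diff uses 8 days.
    cache[key] = (value, time.time() + 8 * 24 * 60 * 60)


def clean_cache(cache):
    # Manager dicts never expire entries on their own, so a scheduled task
    # sweeps out anything past its expiry time.
    now = time.time()
    for key in [k for k, (_, expiry) in cache.items() if expiry < now]:
        del cache[key]


if __name__ == "__main__":
    # A Manager-backed dict is shared across worker processes, unlike the
    # per-process ExpiringDict it replaces.
    manager = Manager()
    job_cache = manager.dict()

    set_job_cache(job_cache, "job-123", "phone number\r\n+15555550100")
    cached = job_cache.get("job-123")
    if cached is not None:
        print(cached[0])  # readers unwrap the (value, expiry) tuple
    clean_cache(job_cache)
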
2 changes: 1 addition & 1 deletion .ds.baseline
@@ -384,5 +384,5 @@
}
]
},
"generated_at": "2024-09-11T14:31:46Z"
"generated_at": "2024-09-26T20:29:19Z"
}
2 changes: 1 addition & 1 deletion .github/workflows/checks.yml
@@ -54,7 +54,7 @@ jobs:
- name: Check for dead code
run: make dead-code
- name: Run tests with coverage
run: poetry run coverage run --omit=*/notifications_utils/*,*/migrations/* -m pytest --maxfail=10
run: poetry run coverage run --omit=*/migrations/* -m pytest --maxfail=10
env:
SQLALCHEMY_DATABASE_TEST_URI: postgresql://user:password@localhost:5432/test_notification_api
NOTIFY_E2E_TEST_EMAIL: ${{ secrets.NOTIFY_E2E_TEST_EMAIL }}
2 changes: 1 addition & 1 deletion Makefile
@@ -81,7 +81,7 @@ test: ## Run tests and create coverage report
poetry run black .
poetry run flake8 .
poetry run isort --check-only ./app ./tests
poetry run coverage run --omit=*/notifications_utils/*,*/migrations/* -m pytest --maxfail=10
poetry run coverage run --omit=*/migrations/* -m pytest --maxfail=10

poetry run coverage report -m --fail-under=95
poetry run coverage html -d .coverage_cache
98 changes: 44 additions & 54 deletions app/aws/s3.py
@@ -1,31 +1,43 @@
import datetime
import re
import time
from multiprocessing import Manager

import botocore
from boto3 import Session
from expiringdict import ExpiringDict
from flask import current_app

from app import redis_store
from app.clients import AWS_CLIENT_CONFIG
from notifications_utils import aware_utcnow

FILE_LOCATION_STRUCTURE = "service-{}-notify/{}.csv"

# Temporarily extend cache to 7 days
ttl = 60 * 60 * 24 * 7
JOBS = ExpiringDict(max_len=20000, max_age_seconds=ttl)
manager = Manager()
job_cache = manager.dict()


JOBS_CACHE_HITS = "JOBS_CACHE_HITS"
JOBS_CACHE_MISSES = "JOBS_CACHE_MISSES"

# Global variable
s3_client = None
s3_resource = None


def set_job_cache(job_cache, key, value):
job_cache[key] = (value, time.time() + 8 * 24 * 60 * 60)


def clean_cache():
current_time = time.time()
keys_to_delete = []
for key, (_, expiry_time) in job_cache.items():
if expiry_time < current_time:
keys_to_delete.append(key)

for key in keys_to_delete:
del job_cache[key]


def get_s3_client():
global s3_client
if s3_client is None:
@@ -127,29 +139,29 @@ def get_s3_files():

s3res = get_s3_resource()
current_app.logger.info(
f"JOBS cache length before regen: {len(JOBS)} #notify-admin-1200"
f"job_cache length before regen: {len(job_cache)} #notify-admin-1200"
)
for object in objects:
# We put our csv files in the format "service-{service_id}-notify/{job_id}"
try:
object_arr = object.split("/")
job_id = object_arr[1] # get the job_id
job_id = job_id.replace(".csv", "") # we just want the job_id
if JOBS.get(job_id) is None:
if job_cache.get(job_id) is None:
object = (
s3res.Object(bucket_name, object)
.get()["Body"]
.read()
.decode("utf-8")
)
if "phone number" in object.lower():
JOBS[job_id] = object
set_job_cache(job_cache, job_id, object)
except LookupError:
# perhaps our key is not formatted as we expected. If so skip it.
current_app.logger.exception("LookupError #notify-admin-1200")

current_app.logger.info(
f"JOBS cache length after regen: {len(JOBS)} #notify-admin-1200"
f"job_cache length after regen: {len(job_cache)} #notify-admin-1200"
)


@@ -287,20 +299,6 @@ def get_job_from_s3(service_id, job_id):
return None


def incr_jobs_cache_misses():
if not redis_store.get(JOBS_CACHE_MISSES):
redis_store.set(JOBS_CACHE_MISSES, 1)
else:
redis_store.incr(JOBS_CACHE_MISSES)


def incr_jobs_cache_hits():
if not redis_store.get(JOBS_CACHE_HITS):
redis_store.set(JOBS_CACHE_HITS, 1)
else:
redis_store.incr(JOBS_CACHE_HITS)


def extract_phones(job):
job = job.split("\r\n")
first_row = job[0]
@@ -333,7 +331,8 @@ def extract_phones(job):


def extract_personalisation(job):
job = job.split("\r\n")

job = job[0].split("\r\n")
first_row = job[0]
job.pop(0)
first_row = first_row.split(",")
@@ -348,18 +347,15 @@


def get_phone_number_from_s3(service_id, job_id, job_row_number):
# We don't want to constantly pull down a job from s3 every time we need a phone number.
# At the same time we don't want to store it in redis or the db
# So this is a little recycling mechanism to reduce the number of downloads.
job = JOBS.get(job_id)
job = job_cache.get(job_id)
if job is None:
current_app.logger.info(f"job {job_id} was not in the cache")
job = get_job_from_s3(service_id, job_id)
# Even if it is None, put it here to avoid KeyErrors
JOBS[job_id] = job
incr_jobs_cache_misses()
set_job_cache(job_cache, job_id, job)
else:
incr_jobs_cache_hits()
# skip expiration date from cache, we don't need it here
job = job[0]

if job is None:
current_app.logger.error(
@@ -369,22 +365,17 @@ def get_phone_number_from_s3(service_id, job_id, job_row_number):

# If we look in the JOBS cache for the quick lookup dictionary of phones for a given job
# and that dictionary is not there, create it
if JOBS.get(f"{job_id}_phones") is None:
JOBS[f"{job_id}_phones"] = extract_phones(job)
if job_cache.get(f"{job_id}_phones") is None:
phones = extract_phones(job)
set_job_cache(job_cache, f"{job_id}_phones", phones)

# If we can find the quick dictionary, use it
if JOBS.get(f"{job_id}_phones") is not None:
phone_to_return = JOBS.get(f"{job_id}_phones").get(job_row_number)
if phone_to_return:
return phone_to_return
else:
current_app.logger.warning(
f"Was unable to retrieve phone number from lookup dictionary for job {job_id}"
)
return "Unavailable"
phone_to_return = phones[job_row_number]
if phone_to_return:
return phone_to_return
else:
current_app.logger.error(
f"Was unable to construct lookup dictionary for job {job_id}"
current_app.logger.warning(
f"Was unable to retrieve phone number from lookup dictionary for job {job_id}"
)
return "Unavailable"

@@ -393,13 +384,10 @@ def get_personalisation_from_s3(service_id, job_id, job_row_number):
# We don't want to constantly pull down a job from s3 every time we need the personalisation.
# At the same time we don't want to store it in redis or the db
# So this is a little recycling mechanism to reduce the number of downloads.
job = JOBS.get(job_id)
job = job_cache.get(job_id)
if job is None:
job = get_job_from_s3(service_id, job_id)
JOBS[job_id] = job
incr_jobs_cache_misses()
else:
incr_jobs_cache_hits()
set_job_cache(job_cache, job_id, job)

# If the job is None after our attempt to retrieve it from s3, it
# probably means the job is old and has been deleted from s3, in
Expand All @@ -414,12 +402,14 @@ def get_personalisation_from_s3(service_id, job_id, job_row_number):

# If we look in the JOBS cache for the quick lookup dictionary of personalisations for a given job
# and that dictionary is not there, create it
if JOBS.get(f"{job_id}_personalisation") is None:
JOBS[f"{job_id}_personalisation"] = extract_personalisation(job)
if job_cache.get(f"{job_id}_personalisation") is None:
set_job_cache(
job_cache, f"{job_id}_personalisation", extract_personalisation(job)
)

# If we can find the quick dictionary, use it
if JOBS.get(f"{job_id}_personalisation") is not None:
personalisation_to_return = JOBS.get(f"{job_id}_personalisation").get(
if job_cache.get(f"{job_id}_personalisation") is not None:
personalisation_to_return = job_cache.get(f"{job_id}_personalisation")[0].get(
job_row_number
)
if personalisation_to_return:
5 changes: 5 additions & 0 deletions app/celery/tasks.py
@@ -446,6 +446,11 @@ def regenerate_job_cache():
s3.get_s3_files()


@notify_celery.task(name="clean-job-cache")
def clean_job_cache():
s3.clean_cache()


@notify_celery.task(name="delete-old-s3-objects")
def delete_old_s3_objects():
s3.cleanup_old_s3_objects()
5 changes: 5 additions & 0 deletions app/config.py
@@ -269,6 +269,11 @@ class Config(object):
"expires": 60,
}, # Ensure it doesn't run if missed
},
"clean-job-cache": {
"task": "clean-job-cache",
"schedule": crontab(hour=2, minute=11),
"options": {"queue": QueueNames.PERIODIC},
},
"cleanup-unfinished-jobs": {
"task": "cleanup-unfinished-jobs",
"schedule": crontab(hour=4, minute=5),
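
To show how the sweep gets scheduled, here is a condensed, runnable sketch of the wiring the tasks.py and config.py hunks above add: a named Celery task plus a daily beat entry at 02:11. The Celery app, broker URL, and task body are placeholders for illustration; the real task calls s3.clean_cache() and runs on the periodic queue.

from celery import Celery
from celery.schedules import crontab

app = Celery("notify-api-sketch", broker="redis://localhost:6379/0")


@app.task(name="clean-job-cache")
def clean_job_cache():
    # The real task calls app.aws.s3.clean_cache() to sweep expired entries.
    print("sweeping expired job cache entries")


app.conf.beat_schedule = {
    "clean-job-cache": {
        "task": "clean-job-cache",
        "schedule": crontab(hour=2, minute=11),
    },
}
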
3 changes: 1 addition & 2 deletions tests/app/aws/test_s3.py
@@ -40,6 +40,7 @@ def test_cleanup_old_s3_objects(mocker):
whereas a 30 day old job ("A") is.
"""
mocker.patch("app.aws.s3.get_bucket_name", return_value="Bucket")

mock_s3_client = mocker.Mock()
mocker.patch("app.aws.s3.get_s3_client", return_value=mock_s3_client)
mock_remove_csv_object = mocker.patch("app.aws.s3.remove_csv_object")
@@ -110,7 +111,6 @@ def test_get_s3_file_makes_correct_call(notify_api, mocker):
def test_get_phone_number_from_s3(
mocker, job, job_id, job_row_number, expected_phone_number
):
mocker.patch("app.aws.s3.redis_store")
get_job_mock = mocker.patch("app.aws.s3.get_job_from_s3")
get_job_mock.return_value = job
phone_number = get_phone_number_from_s3("service_id", job_id, job_row_number)
@@ -175,7 +175,6 @@ def test_get_job_from_s3_exponential_backoff_file_not_found(mocker):
def test_get_personalisation_from_s3(
mocker, job, job_id, job_row_number, expected_personalisation
):
mocker.patch("app.aws.s3.redis_store")
get_job_mock = mocker.patch("app.aws.s3.get_job_from_s3")
get_job_mock.return_value = job
personalisation = get_personalisation_from_s3("service_id", job_id, job_row_number)
