Skip to content

Commit

Permalink
Merge pull request #1343 from GSA/notify-api-1299
Browse files Browse the repository at this point in the history
optimize S3 partitioning
  • Loading branch information
terrazoon authored Oct 2, 2024
2 parents aba9132 + c792a24 commit ae5c0d2
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 15 deletions.
45 changes: 36 additions & 9 deletions app/aws/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from notifications_utils import aware_utcnow

FILE_LOCATION_STRUCTURE = "service-{}-notify/{}.csv"
NEW_FILE_LOCATION_STRUCTURE = "{}-service-notify/{}.csv"

# Temporarily extend cache to 7 days
ttl = 60 * 60 * 24 * 7
Expand Down Expand Up @@ -263,6 +264,21 @@ def file_exists(file_location):


def get_job_location(service_id, job_id):
return (
current_app.config["CSV_UPLOAD_BUCKET"]["bucket"],
NEW_FILE_LOCATION_STRUCTURE.format(service_id, job_id),
current_app.config["CSV_UPLOAD_BUCKET"]["access_key_id"],
current_app.config["CSV_UPLOAD_BUCKET"]["secret_access_key"],
current_app.config["CSV_UPLOAD_BUCKET"]["region"],
)


def get_old_job_location(service_id, job_id):
"""
This is deprecated. We are transitioning to NEW_FILE_LOCATION_STRUCTURE,
but it will take a few days where we have to support both formats.
Remove this when everything works with the NEW_FILE_LOCATION_STRUCTURE.
"""
return (
current_app.config["CSV_UPLOAD_BUCKET"]["bucket"],
FILE_LOCATION_STRUCTURE.format(service_id, job_id),
Expand Down Expand Up @@ -291,25 +307,36 @@ def get_job_from_s3(service_id, job_id):
max_retries = 4
backoff_factor = 0.2

if not file_exists(FILE_LOCATION_STRUCTURE.format(service_id, job_id)):
if not file_exists(
FILE_LOCATION_STRUCTURE.format(service_id, job_id)
) and not file_exists(NEW_FILE_LOCATION_STRUCTURE.format(service_id, job_id)):
current_app.logger.error(
f"This file does not exist {FILE_LOCATION_STRUCTURE.format(service_id, job_id)}"
f"This file with service_id {service_id} and job_id {job_id} does not exist"
)
return None

while retries < max_retries:

try:
obj = get_s3_object(*get_job_location(service_id, job_id))
return obj.get()["Body"].read().decode("utf-8")
# TODO
# for transition on optimizing the s3 partition, we have
# to check for the file location using the new way and the
# old way. After this has been on production for a few weeks
# we should remove the check for the old way.
try:
obj = get_s3_object(*get_job_location(service_id, job_id))
return obj.get()["Body"].read().decode("utf-8")
except botocore.exceptions.ClientError:
obj = get_s3_object(*get_old_job_location(service_id, job_id))
return obj.get()["Body"].read().decode("utf-8")
except botocore.exceptions.ClientError as e:
if e.response["Error"]["Code"] in [
"Throttling",
"RequestTimeout",
"SlowDown",
]:
current_app.logger.exception(
f"Retrying job fetch {FILE_LOCATION_STRUCTURE.format(service_id, job_id)} retry_count={retries}",
f"Retrying job fetch service_id {service_id} job_id {job_id} retry_count={retries}",
)
retries += 1
sleep_time = backoff_factor * (2**retries) # Exponential backoff
Expand All @@ -318,18 +345,18 @@ def get_job_from_s3(service_id, job_id):
else:
# Typically this is "NoSuchKey"
current_app.logger.exception(
f"Failed to get job {FILE_LOCATION_STRUCTURE.format(service_id, job_id)}",
f"Failed to get job with service_id {service_id} job_id {job_id}",
)
return None

except Exception:
current_app.logger.exception(
f"Failed to get job {FILE_LOCATION_STRUCTURE.format(service_id, job_id)} retry_count={retries}",
f"Failed to get job with service_id {service_id} job_id {job_id}retry_count={retries}",
)
return None

current_app.logger.error(
f"Never retrieved job {FILE_LOCATION_STRUCTURE.format(service_id, job_id)}",
f"Never retrieved job with service_id {service_id} job_id {job_id}",
)
return None

Expand Down Expand Up @@ -395,7 +422,7 @@ def get_phone_number_from_s3(service_id, job_id, job_row_number):

if job is None:
current_app.logger.error(
f"Couldnt find phone for job {FILE_LOCATION_STRUCTURE.format(service_id, job_id)} because job is missing"
f"Couldnt find phone for job with service_id {service_id} job_id {job_id} because job is missing"
)
return "Unavailable"

Expand Down
10 changes: 5 additions & 5 deletions app/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -931,7 +931,7 @@ def purge_csv_bucket():
# generate n number of test orgs into the dev DB
@notify_command(name="add-test-organizations-to-db")
@click.option("-g", "--generate", required=True, prompt=True, default=1)
def add_test_organizations_to_db(generate):
def add_test_organizations_to_db(generate): # pragma: no cover
if getenv("NOTIFY_ENVIRONMENT", "") not in ["development", "test"]:
current_app.logger.error("Can only be run in development")
return
Expand Down Expand Up @@ -993,7 +993,7 @@ def generate_gov_agency():
# generate n number of test services into the dev DB
@notify_command(name="add-test-services-to-db")
@click.option("-g", "--generate", required=True, prompt=True, default=1)
def add_test_services_to_db(generate):
def add_test_services_to_db(generate): # pragma: no cover
if getenv("NOTIFY_ENVIRONMENT", "") not in ["development", "test"]:
current_app.logger.error("Can only be run in development")
return
Expand All @@ -1007,7 +1007,7 @@ def add_test_services_to_db(generate):
# generate n number of test jobs into the dev DB
@notify_command(name="add-test-jobs-to-db")
@click.option("-g", "--generate", required=True, prompt=True, default=1)
def add_test_jobs_to_db(generate):
def add_test_jobs_to_db(generate): # pragma: no cover
if getenv("NOTIFY_ENVIRONMENT", "") not in ["development", "test"]:
current_app.logger.error("Can only be run in development")
return
Expand All @@ -1022,7 +1022,7 @@ def add_test_jobs_to_db(generate):
# generate n number of notifications into the dev DB
@notify_command(name="add-test-notifications-to-db")
@click.option("-g", "--generate", required=True, prompt=True, default=1)
def add_test_notifications_to_db(generate):
def add_test_notifications_to_db(generate): # pragma: no cover
if getenv("NOTIFY_ENVIRONMENT", "") not in ["development", "test"]:
current_app.logger.error("Can only be run in development")
return
Expand All @@ -1043,7 +1043,7 @@ def add_test_notifications_to_db(generate):
@click.option("-g", "--generate", required=True, prompt=True, default="1")
@click.option("-s", "--state", default="active")
@click.option("-d", "--admin", default=False, type=bool)
def add_test_users_to_db(generate, state, admin):
def add_test_users_to_db(generate, state, admin): # pragma: no cover
if getenv("NOTIFY_ENVIRONMENT", "") not in ["development", "test"]:
current_app.logger.error("Can only be run in development")
return
Expand Down
2 changes: 1 addition & 1 deletion tests/app/aws/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ def test_get_job_from_s3_exponential_backoff_on_throttling(mocker):
mocker.patch("app.aws.s3.file_exists", return_value=True)
job = get_job_from_s3("service_id", "job_id")
assert job is None
assert mock_get_object.call_count == 4
assert mock_get_object.call_count == 8


def test_get_job_from_s3_exponential_backoff_file_not_found(mocker):
Expand Down

0 comments on commit ae5c0d2

Please sign in to comment.