Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/GSA/notifications-api into …
Browse files Browse the repository at this point in the history
…invite-expiration-fix
  • Loading branch information
Andrew Shumway committed Jan 13, 2025
2 parents f497203 + 67d03dd commit 345ae88
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 6 deletions.
16 changes: 15 additions & 1 deletion app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from werkzeug.exceptions import HTTPException as WerkzeugHTTPException
from werkzeug.local import LocalProxy

from app import config
from app.clients import NotificationProviderClients
from app.clients.cloudwatch.aws_cloudwatch import AwsCloudwatchClient
from app.clients.document_download import DocumentDownloadClient
Expand Down Expand Up @@ -58,15 +59,28 @@ class SQLAlchemy(_SQLAlchemy):

def apply_driver_hacks(self, app, info, options):
sa_url, options = super().apply_driver_hacks(app, info, options)

if "connect_args" not in options:
options["connect_args"] = {}
options["connect_args"]["options"] = "-c statement_timeout={}".format(
int(app.config["SQLALCHEMY_STATEMENT_TIMEOUT"]) * 1000
)

return (sa_url, options)


db = SQLAlchemy()
# Set db engine settings here for now.
# They were not being set previous (despite environmental variables with appropriate
# sounding names) and were defaulting to low values
db = SQLAlchemy(
engine_options={
"pool_size": config.Config.SQLALCHEMY_POOL_SIZE,
"max_overflow": 10,
"pool_timeout": config.Config.SQLALCHEMY_POOL_TIMEOUT,
"pool_recycle": config.Config.SQLALCHEMY_POOL_RECYCLE,
"pool_pre_ping": True,
}
)
migrate = Migrate()
ma = Marshmallow()
notify_celery = NotifyCelery()
Expand Down
10 changes: 10 additions & 0 deletions app/celery/scheduled_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
find_missing_row_for_job,
)
from app.dao.notifications_dao import (
dao_close_out_delivery_receipts,
dao_update_delivery_receipts,
notifications_not_yet_sent,
)
Expand Down Expand Up @@ -242,6 +243,8 @@ def check_for_services_with_high_failure_rates_or_sending_to_tv_numbers():
bind=True, max_retries=7, default_retry_delay=3600, name="process-delivery-receipts"
)
def process_delivery_receipts(self):
# If we need to check db settings do it here for convenience
# current_app.logger.info(f"POOL SIZE {app.db.engine.pool.size()}")
"""
Every eight minutes or so (see config.py) we run this task, which searches the last ten
minutes of logs for delivery receipts and batch updates the db with the results. The overlap
Expand Down Expand Up @@ -278,3 +281,10 @@ def process_delivery_receipts(self):
current_app.logger.error(
"Failed process delivery receipts after max retries"
)


@notify_celery.task(
bind=True, max_retries=2, default_retry_delay=3600, name="cleanup-delivery-receipts"
)
def cleanup_delivery_receipts(self):
dao_close_out_delivery_receipts()
3 changes: 1 addition & 2 deletions app/clients/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
"addressing_style": "virtual",
},
use_fips_endpoint=True,
# This is the default but just for doc sake
max_pool_connections=10,
max_pool_connections=50, # This should be equal or greater than our celery concurrency
)


Expand Down
7 changes: 6 additions & 1 deletion app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class Config(object):
SQLALCHEMY_DATABASE_URI = cloud_config.database_url
SQLALCHEMY_RECORD_QUERIES = False
SQLALCHEMY_TRACK_MODIFICATIONS = False
SQLALCHEMY_POOL_SIZE = int(getenv("SQLALCHEMY_POOL_SIZE", 5))
SQLALCHEMY_POOL_SIZE = int(getenv("SQLALCHEMY_POOL_SIZE", 40))
SQLALCHEMY_POOL_TIMEOUT = 30
SQLALCHEMY_POOL_RECYCLE = 300
SQLALCHEMY_STATEMENT_TIMEOUT = 1200
Expand Down Expand Up @@ -203,6 +203,11 @@ class Config(object):
"schedule": timedelta(minutes=2),
"options": {"queue": QueueNames.PERIODIC},
},
"cleanup-delivery-receipts": {
"task": "cleanup-delivery-receipts",
"schedule": timedelta(minutes=82),
"options": {"queue": QueueNames.PERIODIC},
},
"expire-or-delete-invitations": {
"task": "expire-or-delete-invitations",
"schedule": timedelta(minutes=66),
Expand Down
2 changes: 1 addition & 1 deletion app/dao/jobs_dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def dao_get_job_by_service_id_and_job_id(service_id, job_id):

def dao_get_unfinished_jobs():
stmt = select(Job).filter(Job.processing_finished.is_(None))
return db.session.execute(stmt).all()
return db.session.execute(stmt).scalars().all()


def dao_get_jobs_by_service_id(
Expand Down
19 changes: 19 additions & 0 deletions app/dao/notifications_dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -803,3 +803,22 @@ def dao_update_delivery_receipts(receipts, delivered):
f"#loadtestperformance batch update query time: \
updated {len(receipts)} notification in {elapsed_time} ms"
)


def dao_close_out_delivery_receipts():
THREE_DAYS_AGO = utc_now() - timedelta(minutes=3)
stmt = (
update(Notification)
.where(
Notification.status == NotificationStatus.PENDING,
Notification.sent_at < THREE_DAYS_AGO,
)
.values(status=NotificationStatus.FAILED, provider_response="Technical Failure")
)
result = db.session.execute(stmt)

db.session.commit()
if result:
current_app.logger.info(
f"Marked {result.rowcount} notifications as technical failures"
)
2 changes: 1 addition & 1 deletion manifest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ applications:
- type: worker
instances: ((worker_instances))
memory: ((worker_memory))
command: newrelic-admin run-program celery -A run_celery.notify_celery worker --loglevel=INFO --pool=threads --concurrency=10
command: newrelic-admin run-program celery -A run_celery.notify_celery worker --loglevel=INFO --pool=threads --concurrency=10 --prefetch-multiplier=2
- type: scheduler
instances: 1
memory: ((scheduler_memory))
Expand Down
18 changes: 18 additions & 0 deletions tests/app/dao/notification_dao/test_notification_dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from app import db
from app.dao.notifications_dao import (
dao_close_out_delivery_receipts,
dao_create_notification,
dao_delete_notifications_by_id,
dao_get_last_notification_added_for_job_id,
Expand Down Expand Up @@ -2026,6 +2027,23 @@ def test_update_delivery_receipts(mocker):
assert "provider_response" in kwargs


def test_close_out_delivery_receipts(mocker):
mock_session = mocker.patch("app.dao.notifications_dao.db.session")
mock_update = MagicMock()
mock_where = MagicMock()
mock_values = MagicMock()
mock_update.where.return_value = mock_where
mock_where.values.return_value = mock_values

mock_session.execute.return_value = None
with patch("app.dao.notifications_dao.update", return_value=mock_update):
dao_close_out_delivery_receipts()
mock_update.where.assert_called_once()
mock_where.values.assert_called_once()
mock_session.execute.assert_called_once_with(mock_values)
mock_session.commit.assert_called_once()


@pytest.mark.parametrize(
"created_at_utc,date_to_check,expected_count",
[
Expand Down

0 comments on commit 345ae88

Please sign in to comment.