From 0d1a98914a4d6df8734973db9a94a75d7b10543d Mon Sep 17 00:00:00 2001 From: Kenneth Kehl <@kkehl@flexion.us> Date: Wed, 8 Jan 2025 08:44:49 -0800 Subject: [PATCH 1/6] cleanup pending notifications --- app/celery/scheduled_tasks.py | 8 ++++++++ app/config.py | 5 +++++ app/dao/jobs_dao.py | 2 +- app/dao/notifications_dao.py | 17 +++++++++++++++++ .../notification_dao/test_notification_dao.py | 18 ++++++++++++++++++ 5 files changed, 49 insertions(+), 1 deletion(-) diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py index 2dcd570cc..cb0e0886e 100644 --- a/app/celery/scheduled_tasks.py +++ b/app/celery/scheduled_tasks.py @@ -24,6 +24,7 @@ find_missing_row_for_job, ) from app.dao.notifications_dao import ( + dao_close_out_delivery_receipts, dao_update_delivery_receipts, notifications_not_yet_sent, ) @@ -278,3 +279,10 @@ def process_delivery_receipts(self): current_app.logger.error( "Failed process delivery receipts after max retries" ) + + +@notify_celery.task( + bind=True, max_retries=2, default_retry_delay=3600, name="cleanup-delivery-receipts" +) +def cleanup_delivery_receipts(self): + dao_close_out_delivery_receipts() diff --git a/app/config.py b/app/config.py index d3f2a5197..580495731 100644 --- a/app/config.py +++ b/app/config.py @@ -203,6 +203,11 @@ class Config(object): "schedule": timedelta(minutes=2), "options": {"queue": QueueNames.PERIODIC}, }, + "cleanup-delivery-receipts": { + "task": "cleanup-delivery-receipts", + "schedule": timedelta(minutes=82), + "options": {"queue": QueueNames.PERIODIC}, + }, "expire-or-delete-invitations": { "task": "expire-or-delete-invitations", "schedule": timedelta(minutes=66), diff --git a/app/dao/jobs_dao.py b/app/dao/jobs_dao.py index ddec26956..c969c4b53 100644 --- a/app/dao/jobs_dao.py +++ b/app/dao/jobs_dao.py @@ -45,7 +45,7 @@ def dao_get_job_by_service_id_and_job_id(service_id, job_id): def dao_get_unfinished_jobs(): stmt = select(Job).filter(Job.processing_finished.is_(None)) - return 
db.session.execute(stmt).all() + return db.session.execute(stmt).scalars().all() def dao_get_jobs_by_service_id( diff --git a/app/dao/notifications_dao.py b/app/dao/notifications_dao.py index 139f7ae8a..36eeafa92 100644 --- a/app/dao/notifications_dao.py +++ b/app/dao/notifications_dao.py @@ -780,3 +780,20 @@ def dao_update_delivery_receipts(receipts, delivered): f"#loadtestperformance batch update query time: \ updated {len(receipts)} notification in {elapsed_time} ms" ) + + +def dao_close_out_delivery_receipts(): + THREE_DAYS_AGO = utc_now() - timedelta(days=3) + stmt = ( + update(Notification) + .where( + Notification.status == NotificationStatus.PENDING, + Notification.sent_at < THREE_DAYS_AGO, + ) + .values(status=NotificationStatus.FAILED, provider_response="Technical Failure") + ) + result = db.session.execute(stmt) + current_app.logger.info( + f"Marked {result.rowcount} notifications as technical failures" + ) + db.session.commit() diff --git a/tests/app/dao/notification_dao/test_notification_dao.py b/tests/app/dao/notification_dao/test_notification_dao.py index 6e09f182a..f6905a749 100644 --- a/tests/app/dao/notification_dao/test_notification_dao.py +++ b/tests/app/dao/notification_dao/test_notification_dao.py @@ -11,6 +11,7 @@ from app import db from app.dao.notifications_dao import ( + dao_close_out_delivery_receipts, dao_create_notification, dao_delete_notifications_by_id, dao_get_last_notification_added_for_job_id, @@ -2026,6 +2027,23 @@ def test_update_delivery_receipts(mocker): assert "provider_response" in kwargs +def test_close_out_delivery_receipts(mocker): + mock_session = mocker.patch("app.dao.notifications_dao.db.session") + mock_update = MagicMock() + mock_where = MagicMock() + mock_values = MagicMock() + mock_update.where.return_value = mock_where + mock_where.values.return_value = mock_values + + mock_session.execute.return_value = None + with patch("app.dao.notifications_dao.update", return_value=mock_update): 
dao_close_out_delivery_receipts() + mock_update.where.assert_called_once() + mock_where.values.assert_called_once() + mock_session.execute.assert_called_once_with(mock_values) + mock_session.commit.assert_called_once() + + @pytest.mark.parametrize( "created_at_utc,date_to_check,expected_count", [ From a2fc97000b4c36db69ea9411fa1120a88e27c662 Mon Sep 17 00:00:00 2001 From: Kenneth Kehl <@kkehl@flexion.us> Date: Wed, 8 Jan 2025 08:58:24 -0800 Subject: [PATCH 2/6] cleanup pending notifications --- app/dao/notifications_dao.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/app/dao/notifications_dao.py b/app/dao/notifications_dao.py index 36eeafa92..c8f2797a0 100644 --- a/app/dao/notifications_dao.py +++ b/app/dao/notifications_dao.py @@ -793,7 +793,9 @@ def dao_close_out_delivery_receipts(): .values(status=NotificationStatus.FAILED, provider_response="Technical Failure") ) result = db.session.execute(stmt) - current_app.logger.info( - f"Marked {result.rowcount} notifications as technical failures" - ) + db.session.commit() + if result: + current_app.logger.info( + f"Marked {result.rowcount} notifications as technical failures" + ) From da19e7c81c50f071058ab622302b633051698caf Mon Sep 17 00:00:00 2001 From: Kenneth Kehl <@kkehl@flexion.us> Date: Wed, 8 Jan 2025 11:12:08 -0800 Subject: [PATCH 3/6] set prefetch multiplier to 2 and increase concurrency to 15 --- manifest.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/manifest.yml b/manifest.yml index 39e842730..9d39c7d84 100644 --- a/manifest.yml +++ b/manifest.yml @@ -26,7 +26,7 @@ applications: - type: worker instances: ((worker_instances)) memory: ((worker_memory)) - command: newrelic-admin run-program celery -A run_celery.notify_celery worker --loglevel=INFO --pool=threads --concurrency=10 + command: newrelic-admin run-program celery -A run_celery.notify_celery worker --loglevel=INFO --pool=threads --concurrency=15 --prefetch-multiplier=2 - type: scheduler instances: 1 
memory: ((scheduler_memory)) From 6aae2c7aae7ea770fe904c1bd3dc5cc5d1b385f3 Mon Sep 17 00:00:00 2001 From: Kenneth Kehl <@kkehl@flexion.us> Date: Thu, 9 Jan 2025 10:53:33 -0800 Subject: [PATCH 4/6] fix db connection pool --- app/__init__.py | 18 +++++++++++++++++- app/celery/scheduled_tasks.py | 2 ++ app/clients/__init__.py | 3 +-- app/config.py | 2 +- 4 files changed, 21 insertions(+), 4 deletions(-) diff --git a/app/__init__.py b/app/__init__.py index 23c2399e1..0d617ee0c 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -18,6 +18,7 @@ from werkzeug.exceptions import HTTPException as WerkzeugHTTPException from werkzeug.local import LocalProxy +from app import config from app.clients import NotificationProviderClients from app.clients.cloudwatch.aws_cloudwatch import AwsCloudwatchClient from app.clients.document_download import DocumentDownloadClient @@ -25,6 +26,7 @@ from app.clients.email.aws_ses_stub import AwsSesStubClient from app.clients.pinpoint.aws_pinpoint import AwsPinpointClient from app.clients.sms.aws_sns import AwsSnsClient +from app.utils import hilite from notifications_utils import logging, request_helper from notifications_utils.clients.encryption.encryption_client import Encryption from notifications_utils.clients.redis.redis_client import RedisClient @@ -58,15 +60,29 @@ class SQLAlchemy(_SQLAlchemy): def apply_driver_hacks(self, app, info, options): sa_url, options = super().apply_driver_hacks(app, info, options) + print(hilite(f"OPTIONS {options}")) + if "connect_args" not in options: options["connect_args"] = {} options["connect_args"]["options"] = "-c statement_timeout={}".format( int(app.config["SQLALCHEMY_STATEMENT_TIMEOUT"]) * 1000 ) + return (sa_url, options) -db = SQLAlchemy() +# Set db engine settings here for now. 
+# They were not being set previously (despite environmental variables with appropriate +# sounding names) and were defaulting to low values +db = SQLAlchemy( + engine_options={ + "pool_size": config.Config.SQLALCHEMY_POOL_SIZE, + "max_overflow": 10, + "pool_timeout": config.Config.SQLALCHEMY_POOL_TIMEOUT, + "pool_recycle": config.Config.SQLALCHEMY_POOL_RECYCLE, + "pool_pre_ping": True, + } +) migrate = Migrate() ma = Marshmallow() notify_celery = NotifyCelery() diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py index cb0e0886e..72806aa58 100644 --- a/app/celery/scheduled_tasks.py +++ b/app/celery/scheduled_tasks.py @@ -243,6 +243,8 @@ def check_for_services_with_high_failure_rates_or_sending_to_tv_numbers(): bind=True, max_retries=7, default_retry_delay=3600, name="process-delivery-receipts" ) def process_delivery_receipts(self): + # If we need to check db settings do it here for convenience + # current_app.logger.info(f"POOL SIZE {app.db.engine.pool.size()}") """ Every eight minutes or so (see config.py) we run this task, which searches the last ten minutes of logs for delivery receipts and batch updates the db with the results. 
The overlap diff --git a/app/clients/__init__.py b/app/clients/__init__.py index 3392928e4..f185e45e2 100644 --- a/app/clients/__init__.py +++ b/app/clients/__init__.py @@ -13,8 +13,7 @@ "addressing_style": "virtual", }, use_fips_endpoint=True, - # This is the default but just for doc sake - max_pool_connections=10, + max_pool_connections=50, # This should be equal or greater than our celery concurrency ) diff --git a/app/config.py b/app/config.py index 580495731..9ae731290 100644 --- a/app/config.py +++ b/app/config.py @@ -81,7 +81,7 @@ class Config(object): SQLALCHEMY_DATABASE_URI = cloud_config.database_url SQLALCHEMY_RECORD_QUERIES = False SQLALCHEMY_TRACK_MODIFICATIONS = False - SQLALCHEMY_POOL_SIZE = int(getenv("SQLALCHEMY_POOL_SIZE", 5)) + SQLALCHEMY_POOL_SIZE = int(getenv("SQLALCHEMY_POOL_SIZE", 20)) SQLALCHEMY_POOL_TIMEOUT = 30 SQLALCHEMY_POOL_RECYCLE = 300 SQLALCHEMY_STATEMENT_TIMEOUT = 1200 From 2770f76431c2c5ebde3f461002f9a9d22d0e6adb Mon Sep 17 00:00:00 2001 From: Kenneth Kehl <@kkehl@flexion.us> Date: Thu, 9 Jan 2025 11:14:51 -0800 Subject: [PATCH 5/6] cleanup --- app/__init__.py | 2 -- app/config.py | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/app/__init__.py b/app/__init__.py index 0d617ee0c..add218e5d 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -26,7 +26,6 @@ from app.clients.email.aws_ses_stub import AwsSesStubClient from app.clients.pinpoint.aws_pinpoint import AwsPinpointClient from app.clients.sms.aws_sns import AwsSnsClient -from app.utils import hilite from notifications_utils import logging, request_helper from notifications_utils.clients.encryption.encryption_client import Encryption from notifications_utils.clients.redis.redis_client import RedisClient @@ -60,7 +59,6 @@ class SQLAlchemy(_SQLAlchemy): def apply_driver_hacks(self, app, info, options): sa_url, options = super().apply_driver_hacks(app, info, options) - print(hilite(f"OPTIONS {options}")) if "connect_args" not in options: options["connect_args"] 
= {} diff --git a/app/config.py b/app/config.py index 9ae731290..f7f08a36a 100644 --- a/app/config.py +++ b/app/config.py @@ -81,7 +81,7 @@ class Config(object): SQLALCHEMY_DATABASE_URI = cloud_config.database_url SQLALCHEMY_RECORD_QUERIES = False SQLALCHEMY_TRACK_MODIFICATIONS = False - SQLALCHEMY_POOL_SIZE = int(getenv("SQLALCHEMY_POOL_SIZE", 20)) + SQLALCHEMY_POOL_SIZE = int(getenv("SQLALCHEMY_POOL_SIZE", 40)) SQLALCHEMY_POOL_TIMEOUT = 30 SQLALCHEMY_POOL_RECYCLE = 300 SQLALCHEMY_STATEMENT_TIMEOUT = 1200 From 7e7d43238fabc60f8c338f9e7d005c3070fcee0f Mon Sep 17 00:00:00 2001 From: Kenneth Kehl <@kkehl@flexion.us> Date: Thu, 9 Jan 2025 11:16:53 -0800 Subject: [PATCH 6/6] cleanup --- manifest.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/manifest.yml b/manifest.yml index 9d39c7d84..0763a1911 100644 --- a/manifest.yml +++ b/manifest.yml @@ -26,7 +26,7 @@ applications: - type: worker instances: ((worker_instances)) memory: ((worker_memory)) - command: newrelic-admin run-program celery -A run_celery.notify_celery worker --loglevel=INFO --pool=threads --concurrency=15 --prefetch-multiplier=2 + command: newrelic-admin run-program celery -A run_celery.notify_celery worker --loglevel=INFO --pool=threads --concurrency=10 --prefetch-multiplier=2 - type: scheduler instances: 1 memory: ((scheduler_memory))