diff --git a/app/celery/scheduled_tasks.py b/app/celery/scheduled_tasks.py
index e173c923a..9fcfeeb04 100644
--- a/app/celery/scheduled_tasks.py
+++ b/app/celery/scheduled_tasks.py
@@ -36,7 +36,7 @@
 )
 from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago
 from app.enums import JobStatus, NotificationType
-from app.models import Job
+from app.models import Job, Notification
 from app.notifications.process_notifications import send_notification_to_queue
 from app.utils import utc_now
 from notifications_utils import aware_utcnow
@@ -292,13 +292,39 @@ def cleanup_delivery_receipts(self):
 
 @notify_celery.task(bind=True, name="batch-insert-notifications")
 def batch_insert_notifications(self):
     batch = []
-    with redis_store.pipeline:
-        notification = redis_store.lpop("notification_queue")
-        batch.append(json.loads(notification))
+    with redis_store.pipeline():
+        # The list is always growing, so snapshot its current length and
+        # drain only that many entries; anything pushed while this task
+        # runs is left for the next scheduled run.
+        current_len = redis_store.llen("message_queue")
+        count = 0
+        while count < current_len:
+            count += 1
+            notification_bytes = redis_store.lpop("message_queue")
+            notification_dict = json.loads(notification_bytes.decode("utf-8"))
+            # The serialized column is "notification_status"; the model
+            # attribute is "status".
+            notification_dict["status"] = notification_dict.pop("notification_status")
+            notification_dict["created_at"] = utc_now()
+            notification = Notification(**notification_dict)
+            current_app.logger.info(
+                f"Adding notification {notification.id} to batch insert"
+            )
+            batch.append(notification)
     try:
         dao_batch_insert_notifications(batch)
     except Exception as e:
-        redis_store.rpush("notification_queue", json.dumps(msg))
-        current_app.logger.exception(f"Notification batch insert failed {e}")
+        current_app.logger.exception(f"Notification batch insert failed {e}")
+        for msg in batch:
+            # Requeue the serialized notification so the next run can retry it.
+            redis_store.rpush(
+                "message_queue", json.dumps(msg.serialize_for_redis(msg))
+            )
diff --git a/app/dao/notifications_dao.py b/app/dao/notifications_dao.py
index cd3c0e1aa..92dcc234c 100644
--- a/app/dao/notifications_dao.py
+++ b/app/dao/notifications_dao.py
@@ -803,9 +803,11 @@ def dao_close_out_delivery_receipts():
 
 
 def dao_batch_insert_notifications(batch):
     try:
-        db.session.bulk_save_objects(Notification(**msg) for msg in batch)
+        # The batch already contains Notification objects, so save them directly.
+        db.session.bulk_save_objects(batch)
         db.session.commit()
+        current_app.logger.info(f"Batch inserted {len(batch)} notifications")
         return len(batch)
     except sqlalchemy.exc.SQLAlchemyError as e:
         current_app.logger.exception(f"Error during batch insert {e}")
diff --git a/app/models.py b/app/models.py
index fc7b855e4..ff734f8bf 100644
--- a/app/models.py
+++ b/app/models.py
@@ -5,7 +5,7 @@
 from sqlalchemy import CheckConstraint, Index, UniqueConstraint
 from sqlalchemy.dialects.postgresql import JSON, JSONB, UUID
 from sqlalchemy.ext.associationproxy import association_proxy
-from sqlalchemy.ext.declarative import declared_attr
+from sqlalchemy.ext.declarative import DeclarativeMeta, declared_attr
 from sqlalchemy.orm import validates
 from sqlalchemy.orm.collections import attribute_mapped_collection
 
@@ -1694,6 +1694,29 @@ def get_created_by_email_address(self):
         else:
             return None
 
+    def serialize_for_redis(self, obj):
+        if not isinstance(obj.__class__, DeclarativeMeta):
+            raise ValueError("Provided object is not a SQLAlchemy instance")
+        fields = {}
+        for column in obj.__table__.columns:
+            if column.name in ("message_id", "api_key_id"):
+                # Skip these: we don't have a message id yet at enqueue time.
+                continue
+            if column.name == "notification_status":
+                # The column is "notification_status" but the model
+                # attribute is "status".
+                value = obj.status
+            elif column.name == "created_at":
+                value = obj.created_at.strftime("%Y-%m-%d %H:%M:%S")
+            elif column.name in ("sent_at", "completed_at"):
+                value = None
+            elif column.name.endswith("_id"):
+                # UUIDs are not JSON-serializable; stringify non-null ids.
+                value = getattr(obj, column.name)
+                value = str(value) if value is not None else None
+            else:
+                value = getattr(obj, column.name)
+            fields[column.name] = value
+        return fields
+
     def serialize_for_csv(self):
         serialized = {
             "row_number": (
diff --git a/app/notifications/process_notifications.py b/app/notifications/process_notifications.py
index 347d2fc0b..2be547f7a 100644
--- a/app/notifications/process_notifications.py
+++ b/app/notifications/process_notifications.py
@@ -1,3 +1,4 @@
+import json
 import uuid
 
 from flask import current_app
@@ -10,7 +11,7 @@
     dao_notification_exists,
     get_notification_by_id,
 )
-from app.enums import KeyType, NotificationStatus, NotificationType
+from app.enums import NotificationStatus, NotificationType
 from app.errors import BadRequestError
 from app.models import Notification
 from app.utils import hilite, utc_now
@@ -140,16 +141,15 @@ def persist_notification(
     if not simulated:
         # current_app.logger.info("Firing dao_create_notification")
         # dao_create_notification(notification)
-        redis_store.rpush("message_queue", notification)
-        if key_type != KeyType.TEST and current_app.config["REDIS_ENABLED"]:
-            current_app.logger.info(
-                "Redis enabled, querying cache key for service id: {}".format(
-                    service.id
-                )
-            )
+        current_app.logger.info(
+            f'message_queue length before push: {redis_store.llen("message_queue")}'
+        )
+        redis_store.rpush(
+            "message_queue", json.dumps(notification.serialize_for_redis(notification))
+        )
         current_app.logger.info(
-            f"{notification_type} {notification_id} created at {notification_created_at}"
+            f'message_queue length after push: {redis_store.llen("message_queue")}'
         )
     return notification
diff --git a/notifications_utils/clients/redis/redis_client.py b/notifications_utils/clients/redis/redis_client.py
index 3404d27e7..c41318243 100644
--- a/notifications_utils/clients/redis/redis_client.py
+++ b/notifications_utils/clients/redis/redis_client.py
@@ -38,6 +38,10 @@ class RedisClient:
     active = False
     scripts = {}
 
+    def pipeline(self):
+        # redis_store is set per instance in init_app, so this must be an
+        # instance method rather than a classmethod.
+        return self.redis_store.pipeline()
+
     def init_app(self, app):
         self.active = app.config.get("REDIS_ENABLED")
         if self.active:
@@ -160,9 +164,13 @@ def rpush(self, key, value):
         if self.active:
             self.redis_store.rpush(key, value)
 
-    def lpop(self, key, value):
+    def lpop(self, key):
+        if self.active:
+            return self.redis_store.lpop(key)
+
+    def llen(self, key):
         if self.active:
-            self.redis_store.lpop(key, value)
+            return self.redis_store.llen(key)
 
     def delete(self, *keys, raise_exception=False):
         keys = [prepare_value(k) for k in keys]
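
Reviewer note: the round trip introduced above (serialize → RPUSH → snapshot LLEN → LPOP → rebuild) is easiest to sanity-check in isolation. Below is a minimal sketch against a plain redis-py client; the payload fields are illustrative, not the full output of `Notification.serialize_for_redis`.

```python
# Minimal sketch of the message_queue round trip (assumes a local Redis;
# field names below are illustrative, not the full serialized Notification).
import json
import uuid
from datetime import datetime, timezone

import redis

r = redis.Redis()

# Producer side (persist_notification): serialize and push.
payload = {
    "id": str(uuid.uuid4()),
    "notification_status": "created",
    "created_at": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
}
r.rpush("message_queue", json.dumps(payload))

# Consumer side (batch_insert_notifications): snapshot the length, then
# drain only that many entries so a concurrently growing list still terminates.
batch = []
for _ in range(r.llen("message_queue")):
    raw = r.lpop("message_queue")
    if raw is None:  # another consumer may have emptied the list first
        break
    item = json.loads(raw.decode("utf-8"))
    item["status"] = item.pop("notification_status")  # column name -> model attribute
    batch.append(item)

print(f"drained {len(batch)} queued notifications")
```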
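
One behavior worth flagging: inside `with redis_store.pipeline():`, the `llen`/`lpop` calls still go through the client directly, so each one is a separate round trip and the pipeline buys nothing. If atomic draining is the goal, a transactional LRANGE + DEL is one alternative (a sketch, not what this PR implements):

```python
import json

import redis

r = redis.Redis()

# Read the whole list and delete it inside one MULTI/EXEC transaction, so a
# concurrent RPUSH lands either before or after the drain, never in between.
pipe = r.pipeline(transaction=True)
pipe.lrange("message_queue", 0, -1)
pipe.delete("message_queue")
raw_items, _ = pipe.execute()

batch = [json.loads(b.decode("utf-8")) for b in raw_items]
```

The trade-off is all-or-nothing draining: a failed insert then requires requeueing the whole batch, which the PR's per-item LPOP plus requeue-on-failure loop already handles.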
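
For context on `serialize_for_redis`: it walks `__table__.columns` and coerces values that are not JSON-serializable (UUIDs, datetimes) to strings. The same pattern on a toy declarative model, with hypothetical names:

```python
# Standalone illustration of the column-walk serialization pattern; the Toy
# model and its fields are hypothetical stand-ins for Notification.
import json
import uuid

from sqlalchemy import Column, String
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Toy(Base):
    __tablename__ = "toy"
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    name = Column(String)


toy = Toy(id=uuid.uuid4(), name="example")
fields = {}
for column in toy.__table__.columns:
    value = getattr(toy, column.name)
    # UUIDs are not JSON-serializable, hence the str() coercion for the
    # *_id-style columns in serialize_for_redis; "id" gets the same treatment.
    fields[column.name] = str(value) if isinstance(value, uuid.UUID) else value
print(json.dumps(fields))
```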