Skip to content

Commit

Permalink
cleanup redis commands and flow
Browse files Browse the repository at this point in the history
  • Loading branch information
Kenneth Kehl committed Jan 10, 2025
1 parent bbf5bac commit 64a61f5
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 18 deletions.
36 changes: 31 additions & 5 deletions app/celery/scheduled_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
)
from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago
from app.enums import JobStatus, NotificationType
from app.models import Job
from app.models import Job, Notification
from app.notifications.process_notifications import send_notification_to_queue
from app.utils import utc_now
from notifications_utils import aware_utcnow
Expand Down Expand Up @@ -292,13 +292,39 @@ def cleanup_delivery_receipts(self):

@notify_celery.task(bind=True, name="batch-insert-notifications")
def batch_insert_notifications(self):
    """Drain queued notification payloads from Redis and bulk-insert them.

    Payloads are pushed onto the "message_queue" Redis list by
    persist_notification() as JSON dicts produced by
    Notification.serialize_for_redis(). This task pops whatever is on the
    list when it runs, rehydrates each payload into a Notification model
    object, and hands the whole batch to dao_batch_insert_notifications().
    On insert failure the raw payloads are pushed back onto the list so
    they are retried on the next run instead of being silently lost.
    """
    batch = []
    raw_payloads = []
    # The list grows continuously, so snapshot its length and drain only
    # that many entries; anything enqueued after this point is handled by
    # the next scheduled run. NOTE(review): the original wrapped this in
    # redis_store.pipeline(), but every command was issued on the client,
    # not the pipeline object, so the pipeline added no batching or
    # atomicity -- it has been dropped.
    current_len = redis_store.llen("message_queue") or 0
    for _ in range(current_len):
        notification_bytes = redis_store.lpop("message_queue")
        if notification_bytes is None:
            # Another worker drained the list out from under us.
            break
        raw_payloads.append(notification_bytes)
        notification_dict = json.loads(notification_bytes.decode("utf-8"))
        # serialize_for_redis() keys the value by the DB column name
        # ("notification_status"); the model attribute is "status".
        notification_dict["status"] = notification_dict.pop("notification_status")
        # Stamp insert time here rather than trusting the enqueue-time value.
        notification_dict["created_at"] = utc_now()
        batch.append(Notification(**notification_dict))

    if not batch:
        return
    try:
        dao_batch_insert_notifications(batch)
    except Exception:
        current_app.logger.exception("Notification batch insert failed")
        # Requeue the original payloads so the next run can retry them.
        for payload in raw_payloads:
            redis_store.rpush("message_queue", payload)
4 changes: 3 additions & 1 deletion app/dao/notifications_dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -803,9 +803,11 @@ def dao_close_out_delivery_receipts():


def dao_batch_insert_notifications(batch):
    """Insert a batch of Notification model objects in one bulk operation.

    Args:
        batch: list of Notification model instances (already constructed
            by the caller).

    Returns:
        The number of objects inserted, or None if the insert failed.
        Errors are logged rather than re-raised so the calling scheduled
        task keeps running.
    """
    try:
        db.session.bulk_save_objects(batch)
        db.session.commit()
        current_app.logger.info("Batch inserted notifications: %s", len(batch))
        return len(batch)
    except sqlalchemy.exc.SQLAlchemyError:
        # Roll back so the session is left in a usable state after a
        # failed flush; without this, subsequent session use can error.
        db.session.rollback()
        current_app.logger.exception("Error during batch insert")
25 changes: 24 additions & 1 deletion app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from sqlalchemy import CheckConstraint, Index, UniqueConstraint
from sqlalchemy.dialects.postgresql import JSON, JSONB, UUID
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.ext.declarative import DeclarativeMeta, declared_attr
from sqlalchemy.orm import validates
from sqlalchemy.orm.collections import attribute_mapped_collection

Expand Down Expand Up @@ -1694,6 +1694,29 @@ def get_created_by_email_address(self):
else:
return None

def serialize_for_redis(self, obj):
    """Convert a SQLAlchemy model instance into a JSON-serializable dict.

    Used to enqueue notifications onto the Redis "message_queue" list for
    later batch insertion. Values are keyed by DB column name.

    Args:
        obj: a SQLAlchemy-mapped model instance (normally a Notification).

    Returns:
        dict of column name -> JSON-serializable value.

    Raises:
        ValueError: if ``obj`` is not a SQLAlchemy-mapped instance.
    """
    if not isinstance(obj.__class__, DeclarativeMeta):
        raise ValueError("Provided object is not a SQLAlchemy instance")
    fields = {}
    for column in obj.__table__.columns:
        if column.name in ("message_id", "api_key_id"):
            # Not available yet at enqueue time -- skip entirely.
            continue
        if column.name == "notification_status":
            # DB column is "notification_status" but the model attribute
            # is "status"; the consumer pops this key back into "status".
            value = getattr(obj, "status")
        elif column.name == "created_at":
            # BUG FIX: the original had a trailing comma here, which
            # wrapped the timestamp in a 1-tuple (serialized as a list).
            value = obj.created_at.strftime("%Y-%m-%d %H:%M:%S")
        elif column.name in ("sent_at", "completed_at"):
            # Delivery hasn't happened yet; always enqueue as null.
            value = None
        elif column.name.endswith("_id"):
            # UUIDs are not JSON serializable; stringify, but keep null
            # as null (the original turned None into the string "None").
            value = getattr(obj, column.name)
            value = str(value) if value is not None else None
        else:
            value = getattr(obj, column.name)
        fields[column.name] = value
    return fields

def serialize_for_csv(self):
serialized = {
"row_number": (
Expand Down
18 changes: 9 additions & 9 deletions app/notifications/process_notifications.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import uuid

from flask import current_app
Expand All @@ -10,7 +11,7 @@
dao_notification_exists,
get_notification_by_id,
)
from app.enums import KeyType, NotificationStatus, NotificationType
from app.enums import NotificationStatus, NotificationType
from app.errors import BadRequestError
from app.models import Notification
from app.utils import hilite, utc_now
Expand Down Expand Up @@ -140,16 +141,15 @@ def persist_notification(
if not simulated:
# current_app.logger.info("Firing dao_create_notification")
# dao_create_notification(notification)
redis_store.rpush("message_queue", notification)
if key_type != KeyType.TEST and current_app.config["REDIS_ENABLED"]:
current_app.logger.info(
"Redis enabled, querying cache key for service id: {}".format(
service.id
)
)
current_app.logger.info(
f"QUEUE LENTGH BEFOE {redis_store.llen("message_queue")}"
)
redis_store.rpush(
"message_queue", json.dumps(notification.serialize_for_redis(notification))
)

current_app.logger.info(
f"{notification_type} {notification_id} created at {notification_created_at}"
f"QUEUE LENTGH AFTA {redis_store.llen("message_queue")}"
)
return notification

Expand Down
12 changes: 10 additions & 2 deletions notifications_utils/clients/redis/redis_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ class RedisClient:
active = False
scripts = {}

def pipeline(self):
    """Return a raw redis-py pipeline object for batched commands.

    NOTE(review): this was a @classmethod accessing ``cls.redis_store``,
    but the visible class attributes are only ``active`` and ``scripts``;
    ``redis_store`` appears to be set per-instance in init_app, so the
    classmethod form would raise AttributeError -- confirm. Converted to
    an instance method, consistent with every other method on this class.
    Callers still invoke it the same way: ``redis_store.pipeline()``.
    Deliberately NOT guarded by ``self.active``: callers use the return
    value as a context manager, and returning None would break them.
    """
    return self.redis_store.pipeline()

def init_app(self, app):
self.active = app.config.get("REDIS_ENABLED")
if self.active:
Expand Down Expand Up @@ -160,9 +164,13 @@ def rpush(self, key, value):
if self.active:
self.redis_store.rpush(key, value)

def lpop(self, key):
    """Pop and return the leftmost element of the Redis list ``key``.

    Returns None when Redis is disabled, or when the underlying LPOP
    finds the list empty/missing (redis-py returns None in that case).
    """
    if self.active:
        return self.redis_store.lpop(key)

def llen(self, key):
    """Return the length of the Redis list ``key``.

    Returns None when Redis is disabled. (The stale
    ``self.redis_store.lpop(key, value)`` diff remnant inside this body
    -- a NameError on ``value`` -- has been removed.)
    """
    if self.active:
        return self.redis_store.llen(key)

def delete(self, *keys, raise_exception=False):
keys = [prepare_value(k) for k in keys]
Expand Down

0 comments on commit 64a61f5

Please sign in to comment.