Skip to content

Commit

Permalink
Merge pull request #1374 from GSA/notify-api-1324
Browse files Browse the repository at this point in the history
refactor miscellaneous daos and utilities
  • Loading branch information
terrazoon authored Oct 30, 2024
2 parents a0e8828 + c0ed18c commit 05e268a
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 65 deletions.
32 changes: 23 additions & 9 deletions app/dao/complaint_dao.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from datetime import timedelta

from flask import current_app
from sqlalchemy import desc
from sqlalchemy import desc, func, select

from app import db
from app.dao.dao_utils import autocommit
from app.dao.inbound_sms_dao import Pagination
from app.models import Complaint
from app.utils import get_midnight_in_utc

Expand All @@ -15,23 +16,36 @@ def save_complaint(complaint):


def fetch_paginated_complaints(page=1):
return Complaint.query.order_by(desc(Complaint.created_at)).paginate(
page=page, per_page=current_app.config["PAGE_SIZE"]
page_size = current_app.config["PAGE_SIZE"]
total_count = db.session.scalar(select(func.count()).select_from(Complaint))
offset = (page - 1) * page_size
stmt = (
select(Complaint)
.order_by(desc(Complaint.created_at))
.offset(offset)
.limit(page_size)
)
result = db.session.execute(stmt).scalars().all()
pagination = Pagination(result, page=page, per_page=page_size, total=total_count)
return pagination


def fetch_complaints_by_service(service_id):
return (
Complaint.query.filter_by(service_id=service_id)
stmt = (
select(Complaint)
.filter_by(service_id=service_id)
.order_by(desc(Complaint.created_at))
.all()
)
return db.session.execute(stmt).scalars().all()


def fetch_count_of_complaints(start_date, end_date):
start_date = get_midnight_in_utc(start_date)
end_date = get_midnight_in_utc(end_date + timedelta(days=1))

return Complaint.query.filter(
Complaint.created_at >= start_date, Complaint.created_at < end_date
).count()
stmt = (
select(func.count())
.select_from(Complaint)
.filter(Complaint.created_at >= start_date, Complaint.created_at < end_date)
)
return db.session.execute(stmt).scalar() or 0
40 changes: 27 additions & 13 deletions app/dao/inbound_numbers_dao.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,30 @@
from sqlalchemy import and_, select, update

from app import db
from app.dao.dao_utils import autocommit
from app.models import InboundNumber


def dao_get_inbound_numbers():
return InboundNumber.query.order_by(InboundNumber.updated_at).all()
stmt = select(InboundNumber).order_by(InboundNumber.updated_at)
return db.session.execute(stmt).scalars().all()


def dao_get_available_inbound_numbers():
return InboundNumber.query.filter(
stmt = select(InboundNumber).filter(
InboundNumber.active, InboundNumber.service_id.is_(None)
).all()
)
return db.session.execute(stmt).scalars().all()


def dao_get_inbound_number_for_service(service_id):
return InboundNumber.query.filter(InboundNumber.service_id == service_id).first()
stmt = select(InboundNumber).filter(InboundNumber.service_id == service_id)
return db.session.execute(stmt).scalars().first()


def dao_get_inbound_number(inbound_number_id):
return InboundNumber.query.filter(InboundNumber.id == inbound_number_id).first()
stmt = select(InboundNumber).filter(InboundNumber.id == inbound_number_id)
return db.session.execute(stmt).scalars().first()


@autocommit
Expand All @@ -29,19 +35,27 @@ def dao_set_inbound_number_to_service(service_id, inbound_number):

@autocommit
def dao_set_inbound_number_active_flag(service_id, active):
inbound_number = InboundNumber.query.filter(
InboundNumber.service_id == service_id
).first()
stmt = select(InboundNumber).filter(InboundNumber.service_id == service_id)
inbound_number = db.session.execute(stmt).scalars().first()
inbound_number.active = active

db.session.add(inbound_number)


@autocommit
def dao_allocate_number_for_service(service_id, inbound_number_id):
updated = InboundNumber.query.filter_by(
id=inbound_number_id, active=True, service_id=None
).update({"service_id": service_id})
if not updated:
stmt = (
update(InboundNumber)
.where(
and_(
InboundNumber.id == inbound_number_id, # noqa
InboundNumber.active == True, # noqa
InboundNumber.service_id == None, # noqa
)
)
.values({"service_id": service_id})
)
result = db.session.execute(stmt)
if result.rowcount == 0:
raise Exception("Inbound number: {} is not available".format(inbound_number_id))
return InboundNumber.query.get(inbound_number_id)
return db.session.get(InboundNumber, inbound_number_id)
97 changes: 66 additions & 31 deletions app/dao/inbound_sms_dao.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from flask import current_app
from sqlalchemy import and_, desc
from sqlalchemy import and_, delete, desc, func, select
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import aliased

Expand All @@ -18,8 +18,10 @@ def dao_create_inbound_sms(inbound_sms):
def dao_get_inbound_sms_for_service(
service_id, user_number=None, *, limit_days=None, limit=None
):
q = InboundSms.query.filter(InboundSms.service_id == service_id).order_by(
InboundSms.created_at.desc()
q = (
select(InboundSms)
.filter(InboundSms.service_id == service_id)
.order_by(InboundSms.created_at.desc())
)
if limit_days is not None:
start_date = midnight_n_days_ago(limit_days)
Expand All @@ -31,7 +33,7 @@ def dao_get_inbound_sms_for_service(
if limit:
q = q.limit(limit)

return q.all()
return db.session.execute(q).scalars().all()


def dao_get_paginated_inbound_sms_for_service_for_public_api(
Expand All @@ -46,36 +48,44 @@ def dao_get_paginated_inbound_sms_for_service_for_public_api(
older_than_created_at = (
db.session.query(InboundSms.created_at)
.filter(InboundSms.id == older_than)
.as_scalar()
.scalar_subquery()
)
filters.append(InboundSms.created_at < older_than_created_at)

query = InboundSms.query.filter(*filters)

return (
query.order_by(desc(InboundSms.created_at)).paginate(per_page=page_size).items
)
# As part of the move to sqlalchemy 2.0, we do this manual pagination
query = db.session.query(InboundSms).filter(*filters)
paginated_items = query.order_by(desc(InboundSms.created_at)).limit(page_size).all()
return paginated_items


def dao_count_inbound_sms_for_service(service_id, limit_days):
return InboundSms.query.filter(
InboundSms.service_id == service_id,
InboundSms.created_at >= midnight_n_days_ago(limit_days),
).count()
stmt = (
select(func.count())
.select_from(InboundSms)
.filter(
InboundSms.service_id == service_id,
InboundSms.created_at >= midnight_n_days_ago(limit_days),
)
)
result = db.session.execute(stmt).scalar()
return result


def _insert_inbound_sms_history(subquery, query_limit=10000):
offset = 0
inbound_sms_query = db.session.query(
subquery_select = select(subquery)
inbound_sms_query = select(
InboundSms.id,
InboundSms.created_at,
InboundSms.service_id,
InboundSms.notify_number,
InboundSms.provider_date,
InboundSms.provider_reference,
InboundSms.provider,
).filter(InboundSms.id.in_(subquery))
inbound_sms_count = inbound_sms_query.count()
).where(InboundSms.id.in_(subquery_select))

count_query = select(func.count()).select_from(inbound_sms_query.subquery())
inbound_sms_count = db.session.execute(count_query).scalar() or 0

while offset < inbound_sms_count:
statement = insert(InboundSmsHistory).from_select(
Expand All @@ -86,7 +96,8 @@ def _insert_inbound_sms_history(subquery, query_limit=10000):
statement = statement.on_conflict_do_nothing(
constraint="inbound_sms_history_pkey"
)
db.session.connection().execute(statement)
db.session.execute(statement)
db.session.commit()

offset += query_limit

Expand All @@ -95,7 +106,7 @@ def _delete_inbound_sms(datetime_to_delete_from, query_filter):
query_limit = 10000

subquery = (
db.session.query(InboundSms.id)
select(InboundSms.id)
.filter(InboundSms.created_at < datetime_to_delete_from, *query_filter)
.limit(query_limit)
.subquery()
Expand All @@ -107,9 +118,9 @@ def _delete_inbound_sms(datetime_to_delete_from, query_filter):
while number_deleted > 0:
_insert_inbound_sms_history(subquery, query_limit=query_limit)

number_deleted = InboundSms.query.filter(InboundSms.id.in_(subquery)).delete(
synchronize_session="fetch"
)
stmt = delete(InboundSms).filter(InboundSms.id.in_(subquery))
number_deleted = db.session.execute(stmt).rowcount
db.session.commit()
deleted += number_deleted

return deleted
Expand All @@ -121,11 +132,12 @@ def delete_inbound_sms_older_than_retention():
"Deleting inbound sms for services with flexible data retention"
)

flexible_data_retention = (
ServiceDataRetention.query.join(ServiceDataRetention.service)
stmt = (
select(ServiceDataRetention)
.join(ServiceDataRetention.service)
.filter(ServiceDataRetention.notification_type == NotificationType.SMS)
.all()
)
flexible_data_retention = db.session.execute(stmt).scalars().all()

deleted = 0

Expand Down Expand Up @@ -158,7 +170,8 @@ def delete_inbound_sms_older_than_retention():


def dao_get_inbound_sms_by_id(service_id, inbound_id):
return InboundSms.query.filter_by(id=inbound_id, service_id=service_id).one()
stmt = select(InboundSms).filter_by(id=inbound_id, service_id=service_id)
return db.session.execute(stmt).scalars().one()


def dao_get_paginated_most_recent_inbound_sms_by_user_number_for_service(
Expand All @@ -184,7 +197,7 @@ def dao_get_paginated_most_recent_inbound_sms_by_user_number_for_service(
"""
t2 = aliased(InboundSms)
q = (
db.session.query(InboundSms)
select(InboundSms)
.outerjoin(
t2,
and_(
Expand All @@ -193,12 +206,34 @@ def dao_get_paginated_most_recent_inbound_sms_by_user_number_for_service(
InboundSms.created_at < t2.created_at,
),
)
.filter(
t2.id == None, # noqa
.where(
t2.id.is_(None), # noqa
InboundSms.service_id == service_id,
InboundSms.created_at >= midnight_n_days_ago(limit_days),
)
.order_by(InboundSms.created_at.desc())
)

return q.paginate(page=page, per_page=current_app.config["PAGE_SIZE"])
result = db.session.execute(q).scalars().all()
page_size = current_app.config["PAGE_SIZE"]
offset = (page - 1) * page_size
paginated_results = result[offset : offset + page_size]
pagination = Pagination(paginated_results, page, page_size, len(result))
return pagination


# TODO remove this when billing dao PR is merged.
class Pagination:
def __init__(self, items, page, per_page, total):
self.items = items
self.page = page
self.per_page = per_page
self.total = total
self.pages = (total + per_page - 1) // per_page
self.prev_num = page - 1 if page > 1 else None
self.next_num = page + 1 if page < self.pages else None

def has_next(self):
return self.page < self.pages

def has_prev(self):
return self.page > 1
10 changes: 7 additions & 3 deletions app/inbound_sms/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,13 @@ def get_most_recent_inbound_sms_for_service(service_id):
results = dao_get_paginated_most_recent_inbound_sms_by_user_number_for_service(
service_id, int(page), limit_days
)
return jsonify(
data=[row.serialize() for row in results.items], has_next=results.has_next
)
try:
x = jsonify(
data=[row.serialize() for row in results.items], has_next=results.has_next()
)
except Exception as e:
raise e
return x


@inbound_sms.route("/summary")
Expand Down
3 changes: 1 addition & 2 deletions tests/app/dao/test_fact_notification_status_dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ def test_fetch_notification_status_for_service_by_month(notify_db_session):

assert results[0].month.date() == date(2018, 1, 1)
assert results[0].notification_type == NotificationType.EMAIL
# TODO fix/investigate
# assert results[0].notification_status == NotificationStatus.DELIVERED
assert results[0].notification_status == NotificationStatus.DELIVERED
assert results[0].count == 1

assert results[1].month.date() == date(2018, 1, 1)
Expand Down
5 changes: 4 additions & 1 deletion tests/app/dao/test_inbound_numbers_dao.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import pytest
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError

from app import db
from app.dao.inbound_numbers_dao import (
dao_allocate_number_for_service,
dao_get_available_inbound_numbers,
Expand Down Expand Up @@ -35,7 +37,8 @@ def test_set_service_id_on_inbound_number(notify_db_session, sample_inbound_numb

dao_set_inbound_number_to_service(service.id, numbers[0])

res = InboundNumber.query.filter(InboundNumber.service_id == service.id).all()
stmt = select(InboundNumber).filter(InboundNumber.service_id == service.id)
res = db.session.execute(stmt).scalars().all()

assert len(res) == 1
assert res[0].service_id == service.id
Expand Down
Loading

0 comments on commit 05e268a

Please sign in to comment.