Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Transfer ETL Updates - 1629 1635 1639 #1657

Merged
merged 8 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 129 additions & 0 deletions backend/lcfs/db/migrations/versions/2025-01-10-13-39_d25e7c47659e.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
"""mv update on org balances

Revision ID: d25e7c47659e
Revises: fa98709e7952
Create Date: 2025-01-10 13:39:31.688471
"""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "d25e7c47659e"
down_revision = "fa98709e7952"
branch_labels = None
depends_on = None


def upgrade() -> None:
# Create or replace the function with updated logic:
# 1) total_balance now sums:
# - All compliance_units from 'Adjustment'
# - Negative compliance_units from 'Reserved'
# 2) reserved_balance sums only negative compliance_units from 'Reserved'
op.execute(
"""
CREATE OR REPLACE FUNCTION update_organization_balance()
RETURNS TRIGGER AS $$
DECLARE
new_total_balance BIGINT;
new_reserved_balance BIGINT;
org_id INT := COALESCE(NEW.organization_id, OLD.organization_id);
BEGIN
-- Calculate new total_balance:
-- adjustments + negative reserved units
SELECT COALESCE(
SUM(
CASE
WHEN transaction_action = 'Adjustment' THEN compliance_units
WHEN transaction_action = 'Reserved' AND compliance_units < 0 THEN compliance_units
ELSE 0
END
),
0
)
INTO new_total_balance
FROM "transaction"
WHERE organization_id = org_id;

-- Calculate new reserved_balance from negative compliance_units
SELECT COALESCE(SUM(compliance_units), 0)
INTO new_reserved_balance
FROM "transaction"
WHERE organization_id = org_id
AND transaction_action = 'Reserved'
AND compliance_units < 0;

UPDATE organization
SET total_balance = new_total_balance,
reserved_balance = new_reserved_balance
WHERE organization_id = org_id;

RETURN NEW;
END;
$$ LANGUAGE plpgsql;
"""
)

op.execute(
"""
DROP TRIGGER IF EXISTS update_organization_balance_trigger ON "transaction";
"""
)
op.execute(
"""
CREATE TRIGGER update_organization_balance_trigger
AFTER INSERT OR UPDATE OR DELETE ON "transaction"
FOR EACH ROW EXECUTE FUNCTION update_organization_balance();
"""
)


def downgrade() -> None:
# Revert to the original logic:
# 1) total_balance sums only 'Adjustment'
# 2) reserved_balance sums all (positive and negative) 'Reserved'
op.execute(
"""
CREATE OR REPLACE FUNCTION update_organization_balance()
RETURNS TRIGGER AS $$
DECLARE
new_total_balance BIGINT;
new_reserved_balance BIGINT;
org_id INT := COALESCE(NEW.organization_id, OLD.organization_id);
BEGIN
SELECT COALESCE(SUM(compliance_units), 0)
INTO new_total_balance
FROM "transaction"
WHERE organization_id = org_id
AND transaction_action = 'Adjustment';

SELECT COALESCE(SUM(compliance_units), 0)
INTO new_reserved_balance
FROM "transaction"
WHERE organization_id = org_id
AND transaction_action = 'Reserved';

UPDATE organization
SET total_balance = new_total_balance,
reserved_balance = new_reserved_balance
WHERE organization_id = org_id;

RETURN NEW;
END;
$$ LANGUAGE plpgsql;
"""
)

op.execute(
"""
DROP TRIGGER IF EXISTS update_organization_balance_trigger ON "transaction";
"""
)
op.execute(
"""
CREATE TRIGGER update_organization_balance_trigger
AFTER INSERT OR UPDATE OR DELETE ON "transaction"
FOR EACH ROW EXECUTE FUNCTION update_organization_balance();
"""
)
8 changes: 4 additions & 4 deletions backend/lcfs/tests/organizations/test_organizations_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ async def test_get_organizations_paginated_balances_with_reserved_transactions(
BASE_TOTAL_BALANCE + 100
), f"Expected total balance to be 100, got {org.total_balance}"
assert (
org.reserved_balance == 30
), f"Expected reserved balance to be 30, got {org.reserved_balance}"
org.reserved_balance == 0
), f"Expected reserved balance to be 0, got {org.reserved_balance}"


@pytest.mark.anyio
Expand Down Expand Up @@ -142,8 +142,8 @@ async def test_get_organizations_paginated_balances_with_released_transactions(
org.total_balance == 51100
), f"Expected total balance to be 100, got {org.total_balance}"
assert (
org.reserved_balance == 10
), f"Expected reserved balance to be 10, got {org.reserved_balance}"
org.reserved_balance == 0
), f"Expected reserved balance to be 0, got {org.reserved_balance}"


@pytest.mark.anyio
Expand Down
6 changes: 6 additions & 0 deletions backend/lcfs/web/api/transfer/repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ async def get_transfer_by_id(self, transfer_id: int) -> Transfer:
"""
Queries the database for a transfer by its ID and returns the ORM model.
Eagerly loads related entities to prevent lazy loading issues.
Orders the transfer history by create_date.
"""
query = (
select(Transfer)
Expand All @@ -88,6 +89,11 @@ async def get_transfer_by_id(self, transfer_id: int) -> Transfer:

result = await self.db.execute(query)
transfer = result.scalars().first()

# Ensure transfer_history is ordered by create_date
if transfer and transfer.transfer_history:
transfer.transfer_history.sort(key=lambda history: history.create_date)

return transfer

@repo_handler
Expand Down
2 changes: 1 addition & 1 deletion backend/lcfs/web/api/transfer/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class TransferSchema(BaseSchema):
transfer_id: int
from_organization: TransferOrganizationSchema
to_organization: TransferOrganizationSchema
agreement_date: date
agreement_date: Optional[date] = None
quantity: int
price_per_unit: float
comments: Optional[List[TransferCommentSchema]] = None
Expand Down
31 changes: 18 additions & 13 deletions etl/nifi_scripts/transfer.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,7 @@ def SOURCE_QUERY = """
WHEN cts.status = 'Refused' THEN 'Declined'
WHEN cts.status = 'Submitted' THEN 'Sent'
ELSE cts.status
END AS current_status,
CASE
WHEN cts.status = 'Not Recommended' THEN 'Refuse'
WHEN cts.status = 'Recommended' THEN 'Record'
ELSE NULL
END AS recommendation
END AS current_status
FROM
credit_trade ct
JOIN credit_trade_type ctt ON ct.type_id = ctt.id
Expand Down Expand Up @@ -203,13 +198,21 @@ try {
continue
}

// Identify if the history ever contained "Recommended" or "Not Recommended"
def recommendationValue = null
if (creditTradeHistoryJson.any { it.transfer_status == 'Recommended' }) {
recommendationValue = 'Record' // matches "Record" in the transfer_recommendation_enum
} else if (creditTradeHistoryJson.any { it.transfer_status == 'Not Recommended' }) {
recommendationValue = 'Refuse' // matches "Refuse" in the transfer_recommendation_enum
}

// Only if transfer does not exist, proceed to create transactions and then insert the transfer.
def (fromTransactionId, toTransactionId) = processTransactions(resultSet.getString('current_status'),
resultSet,
statements.transactionStmt)

def transferId = insertTransfer(resultSet, statements.transferStmt,
fromTransactionId, toTransactionId, preparedData, destinationConn)
fromTransactionId, toTransactionId, preparedData, destinationConn, recommendationValue)

if (transferId) {
processHistory(transferId, creditTradeHistoryJson, statements.historyStmt, preparedData)
Expand Down Expand Up @@ -398,23 +401,26 @@ def transferExists(Connection conn, int transferId) {
def processHistory(Integer transferId, List creditTradeHistory, PreparedStatement historyStmt, Map preparedData) {
if (!creditTradeHistory) return

// Sort the records by create_timestamp to preserve chronological order
def sortedHistory = creditTradeHistory.sort { a, b ->
toSqlTimestamp(a.create_timestamp ?: '2013-01-01T00:00:00Z') <=> toSqlTimestamp(b.create_timestamp ?: '2013-01-01T00:00:00Z')
}

// Use a Set to track unique combinations of transfer_id and transfer_status
def processedEntries = new HashSet<String>()

creditTradeHistory.each { historyItem ->
sortedHistory.each { historyItem ->
try {
def statusId = getStatusId(historyItem.transfer_status, preparedData)
def uniqueKey = "${transferId}_${statusId}"

// Check if this combination has already been processed
if (!processedEntries.contains(uniqueKey)) {
// If not processed, add to batch and mark as processed
historyStmt.setInt(1, transferId)
historyStmt.setInt(2, statusId)
historyStmt.setInt(3, historyItem.user_profile_id)
historyStmt.setTimestamp(4, toSqlTimestamp(historyItem.create_timestamp ?: '2013-01-01T00:00:00Z'))
historyStmt.addBatch()

processedEntries.add(uniqueKey)
}
} catch (Exception e) {
Expand All @@ -426,7 +432,6 @@ def processHistory(Integer transferId, List creditTradeHistory, PreparedStatemen
historyStmt.executeBatch()
}


def processInternalComments(Integer transferId, List internalComments,
PreparedStatement internalCommentStmt,
PreparedStatement transferInternalCommentStmt) {
Expand Down Expand Up @@ -475,7 +480,7 @@ def getAudienceScope(String roleNames) {
}

def insertTransfer(ResultSet rs, PreparedStatement transferStmt, Long fromTransactionId,
Long toTransactionId, Map preparedData, Connection conn) {
Long toTransactionId, Map preparedData, Connection conn, String recommendationValue) {
// Check for duplicates in the `transfer` table
def transferId = rs.getInt('transfer_id')
def duplicateCheckStmt = conn.prepareStatement('SELECT COUNT(*) FROM transfer WHERE transfer_id = ?')
Expand Down Expand Up @@ -507,7 +512,7 @@ def insertTransfer(ResultSet rs, PreparedStatement transferStmt, Long fromTransa
transferStmt.setString(11, rs.getString('gov_comment'))
transferStmt.setObject(12, categoryId)
transferStmt.setObject(13, statusId)
transferStmt.setString(14, rs.getString('recommendation'))
transferStmt.setString(14, recommendationValue)
transferStmt.setTimestamp(15, rs.getTimestamp('create_date'))
transferStmt.setTimestamp(16, rs.getTimestamp('update_date'))
transferStmt.setInt(17, rs.getInt('create_user'))
Expand Down