diff --git a/backend/lcfs/db/migrations/versions/2025-01-10-13-39_d25e7c47659e.py b/backend/lcfs/db/migrations/versions/2025-01-10-13-39_d25e7c47659e.py new file mode 100644 index 000000000..54a2bef13 --- /dev/null +++ b/backend/lcfs/db/migrations/versions/2025-01-10-13-39_d25e7c47659e.py @@ -0,0 +1,129 @@ +"""mv update on org balances + +Revision ID: d25e7c47659e +Revises: fa98709e7952 +Create Date: 2025-01-10 13:39:31.688471 +""" + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "d25e7c47659e" +down_revision = "fa98709e7952" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # Create or replace the function with updated logic: + # 1) total_balance now sums: + # - All compliance_units from 'Adjustment' + # - Negative compliance_units from 'Reserved' + # 2) reserved_balance sums only negative compliance_units from 'Reserved' + op.execute( + """ + CREATE OR REPLACE FUNCTION update_organization_balance() + RETURNS TRIGGER AS $$ + DECLARE + new_total_balance BIGINT; + new_reserved_balance BIGINT; + org_id INT := COALESCE(NEW.organization_id, OLD.organization_id); + BEGIN + -- Calculate new total_balance: + -- adjustments + negative reserved units + SELECT COALESCE( + SUM( + CASE + WHEN transaction_action = 'Adjustment' THEN compliance_units + WHEN transaction_action = 'Reserved' AND compliance_units < 0 THEN compliance_units + ELSE 0 + END + ), + 0 + ) + INTO new_total_balance + FROM "transaction" + WHERE organization_id = org_id; + + -- Calculate new reserved_balance from negative compliance_units + SELECT COALESCE(SUM(compliance_units), 0) + INTO new_reserved_balance + FROM "transaction" + WHERE organization_id = org_id + AND transaction_action = 'Reserved' + AND compliance_units < 0; + + UPDATE organization + SET total_balance = new_total_balance, + reserved_balance = new_reserved_balance + WHERE organization_id = org_id; + + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + """ + ) + + op.execute( + """ + DROP TRIGGER IF EXISTS update_organization_balance_trigger ON "transaction"; + """ + ) + op.execute( + """ + CREATE TRIGGER update_organization_balance_trigger + AFTER INSERT OR UPDATE OR DELETE ON "transaction" + FOR EACH ROW EXECUTE FUNCTION update_organization_balance(); + """ + ) + + +def downgrade() -> None: + # Revert to the original logic: + # 1) total_balance sums only 'Adjustment' + # 2) reserved_balance sums all (positive and negative) 'Reserved' + op.execute( + """ + CREATE OR REPLACE FUNCTION update_organization_balance() + RETURNS TRIGGER AS $$ + DECLARE + new_total_balance BIGINT; + new_reserved_balance BIGINT; + org_id INT := COALESCE(NEW.organization_id, OLD.organization_id); + BEGIN + SELECT COALESCE(SUM(compliance_units), 0) + INTO new_total_balance + FROM "transaction" + WHERE organization_id = org_id + AND transaction_action = 'Adjustment'; + + SELECT COALESCE(SUM(compliance_units), 0) + INTO new_reserved_balance + FROM "transaction" + WHERE organization_id = org_id + AND transaction_action = 'Reserved'; + + UPDATE organization + SET total_balance = new_total_balance, + reserved_balance = new_reserved_balance + WHERE organization_id = org_id; + + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + """ + ) + + op.execute( + """ + DROP TRIGGER IF EXISTS update_organization_balance_trigger ON "transaction"; + """ + ) + op.execute( + """ + CREATE TRIGGER update_organization_balance_trigger + AFTER INSERT OR UPDATE OR DELETE ON "transaction" + FOR EACH ROW EXECUTE FUNCTION update_organization_balance(); + """ + ) diff --git a/backend/lcfs/tests/organizations/test_organizations_repo.py b/backend/lcfs/tests/organizations/test_organizations_repo.py index d40217c99..cc0cd9c43 100644 --- a/backend/lcfs/tests/organizations/test_organizations_repo.py +++ b/backend/lcfs/tests/organizations/test_organizations_repo.py @@ -104,8 +104,8 @@ async def test_get_organizations_paginated_balances_with_reserved_transactions( BASE_TOTAL_BALANCE + 100 ), f"Expected total balance to be 100, got {org.total_balance}" assert ( - org.reserved_balance == 30 - ), f"Expected reserved balance to be 30, got {org.reserved_balance}" + org.reserved_balance == 0 + ), f"Expected reserved balance to be 0, got {org.reserved_balance}" @pytest.mark.anyio @@ -142,8 +142,8 @@ async def test_get_organizations_paginated_balances_with_released_transactions( org.total_balance == 51100 ), f"Expected total balance to be 100, got {org.total_balance}" assert ( - org.reserved_balance == 10 - ), f"Expected reserved balance to be 10, got {org.reserved_balance}" + org.reserved_balance == 0 + ), f"Expected reserved balance to be 0, got {org.reserved_balance}" @pytest.mark.anyio diff --git a/backend/lcfs/web/api/transfer/repo.py b/backend/lcfs/web/api/transfer/repo.py index b7274f31b..cd48dc0bc 100644 --- a/backend/lcfs/web/api/transfer/repo.py +++ b/backend/lcfs/web/api/transfer/repo.py @@ -66,6 +66,7 @@ async def get_transfer_by_id(self, transfer_id: int) -> Transfer: """ Queries the database for a transfer by its ID and returns the ORM model. Eagerly loads related entities to prevent lazy loading issues. + Orders the transfer history by create_date. """ query = ( select(Transfer) @@ -88,6 +89,11 @@ async def get_transfer_by_id(self, transfer_id: int) -> Transfer: result = await self.db.execute(query) transfer = result.scalars().first() + + # Ensure transfer_history is ordered by create_date + if transfer and transfer.transfer_history: + transfer.transfer_history.sort(key=lambda history: history.create_date) + return transfer @repo_handler diff --git a/backend/lcfs/web/api/transfer/schema.py b/backend/lcfs/web/api/transfer/schema.py index 889437c8a..352b9d9b7 100644 --- a/backend/lcfs/web/api/transfer/schema.py +++ b/backend/lcfs/web/api/transfer/schema.py @@ -47,7 +47,7 @@ class TransferSchema(BaseSchema): transfer_id: int from_organization: TransferOrganizationSchema to_organization: TransferOrganizationSchema - agreement_date: date + agreement_date: Optional[date] = None quantity: int price_per_unit: float comments: Optional[List[TransferCommentSchema]] = None diff --git a/etl/nifi_scripts/transfer.groovy b/etl/nifi_scripts/transfer.groovy index 88cdf9cd9..f3f87cdd8 100644 --- a/etl/nifi_scripts/transfer.groovy +++ b/etl/nifi_scripts/transfer.groovy @@ -89,12 +89,7 @@ def SOURCE_QUERY = """ WHEN cts.status = 'Refused' THEN 'Declined' WHEN cts.status = 'Submitted' THEN 'Sent' ELSE cts.status - END AS current_status, - CASE - WHEN cts.status = 'Not Recommended' THEN 'Refuse' - WHEN cts.status = 'Recommended' THEN 'Record' - ELSE NULL - END AS recommendation + END AS current_status FROM credit_trade ct JOIN credit_trade_type ctt ON ct.type_id = ctt.id @@ -203,13 +198,21 @@ try { continue } + // Identify if the history ever contained "Recommended" or "Not Recommended" + def recommendationValue = null + if (creditTradeHistoryJson.any { it.transfer_status == 'Recommended' }) { + recommendationValue = 'Record' // matches "Record" in the transfer_recommendation_enum + } else if (creditTradeHistoryJson.any { it.transfer_status == 'Not Recommended' }) { + recommendationValue = 'Refuse' // matches "Refuse" in the transfer_recommendation_enum + } + // Only if transfer does not exist, proceed to create transactions and then insert the transfer. def (fromTransactionId, toTransactionId) = processTransactions(resultSet.getString('current_status'), resultSet, statements.transactionStmt) def transferId = insertTransfer(resultSet, statements.transferStmt, - fromTransactionId, toTransactionId, preparedData, destinationConn) + fromTransactionId, toTransactionId, preparedData, destinationConn, recommendationValue) if (transferId) { processHistory(transferId, creditTradeHistoryJson, statements.historyStmt, preparedData) @@ -398,23 +401,26 @@ def transferExists(Connection conn, int transferId) { def processHistory(Integer transferId, List creditTradeHistory, PreparedStatement historyStmt, Map preparedData) { if (!creditTradeHistory) return + // Sort the records by create_timestamp to preserve chronological order + def sortedHistory = creditTradeHistory.sort { a, b -> + toSqlTimestamp(a.create_timestamp ?: '2013-01-01T00:00:00Z') <=> toSqlTimestamp(b.create_timestamp ?: '2013-01-01T00:00:00Z') + } + // Use a Set to track unique combinations of transfer_id and transfer_status def processedEntries = new HashSet() - creditTradeHistory.each { historyItem -> + sortedHistory.each { historyItem -> try { def statusId = getStatusId(historyItem.transfer_status, preparedData) def uniqueKey = "${transferId}_${statusId}" // Check if this combination has already been processed if (!processedEntries.contains(uniqueKey)) { - // If not processed, add to batch and mark as processed historyStmt.setInt(1, transferId) historyStmt.setInt(2, statusId) historyStmt.setInt(3, historyItem.user_profile_id) historyStmt.setTimestamp(4, toSqlTimestamp(historyItem.create_timestamp ?: '2013-01-01T00:00:00Z')) historyStmt.addBatch() - processedEntries.add(uniqueKey) } } catch (Exception e) { @@ -426,7 +432,6 @@ def processHistory(Integer transferId, List creditTradeHistory, PreparedStatemen historyStmt.executeBatch() } - def processInternalComments(Integer transferId, List internalComments, PreparedStatement internalCommentStmt, PreparedStatement transferInternalCommentStmt) { @@ -475,7 +480,7 @@ def getAudienceScope(String roleNames) { } def insertTransfer(ResultSet rs, PreparedStatement transferStmt, Long fromTransactionId, - Long toTransactionId, Map preparedData, Connection conn) { + Long toTransactionId, Map preparedData, Connection conn, String recommendationValue) { // Check for duplicates in the `transfer` table def transferId = rs.getInt('transfer_id') def duplicateCheckStmt = conn.prepareStatement('SELECT COUNT(*) FROM transfer WHERE transfer_id = ?') @@ -507,7 +512,7 @@ def insertTransfer(ResultSet rs, PreparedStatement transferStmt, Long fromTransa transferStmt.setString(11, rs.getString('gov_comment')) transferStmt.setObject(12, categoryId) transferStmt.setObject(13, statusId) - transferStmt.setString(14, rs.getString('recommendation')) + transferStmt.setString(14, recommendationValue) transferStmt.setTimestamp(15, rs.getTimestamp('create_date')) transferStmt.setTimestamp(16, rs.getTimestamp('update_date')) transferStmt.setInt(17, rs.getInt('create_user'))