Skip to content

Commit

Permalink
common: mark actions as PENDING while executing
Browse files Browse the repository at this point in the history
  • Loading branch information
dwerner committed Jan 9, 2025
1 parent 799cd4d commit ac636ef
Showing 1 changed file with 112 additions and 33 deletions.
145 changes: 112 additions & 33 deletions packages/indexer-common/src/indexer-management/actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,44 @@ export class ActionManager {
})
}

private async updateActionStatuses(
/**
* Mark actions with the given status.
* @param actions
* @param transaction
* @param status
* @returns updated actions
* @throws error if the update fails
*/
private async markActions(
actions: Action[],
transaction: Transaction,
status: ActionStatus,
): Promise<Action[]> {
const updatedActions: Action[] = []
for (const result of actions) {
const [, updatedAction] = await this.models.Action.update(
{
status,
},
{
where: { id: result.id },
returning: true,
transaction,
},
)
updatedActions.concat(updatedAction)
}
return updatedActions
}

/**
* Update the action statuses from the results provided by execution.
*
* @param results
* @param transaction
* @returns updated actions
*/
private async updateActionStatusesWithResults(
results: AllocationResult[],
transaction: Transaction,
): Promise<Action[]> {
Expand Down Expand Up @@ -255,12 +292,14 @@ export class ActionManager {
protocolNetwork,
})

logger.debug('Begin database transaction for executing approved actions')
logger.debug('Begin executing approved actions')
let batchStartTime

// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
await this.models.Action.sequelize!.transaction(
const prioritizedActions: Action[] = await this.models.Action.sequelize!.transaction(
{ isolationLevel: Transaction.ISOLATION_LEVELS.SERIALIZABLE },
async (transaction) => {
const transactionOpenTime = Date.now()
batchStartTime = Date.now()
let approvedActions
try {
// Execute already approved actions in the order of type and priority.
Expand All @@ -271,10 +310,7 @@ export class ActionManager {
const actionTypePriority = ['unallocate', 'reallocate', 'allocate']
approvedActions = (
await this.models.Action.findAll({
where: {
status: ActionStatus.APPROVED,
protocolNetwork,
},
where: { status: ActionStatus.APPROVED, protocolNetwork },
order: [['priority', 'ASC']],
transaction,
lock: transaction.LOCK.UPDATE,
Expand All @@ -283,6 +319,16 @@ export class ActionManager {
return actionTypePriority.indexOf(a.type) - actionTypePriority.indexOf(b.type)
})

const pendingActions = await this.models.Action.findAll({
where: { status: ActionStatus.PENDING, protocolNetwork },
order: [['priority', 'ASC']],
transaction,
})
if (pendingActions.length > 0) {
logger.warn(`${pendingActions} Actions found in PENDING state when execution began. Was there a crash? \
These indicate that execution was interrupted and will need to be cleared manually.`)
}

if (approvedActions.length === 0) {
logger.debug('No approved actions were found for this network')
return []
Expand All @@ -295,34 +341,67 @@ export class ActionManager {
logger.error('Failed to query approved actions for network', { error })
return []
}
try {
logger.debug('Executing batch action', {
approvedActions,
startTimeMs: Date.now() - transactionOpenTime,
})
// mark all approved actions as PENDING, this serves as a lock on other processing of them
await this.markActions(approvedActions, transaction, ActionStatus.PENDING)
return prioritizedActions
},
)

// This will return all results if successful, if failed it will return the failed actions
const allocationManager =
this.allocationManagers[network.specification.networkIdentifier]
const results = await allocationManager.executeBatch(approvedActions)
try {
logger.debug('Executing batch action', {
prioritizedActions,
startTimeMs: Date.now() - batchStartTime,
})

const allocationManager =
this.allocationManagers[network.specification.networkIdentifier]

let results
try {
// This will return all results if successful, if failed it will return the failed actions
results = await allocationManager.executeBatch(prioritizedActions)
logger.debug('Completed batch action execution', {
results,
endTimeMs: Date.now() - batchStartTime,
})
} catch (error) {
// Release the actions from the PENDING state. This means they will be retried again on the next batch execution.
logger.error(
`Error raised during executeBatch, releasing ${prioritizedActions.length} actions from PENDING state. \
These will be attempted again on the next batch.`,
error,
)
await this.models.Action.sequelize!.transaction(
{ isolationLevel: Transaction.ISOLATION_LEVELS.SERIALIZABLE },
async (transaction) => {
return await this.markActions(
prioritizedActions,
transaction,
ActionStatus.APPROVED,
)
},
)
return []
}

logger.debug('Completed batch action execution', {
results,
endTimeMs: Date.now() - transactionOpenTime,
})
updatedActions = await this.updateActionStatuses(results, transaction)
// Happy path: execution went well (success or failure but no exceptions). Update the actions with the results.
updatedActions = await this.models.Action.sequelize!.transaction(
{ isolationLevel: Transaction.ISOLATION_LEVELS.SERIALIZABLE },
async (transaction) => {
return await this.updateActionStatusesWithResults(results, transaction)
},
)

logger.debug('Updated action statuses', {
updatedActions,
updatedTimeMs: Date.now() - transactionOpenTime,
})
} catch (error) {
logger.error(`Failed to execute batch tx on staking contract: ${error}`)
throw indexerError(IndexerErrorCode.IE072, error)
}
},
)
logger.debug('End database transaction for executing approved actions')
logger.debug('Updated action statuses', {
updatedActions,
updatedTimeMs: Date.now() - batchStartTime,
})
} catch (error) {
logger.error(`Failed to execute batch tx on staking contract: ${error}`)
throw indexerError(IndexerErrorCode.IE072, error)
}

logger.debug('End executing approved actions')
return updatedActions
}

Expand Down

0 comments on commit ac636ef

Please sign in to comment.