diff --git a/packages/indexer-common/src/indexer-management/actions.ts b/packages/indexer-common/src/indexer-management/actions.ts index 3eb56a44b..b0417fd6e 100644 --- a/packages/indexer-common/src/indexer-management/actions.ts +++ b/packages/indexer-common/src/indexer-management/actions.ts @@ -22,7 +22,7 @@ import { } from '@graphprotocol/indexer-common' import { Order, Transaction } from 'sequelize' -import { Eventual, join, Logger } from '@graphprotocol/common-ts' +import { Logger } from '@graphprotocol/common-ts' import groupBy from 'lodash.groupby' export class ActionManager { @@ -31,6 +31,8 @@ export class ActionManager { declare models: IndexerManagementModels declare allocationManagers: NetworkMapped + executeBatchActionsPromise: Promise | undefined + static async create( multiNetworks: MultiNetworks, logger: Logger, @@ -117,91 +119,79 @@ export class ActionManager { async monitorQueue(): Promise { const logger = this.logger.child({ component: 'QueueMonitor' }) - const approvedActions: Eventual = sequentialTimerMap( - { - logger, - milliseconds: 30_000, - }, - async () => { - logger.trace('Fetching approved actions') - let actions: Action[] = [] - try { - actions = await ActionManager.fetchActions(this.models, null, { - status: ActionStatus.APPROVED, - }) - logger.trace(`Fetched ${actions.length} approved actions`) - } catch (err) { - logger.warn('Failed to fetch approved actions from queue', { err }) - } + sequentialTimerMap({ logger, milliseconds: 30_000 }, this.processQueue) + } - return actions - }, - { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - onError: (err: any) => - logger.warn('Failed to fetch approved actions from queue', { err }), - }, + private async processQueue(): Promise { + const logger = this.logger.child({ component: 'ActionManager.processQueue' }) + logger.trace('Fetching approved actions') + let approvedActions: Action[] = [] + try { + approvedActions = await ActionManager.fetchActions(this.models, null, { + status: ActionStatus.APPROVED, + }) + logger.trace(`Fetched ${approvedActions.length} approved actions`) + } catch (err) { + logger.warn('Failed to fetch approved actions from queue', { err }) + } + + logger.debug('Approved actions found, evaluating batch') + const approvedActionsByNetwork: NetworkMapped = groupBy( + approvedActions, + (action: Action) => action.protocolNetwork, ) - join({ approvedActions }).pipe(async ({ approvedActions }) => { - logger.debug('Approved actions found, evaluating batch') - const approvedActionsByNetwork: NetworkMapped = groupBy( - approvedActions, - (action: Action) => action.protocolNetwork, - ) + await this.multiNetworks.mapNetworkMapped( + approvedActionsByNetwork, + async (network: Network, approvedActions: Action[]) => { + const networkLogger = logger.child({ + protocolNetwork: network.specification.networkIdentifier, + indexer: network.specification.indexerOptions.address, + operator: network.transactionManager.wallet.address, + }) - await this.multiNetworks.mapNetworkMapped( - approvedActionsByNetwork, - async (network: Network, approvedActions: Action[]) => { - const networkLogger = logger.child({ + if (await this.batchReady(approvedActions, network, networkLogger)) { + const paused = await network.paused.value() + const isOperator = await network.isOperator.value() + networkLogger.debug('Batch ready, preparing to execute', { + paused, + isOperator, protocolNetwork: network.specification.networkIdentifier, - indexer: network.specification.indexerOptions.address, - operator: network.transactionManager.wallet.address, }) + // Do nothing else if the network is paused + if (paused) { + networkLogger.info( + `The network is currently paused, not doing anything until it resumes`, + ) + return + } - if (await this.batchReady(approvedActions, network, networkLogger)) { - const paused = await network.paused.value() - const isOperator = await network.isOperator.value() - networkLogger.debug('Batch ready, preparing to execute', { - paused, - isOperator, - protocolNetwork: network.specification.networkIdentifier, - }) - // Do nothing else if the network is paused - if (paused) { - networkLogger.info( - `The network is currently paused, not doing anything until it resumes`, - ) - return - } - - // Do nothing if we're not authorized as an operator for the indexer - if (!isOperator) { - networkLogger.error(`Not authorized as an operator for the indexer`, { - err: indexerError(IndexerErrorCode.IE034), - }) - return - } - - networkLogger.info('Executing batch of approved actions', { - actions: approvedActions, - note: 'If actions were approved very recently they may be missing from this batch', + // Do nothing if we're not authorized as an operator for the indexer + if (!isOperator) { + networkLogger.error(`Not authorized as an operator for the indexer`, { + err: indexerError(IndexerErrorCode.IE034), }) + return + } - try { - const attemptedActions = await this.executeApprovedActions(network) - networkLogger.trace('Attempted to execute all approved actions', { - actions: attemptedActions, - }) - } catch (error) { - networkLogger.error('Failed to execute batch of approved actions', { - error, - }) - } + networkLogger.info('Executing batch of approved actions', { + actions: approvedActions, + note: 'If actions were approved very recently they may be missing from this batch', + }) + + try { + const attemptedActions = await this.executeApprovedActions(network) + networkLogger.trace('Attempted to execute all approved actions', { + actions: attemptedActions, + }) + } catch (error) { + networkLogger.error('Failed to execute batch of approved actions', { + error, + }) } - }, - ) - }) + } + }, + ) } private async updateActionStatuses( @@ -228,7 +218,19 @@ export class ActionManager { return updatedActions } + // a promise guard to ensure that only one batch of actions is executed at a time async executeApprovedActions(network: Network): Promise { + if (this.executeBatchActionsPromise) { + this.logger.warn('Previous batch action execution is still in progress') + return this.executeBatchActionsPromise + } + this.executeBatchActionsPromise = this.executeApprovedActions(network) + const updatedActions = await this.executeBatchActionsPromise + this.executeBatchActionsPromise = undefined + return updatedActions + } + + async executeApprovedActionsInner(network: Network): Promise { let updatedActions: Action[] = [] const protocolNetwork = network.specification.networkIdentifier const logger = this.logger.child({ @@ -236,11 +238,12 @@ export class ActionManager { protocolNetwork, }) - logger.trace('Begin database transaction for executing approved actions') + logger.debug('Begin database transaction for executing approved actions') // eslint-disable-next-line @typescript-eslint/no-non-null-assertion await this.models.Action.sequelize!.transaction( { isolationLevel: Transaction.ISOLATION_LEVELS.SERIALIZABLE }, async (transaction) => { + const transactionOpenTime = Date.now() let approvedActions try { // Execute already approved actions in the order of type and priority. @@ -276,6 +279,11 @@ export class ActionManager { return [] } try { + logger.debug('Executing batch action', { + approvedActions, + startTimeMs: Date.now() - transactionOpenTime, + }) + // This will return all results if successful, if failed it will return the failed actions const allocationManager = this.allocationManagers[network.specification.networkIdentifier] @@ -283,15 +291,21 @@ export class ActionManager { logger.debug('Completed batch action execution', { results, + endTimeMs: Date.now() - transactionOpenTime, }) updatedActions = await this.updateActionStatuses(results, transaction) + + logger.debug('Updated action statuses', { + updatedActions, + updatedTimeMs: Date.now() - transactionOpenTime, + }) } catch (error) { logger.error(`Failed to execute batch tx on staking contract: ${error}`) throw indexerError(IndexerErrorCode.IE072, error) } }, ) - logger.trace('End database transaction for executing approved actions') + logger.debug('End database transaction for executing approved actions') return updatedActions } diff --git a/packages/indexer-common/src/indexer-management/allocations.ts b/packages/indexer-common/src/indexer-management/allocations.ts index 797c54476..f556224b3 100644 --- a/packages/indexer-common/src/indexer-management/allocations.ts +++ b/packages/indexer-common/src/indexer-management/allocations.ts @@ -116,7 +116,7 @@ export class AllocationManager { return await this.confirmTransactions(result, actions) } - async executeTransactions(actions: Action[]): Promise { + private async executeTransactions(actions: Action[]): Promise { const logger = this.logger.child({ function: 'executeTransactions' }) logger.trace('Begin executing transactions', { actions }) if (actions.length < 1) { diff --git a/packages/indexer-common/src/sequential-timer.ts b/packages/indexer-common/src/sequential-timer.ts index 9a50d4835..6d43a94af 100644 --- a/packages/indexer-common/src/sequential-timer.ts +++ b/packages/indexer-common/src/sequential-timer.ts @@ -20,7 +20,6 @@ export interface TimerTaskContext { function logWorkTime( workStarted: number, logger: Logger, - loopTime: number, caller: string | undefined, milliseconds: number, ) { @@ -28,9 +27,10 @@ function logWorkTime( const workTime = Date.now() - workStarted if (workTime > milliseconds + workTimeWarningThreshold) { logger.warn( - 'timer work took longer than the sequential timer was configured for (>5s)', + `timer work took ${ + (workTime - milliseconds) / 1000 + }s longer than expected, next execution in ${milliseconds / 1000}s`, { - loopTime, workTime, milliseconds, caller, @@ -57,7 +57,6 @@ export function sequentialTimerReduce( // obtain the calling method name from the call stack const stack = new Error().stack const caller = stack?.split('\n')[2].trim() - let lastWorkStarted = Date.now() let acc: U = initial let previousT: T | undefined @@ -74,26 +73,23 @@ export function sequentialTimerReduce( function work() { const workStarted = Date.now() const promiseOrT = reducer(acc, workStarted) - const loopTime = workStarted - lastWorkStarted - - lastWorkStarted = workStarted if (isPromiseLike(promiseOrT)) { promiseOrT.then( function onfulfilled(value) { outputReduce(value) - logWorkTime(workStarted, logger, loopTime, caller, milliseconds) - setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted))) + logWorkTime(workStarted, logger, caller, milliseconds) + setTimeout(work, milliseconds) }, function onrejected(err) { console.error(err) - logWorkTime(workStarted, logger, loopTime, caller, milliseconds) - setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted))) + logWorkTime(workStarted, logger, caller, milliseconds) + setTimeout(work, milliseconds) }, ) } else { outputReduce(promiseOrT) - logWorkTime(workStarted, logger, loopTime, caller, milliseconds) - setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted))) + logWorkTime(workStarted, logger, caller, milliseconds) + setTimeout(work, milliseconds) } } // initial call @@ -118,8 +114,6 @@ export function sequentialTimerMap( // obtain the calling method name from the call stack const stack = new Error().stack const caller = stack?.split('\n')[2].trim() - let lastWorkStarted = Date.now() - const output = mutable() let latestU: U | undefined @@ -135,27 +129,25 @@ export function sequentialTimerMap( function work() { const workStarted = Date.now() const promiseOrU = mapper(workStarted) - const loopTime = workStarted - lastWorkStarted - lastWorkStarted = workStarted if (isPromiseLike(promiseOrU)) { promiseOrU.then( function onfulfilled(value) { checkMappedValue(value) - logWorkTime(workStarted, logger, loopTime, caller, milliseconds) - setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted))) + logWorkTime(workStarted, logger, caller, milliseconds) + setTimeout(work, milliseconds) }, function onrejected(err) { options?.onError(err) - logWorkTime(workStarted, logger, loopTime, caller, milliseconds) - setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted))) + logWorkTime(workStarted, logger, caller, milliseconds) + setTimeout(work, milliseconds) }, ) } else { // resolved value checkMappedValue(promiseOrU) - logWorkTime(workStarted, logger, loopTime, caller, milliseconds) - setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted))) + logWorkTime(workStarted, logger, caller, milliseconds) + setTimeout(work, milliseconds) } }