Skip to content

Commit

Permalink
common: make monitorQueue sequential, promise-guard concurrent execut…
Browse files Browse the repository at this point in the history
…eApprovedActions, add logging
  • Loading branch information
dwerner committed Dec 27, 2024
1 parent e37c35b commit db4bbd4
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 102 deletions.
170 changes: 92 additions & 78 deletions packages/indexer-common/src/indexer-management/actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import {
} from '@graphprotocol/indexer-common'

import { Order, Transaction } from 'sequelize'
import { Eventual, join, Logger } from '@graphprotocol/common-ts'
import { Logger } from '@graphprotocol/common-ts'
import groupBy from 'lodash.groupby'

export class ActionManager {
Expand All @@ -31,6 +31,8 @@ export class ActionManager {
declare models: IndexerManagementModels
declare allocationManagers: NetworkMapped<AllocationManager>

executeBatchActionsPromise: Promise<Action[]> | undefined

static async create(
multiNetworks: MultiNetworks<Network>,
logger: Logger,
Expand Down Expand Up @@ -117,91 +119,79 @@ export class ActionManager {

async monitorQueue(): Promise<void> {
const logger = this.logger.child({ component: 'QueueMonitor' })
const approvedActions: Eventual<Action[]> = sequentialTimerMap(
{
logger,
milliseconds: 30_000,
},
async () => {
logger.trace('Fetching approved actions')
let actions: Action[] = []
try {
actions = await ActionManager.fetchActions(this.models, null, {
status: ActionStatus.APPROVED,
})
logger.trace(`Fetched ${actions.length} approved actions`)
} catch (err) {
logger.warn('Failed to fetch approved actions from queue', { err })
}
sequentialTimerMap({ logger, milliseconds: 30_000 }, this.processQueue)
}

return actions
},
{
// eslint-disable-next-line @typescript-eslint/no-explicit-any
onError: (err: any) =>
logger.warn('Failed to fetch approved actions from queue', { err }),
},
private async processQueue(): Promise<void> {
const logger = this.logger.child({ component: 'ActionManager.processQueue' })
logger.trace('Fetching approved actions')
let approvedActions: Action[] = []
try {
approvedActions = await ActionManager.fetchActions(this.models, null, {
status: ActionStatus.APPROVED,
})
logger.trace(`Fetched ${approvedActions.length} approved actions`)
} catch (err) {
logger.warn('Failed to fetch approved actions from queue', { err })
}

logger.debug('Approved actions found, evaluating batch')
const approvedActionsByNetwork: NetworkMapped<Action[]> = groupBy(
approvedActions,
(action: Action) => action.protocolNetwork,
)

join({ approvedActions }).pipe(async ({ approvedActions }) => {
logger.debug('Approved actions found, evaluating batch')
const approvedActionsByNetwork: NetworkMapped<Action[]> = groupBy(
approvedActions,
(action: Action) => action.protocolNetwork,
)
await this.multiNetworks.mapNetworkMapped(
approvedActionsByNetwork,
async (network: Network, approvedActions: Action[]) => {
const networkLogger = logger.child({
protocolNetwork: network.specification.networkIdentifier,
indexer: network.specification.indexerOptions.address,
operator: network.transactionManager.wallet.address,
})

await this.multiNetworks.mapNetworkMapped(
approvedActionsByNetwork,
async (network: Network, approvedActions: Action[]) => {
const networkLogger = logger.child({
if (await this.batchReady(approvedActions, network, networkLogger)) {
const paused = await network.paused.value()
const isOperator = await network.isOperator.value()
networkLogger.debug('Batch ready, preparing to execute', {
paused,
isOperator,
protocolNetwork: network.specification.networkIdentifier,
indexer: network.specification.indexerOptions.address,
operator: network.transactionManager.wallet.address,
})
// Do nothing else if the network is paused
if (paused) {
networkLogger.info(
`The network is currently paused, not doing anything until it resumes`,
)
return
}

if (await this.batchReady(approvedActions, network, networkLogger)) {
const paused = await network.paused.value()
const isOperator = await network.isOperator.value()
networkLogger.debug('Batch ready, preparing to execute', {
paused,
isOperator,
protocolNetwork: network.specification.networkIdentifier,
})
// Do nothing else if the network is paused
if (paused) {
networkLogger.info(
`The network is currently paused, not doing anything until it resumes`,
)
return
}

// Do nothing if we're not authorized as an operator for the indexer
if (!isOperator) {
networkLogger.error(`Not authorized as an operator for the indexer`, {
err: indexerError(IndexerErrorCode.IE034),
})
return
}

networkLogger.info('Executing batch of approved actions', {
actions: approvedActions,
note: 'If actions were approved very recently they may be missing from this batch',
// Do nothing if we're not authorized as an operator for the indexer
if (!isOperator) {
networkLogger.error(`Not authorized as an operator for the indexer`, {
err: indexerError(IndexerErrorCode.IE034),
})
return
}

try {
const attemptedActions = await this.executeApprovedActions(network)
networkLogger.trace('Attempted to execute all approved actions', {
actions: attemptedActions,
})
} catch (error) {
networkLogger.error('Failed to execute batch of approved actions', {
error,
})
}
networkLogger.info('Executing batch of approved actions', {
actions: approvedActions,
note: 'If actions were approved very recently they may be missing from this batch',
})

try {
const attemptedActions = await this.executeApprovedActions(network)
networkLogger.trace('Attempted to execute all approved actions', {
actions: attemptedActions,
})
} catch (error) {
networkLogger.error('Failed to execute batch of approved actions', {
error,
})
}
},
)
})
}
},
)
}

private async updateActionStatuses(
Expand All @@ -228,19 +218,32 @@ export class ActionManager {
return updatedActions
}

// a promise guard to ensure that only one batch of actions is executed at a time
async executeApprovedActions(network: Network): Promise<Action[]> {
if (this.executeBatchActionsPromise) {
this.logger.warn('Previous batch action execution is still in progress')
return this.executeBatchActionsPromise
}
this.executeBatchActionsPromise = this.executeApprovedActions(network)
const updatedActions = await this.executeBatchActionsPromise
this.executeBatchActionsPromise = undefined
return updatedActions
}

async executeApprovedActionsInner(network: Network): Promise<Action[]> {
let updatedActions: Action[] = []
const protocolNetwork = network.specification.networkIdentifier
const logger = this.logger.child({
function: 'executeApprovedActions',
protocolNetwork,
})

logger.trace('Begin database transaction for executing approved actions')
logger.debug('Begin database transaction for executing approved actions')
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
await this.models.Action.sequelize!.transaction(
{ isolationLevel: Transaction.ISOLATION_LEVELS.SERIALIZABLE },
async (transaction) => {
const transactionOpenTime = Date.now()
let approvedActions
try {
// Execute already approved actions in the order of type and priority.
Expand Down Expand Up @@ -276,22 +279,33 @@ export class ActionManager {
return []
}
try {
logger.debug('Executing batch action', {
approvedActions,
startTimeMs: Date.now() - transactionOpenTime,
})

// This will return all results if successful, if failed it will return the failed actions
const allocationManager =
this.allocationManagers[network.specification.networkIdentifier]
const results = await allocationManager.executeBatch(approvedActions)

logger.debug('Completed batch action execution', {
results,
endTimeMs: Date.now() - transactionOpenTime,
})
updatedActions = await this.updateActionStatuses(results, transaction)

logger.debug('Updated action statuses', {
updatedActions,
updatedTimeMs: Date.now() - transactionOpenTime,
})
} catch (error) {
logger.error(`Failed to execute batch tx on staking contract: ${error}`)
throw indexerError(IndexerErrorCode.IE072, error)
}
},
)
logger.trace('End database transaction for executing approved actions')
logger.debug('End database transaction for executing approved actions')
return updatedActions
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ export class AllocationManager {
return await this.confirmTransactions(result, actions)
}

async executeTransactions(actions: Action[]): Promise<TransactionResult> {
private async executeTransactions(actions: Action[]): Promise<TransactionResult> {
const logger = this.logger.child({ function: 'executeTransactions' })
logger.trace('Begin executing transactions', { actions })
if (actions.length < 1) {
Expand Down
38 changes: 15 additions & 23 deletions packages/indexer-common/src/sequential-timer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@ export interface TimerTaskContext {
function logWorkTime(
workStarted: number,
logger: Logger,
loopTime: number,
caller: string | undefined,
milliseconds: number,
) {
const workTimeWarningThreshold = 5000
const workTime = Date.now() - workStarted
if (workTime > milliseconds + workTimeWarningThreshold) {
logger.warn(
'timer work took longer than the sequential timer was configured for (>5s)',
`timer work took ${
(workTime - milliseconds) / 1000
}s longer than expected, next execution in ${milliseconds / 1000}s`,
{
loopTime,
workTime,
milliseconds,
caller,
Expand All @@ -57,7 +57,6 @@ export function sequentialTimerReduce<T, U>(
// obtain the calling method name from the call stack
const stack = new Error().stack
const caller = stack?.split('\n')[2].trim()
let lastWorkStarted = Date.now()

let acc: U = initial
let previousT: T | undefined
Expand All @@ -74,26 +73,23 @@ export function sequentialTimerReduce<T, U>(
function work() {
const workStarted = Date.now()
const promiseOrT = reducer(acc, workStarted)
const loopTime = workStarted - lastWorkStarted

lastWorkStarted = workStarted
if (isPromiseLike(promiseOrT)) {
promiseOrT.then(
function onfulfilled(value) {
outputReduce(value)
logWorkTime(workStarted, logger, loopTime, caller, milliseconds)
setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted)))
logWorkTime(workStarted, logger, caller, milliseconds)
setTimeout(work, milliseconds)
},
function onrejected(err) {
console.error(err)
logWorkTime(workStarted, logger, loopTime, caller, milliseconds)
setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted)))
logWorkTime(workStarted, logger, caller, milliseconds)
setTimeout(work, milliseconds)
},
)
} else {
outputReduce(promiseOrT)
logWorkTime(workStarted, logger, loopTime, caller, milliseconds)
setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted)))
logWorkTime(workStarted, logger, caller, milliseconds)
setTimeout(work, milliseconds)
}
}
// initial call
Expand All @@ -118,8 +114,6 @@ export function sequentialTimerMap<U>(
// obtain the calling method name from the call stack
const stack = new Error().stack
const caller = stack?.split('\n')[2].trim()
let lastWorkStarted = Date.now()

const output = mutable<U>()

let latestU: U | undefined
Expand All @@ -135,27 +129,25 @@ export function sequentialTimerMap<U>(
function work() {
const workStarted = Date.now()
const promiseOrU = mapper(workStarted)
const loopTime = workStarted - lastWorkStarted
lastWorkStarted = workStarted

if (isPromiseLike(promiseOrU)) {
promiseOrU.then(
function onfulfilled(value) {
checkMappedValue(value)
logWorkTime(workStarted, logger, loopTime, caller, milliseconds)
setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted)))
logWorkTime(workStarted, logger, caller, milliseconds)
setTimeout(work, milliseconds)
},
function onrejected(err) {
options?.onError(err)
logWorkTime(workStarted, logger, loopTime, caller, milliseconds)
setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted)))
logWorkTime(workStarted, logger, caller, milliseconds)
setTimeout(work, milliseconds)
},
)
} else {
// resolved value
checkMappedValue(promiseOrU)
logWorkTime(workStarted, logger, loopTime, caller, milliseconds)
setTimeout(work, Math.max(0, milliseconds - (Date.now() - workStarted)))
logWorkTime(workStarted, logger, caller, milliseconds)
setTimeout(work, milliseconds)
}
}

Expand Down

0 comments on commit db4bbd4

Please sign in to comment.