Skip to content

Commit

Permalink
common: limit graph-node status query to actions' deploymentIDs
Browse files Browse the repository at this point in the history
  • Loading branch information
dwerner committed Dec 27, 2024
1 parent 27ab3e0 commit a8037f2
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 10 deletions.
113 changes: 109 additions & 4 deletions packages/indexer-common/src/graph-node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { BlockPointer, ChainIndexingStatus, IndexingStatus } from './types'
import pRetry, { Options } from 'p-retry'
import axios, { AxiosInstance } from 'axios'
import fetch from 'isomorphic-fetch'
import { Action } from './indexer-management'

interface indexNode {
id: string
Expand Down Expand Up @@ -103,9 +104,15 @@ export class GraphNode {
await pRetry(
async () => {
const deployments = await this.subgraphDeployments()
this.logger.info(`Successfully connected to indexing status API`, {
currentDeployments: deployments.map((deployment) => deployment.display),
})
if (deployments.length < 100) {
this.logger.info(`Successfully connected to indexing status API`, {
currentDeployments: deployments.map((deployment) => deployment.display),
})
} else {
this.logger.info(`Successfully connected to indexing status API`, {
currentDeploymentCount: deployments.length,
})
}
},
{
retries: 10,
Expand Down Expand Up @@ -148,10 +155,98 @@ export class GraphNode {
)
}

public async subgraphDeploymentAssignmentsForAllocateActions(
subgraphStatus: SubgraphStatus,
actions: Action[],
): Promise<SubgraphDeploymentAssignment[]> {
const deploymentIDs = actions.map((action) => action.deploymentID)

const nodeOnlyResult = await this.status
.query(
gql`
query indexingStatuses($subgraphs: [String!]!) {
indexingStatuses(subgraphs: $subgraphs) {
subgraphDeployment: subgraph
node
}
}
`,
{ subgraphs: deploymentIDs },
)
.toPromise()

if (nodeOnlyResult.error) {
throw nodeOnlyResult.error
}

const withAssignments: string[] = nodeOnlyResult.data.indexingStatuses
.filter(
(result: { node: string | null }) =>
result.node !== null && result.node !== undefined,
)
.map((result: { subgraphDeployment: string }) => result.subgraphDeployment)

const result = await this.status
.query(
gql`
query indexingStatuses($subgraphs: [String!]!) {
indexingStatuses(subgraphs: $subgraphs) {
subgraphDeployment: subgraph
node
paused
}
}
`,
{ subgraphs: withAssignments },
)
.toPromise()

if (result.error) {
throw result.error
}

if (!result.data.indexingStatuses || result.data.length === 0) {
this.logger.warn(`No 'indexingStatuses' data returned from index nodes`, {
data: result.data,
})
return []
}

type QueryResult = {
subgraphDeployment: string
node: string | undefined
paused: boolean | undefined
}

const results = result.data.indexingStatuses
.filter((status: QueryResult) => {
if (subgraphStatus === SubgraphStatus.ACTIVE) {
return (
status.paused === false ||
(status.paused === undefined && status.node !== 'removed')
)
} else if (subgraphStatus === SubgraphStatus.PAUSED) {
return status.node === 'removed' || status.paused === true
} else if (subgraphStatus === SubgraphStatus.ALL) {
return true
}
})
.map((status: QueryResult) => {
return {
id: new SubgraphDeploymentID(status.subgraphDeployment),
node: status.node,
paused: status.paused ?? status.node === 'removed',
}
})

return results
}

public async subgraphDeploymentsAssignments(
subgraphStatus: SubgraphStatus,
): Promise<SubgraphDeploymentAssignment[]> {
try {
const startTimeMs = Date.now()
this.logger.debug('Fetch subgraph deployment assignments')

// FIXME: remove this initial check for just node when graph-node releases
Expand All @@ -170,6 +265,10 @@ export class GraphNode {
)
.toPromise()

this.logger.debug(
`Fetch subgraph deployment assignments took ${Date.now() - startTimeMs}ms`,
)

if (nodeOnlyResult.error) {
throw nodeOnlyResult.error
}
Expand Down Expand Up @@ -214,7 +313,7 @@ export class GraphNode {
paused: boolean | undefined
}

return result.data.indexingStatuses
const results = result.data.indexingStatuses
.filter((status: QueryResult) => {
if (subgraphStatus === SubgraphStatus.ACTIVE) {
return (
Expand All @@ -234,6 +333,12 @@ export class GraphNode {
paused: status.paused ?? status.node === 'removed',
}
})
this.logger.debug(
`Fetching mapped subgraph deployment ${results.length} assignments took ${
Date.now() - startTimeMs
}ms`,
)
return results
} catch (error) {
const err = indexerError(IndexerErrorCode.IE018, error)
this.logger.error(`Failed to query indexing status API`, { err })
Expand Down
11 changes: 8 additions & 3 deletions packages/indexer-common/src/indexer-management/actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,14 @@ export class ActionManager {
this.logger.warn('Previous batch action execution is still in progress')
return this.executeBatchActionsPromise
}
this.executeBatchActionsPromise = this.executeApprovedActionsInner(network)
const updatedActions = await this.executeBatchActionsPromise
this.executeBatchActionsPromise = undefined

let updatedActions: Action[] = []
try {
this.executeBatchActionsPromise = this.executeApprovedActionsInner(network)
updatedActions = await this.executeBatchActionsPromise
} finally {
this.executeBatchActionsPromise = undefined
}
return updatedActions
}

Expand Down
8 changes: 5 additions & 3 deletions packages/indexer-common/src/indexer-management/allocations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,11 @@ export class AllocationManager {
logger.info('Ensure subgraph deployments are deployed before we allocate to them', {
allocateActions,
})
const currentAssignments = await this.graphNode.subgraphDeploymentsAssignments(
SubgraphStatus.ALL,
)
const currentAssignments =
await this.graphNode.subgraphDeploymentAssignmentsForAllocateActions(
SubgraphStatus.ALL,
actions,
)
await pMap(
allocateActions,
async (action: Action) =>
Expand Down

0 comments on commit a8037f2

Please sign in to comment.