Skip to content

Commit

Permalink
common: skip querying active deployments if in MANUAL or poiDisputeMo…
Browse files Browse the repository at this point in the history
…nitoring is disabled
  • Loading branch information
dwerner committed Jan 8, 2025
1 parent d393c2d commit f17b402
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 95 deletions.
28 changes: 24 additions & 4 deletions packages/indexer-agent/src/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -324,12 +324,32 @@ export class Agent {
},
)

// Skip fetching active deployments if the deployment management mode is manual and POI tracking is disabled
const activeDeployments: Eventual<SubgraphDeploymentID[]> =
sequentialTimerMap(
{ logger, milliseconds: requestIntervalSmall },
() => {
logger.trace('Fetching active deployments')
return this.graphNode.subgraphDeployments()
{ logger, milliseconds: requestIntervalLarge },
async () => {
const deployments = await this.multiNetworks.map(
async ({ network }) => {
if (
this.deploymentManagement === DeploymentManagementMode.AUTO ||
network.networkMonitor.poiDisputeMonitoringEnabled()
) {
logger.trace('Fetching active deployments')
const assignments =
await this.graphNode.subgraphDeploymentsAssignments(
SubgraphStatus.ACTIVE,
)
return assignments.map(assignment => assignment.id)
} else {
logger.info(
"Skipping fetching active deployments fetch since DeploymentManagementMode = 'manual' and POI tracking is disabled",
)
return []
}
},
)
return deployments.values
},
{
onError: error =>
Expand Down
207 changes: 118 additions & 89 deletions packages/indexer-common/src/graph-node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import { BlockPointer, ChainIndexingStatus, IndexingStatus } from './types'
import pRetry, { Options } from 'p-retry'
import axios, { AxiosInstance } from 'axios'
import fetch from 'isomorphic-fetch'
import { Action } from './indexer-management'

interface indexNode {
id: string
Expand Down Expand Up @@ -103,15 +102,11 @@ export class GraphNode {
this.logger.info(`Check if indexing status API is available`)
await pRetry(
async () => {
const deployments = await this.subgraphDeployments()
if (deployments.length < 100) {
this.logger.info(`Successfully connected to indexing status API`, {
currentDeployments: deployments.map((deployment) => deployment.display),
})
if (await this.statusEndpointConnected()) {
this.logger.info(`Successfully connected to indexing status API`, {})
} else {
this.logger.info(`Successfully connected to indexing status API`, {
currentDeploymentCount: deployments.length,
})
this.logger.error(`Failed to connect to indexing status API`)
throw new Error('Indexing status API not available')
}
},
{
Expand Down Expand Up @@ -149,97 +144,120 @@ export class GraphNode {
return new URL(deploymentIpfsHash, this.queryBaseURL).toString()
}

public async subgraphDeployments(): Promise<SubgraphDeploymentID[]> {
return (await this.subgraphDeploymentsAssignments(SubgraphStatus.ACTIVE)).map(
(details) => details.id,
)
// Simple query to make sure the status endpoint is connected
public async statusEndpointConnected(): Promise<boolean> {
try {
const result = await this.status
.query(
gql`
query {
__typename
}
`,
undefined,
)
.toPromise()

if (result.error) {
throw result.error
}

return !!result.data
} catch (error) {
this.logger.error(`Failed to query status endpoint`, { error })
return false
}
}

public async subgraphDeploymentAssignmentsForAllocateActions(
public async subgraphDeploymentAssignmentsByDeploymentID(
subgraphStatus: SubgraphStatus,
actions: Action[],
deploymentIDs: string[],
): Promise<SubgraphDeploymentAssignment[]> {
const deploymentIDs = actions.map((action) => action.deploymentID)

const nodeOnlyResult = await this.status
.query(
gql`
query indexingStatuses($subgraphs: [String!]!) {
indexingStatuses(subgraphs: $subgraphs) {
subgraphDeployment: subgraph
node
try {
const nodeOnlyResult = await this.status
.query(
gql`
query indexingStatuses($subgraphs: [String!]!) {
indexingStatuses(subgraphs: $subgraphs) {
subgraphDeployment: subgraph
node
}
}
}
`,
{ subgraphs: deploymentIDs },
)
.toPromise()
`,
{ subgraphs: deploymentIDs },
)
.toPromise()

if (nodeOnlyResult.error) {
throw nodeOnlyResult.error
}
if (nodeOnlyResult.error) {
throw nodeOnlyResult.error
}

const withAssignments: string[] = nodeOnlyResult.data.indexingStatuses
.filter(
(result: { node: string | null }) =>
result.node !== null && result.node !== undefined,
)
.map((result: { subgraphDeployment: string }) => result.subgraphDeployment)

const result = await this.status
.query(
gql`
query indexingStatuses($subgraphs: [String!]!) {
indexingStatuses(subgraphs: $subgraphs) {
subgraphDeployment: subgraph
node
paused
const withAssignments: string[] = nodeOnlyResult.data.indexingStatuses
.filter(
(result: { node: string | null }) =>
result.node !== null && result.node !== undefined,
)
.map((result: { subgraphDeployment: string }) => result.subgraphDeployment)

const result = await this.status
.query(
gql`
query indexingStatuses($subgraphs: [String!]!) {
indexingStatuses(subgraphs: $subgraphs) {
subgraphDeployment: subgraph
node
paused
}
}
}
`,
{ subgraphs: withAssignments },
)
.toPromise()
`,
{ subgraphs: withAssignments },
)
.toPromise()

if (result.error) {
throw result.error
}
if (result.error) {
throw result.error
}

if (!result.data.indexingStatuses || result.data.length === 0) {
this.logger.warn(`No 'indexingStatuses' data returned from index nodes`, {
data: result.data,
})
return []
}
if (!result.data.indexingStatuses || result.data.length === 0) {
this.logger.warn(`No 'indexingStatuses' data returned from index nodes`, {
data: result.data,
})
return []
}

type QueryResult = {
subgraphDeployment: string
node: string | undefined
paused: boolean | undefined
}
type QueryResult = {
subgraphDeployment: string
node: string | undefined
paused: boolean | undefined
}

const results = result.data.indexingStatuses
.filter((status: QueryResult) => {
if (subgraphStatus === SubgraphStatus.ACTIVE) {
return (
status.paused === false ||
(status.paused === undefined && status.node !== 'removed')
)
} else if (subgraphStatus === SubgraphStatus.PAUSED) {
return status.node === 'removed' || status.paused === true
} else if (subgraphStatus === SubgraphStatus.ALL) {
return true
}
})
.map((status: QueryResult) => {
return {
id: new SubgraphDeploymentID(status.subgraphDeployment),
node: status.node,
paused: status.paused ?? status.node === 'removed',
}
})
const results = result.data.indexingStatuses
.filter((status: QueryResult) => {
if (subgraphStatus === SubgraphStatus.ACTIVE) {
return (
status.paused === false ||
(status.paused === undefined && status.node !== 'removed')
)
} else if (subgraphStatus === SubgraphStatus.PAUSED) {
return status.node === 'removed' || status.paused === true
} else if (subgraphStatus === SubgraphStatus.ALL) {
return true
}
})
.map((status: QueryResult) => {
return {
id: new SubgraphDeploymentID(status.subgraphDeployment),
node: status.node,
paused: status.paused ?? status.node === 'removed',
}
})

return results
return results
} catch (error) {
const err = indexerError(IndexerErrorCode.IE018, error)
this.logger.error(`Failed to query indexing status API`, { err })
throw err
}
}

public async subgraphDeploymentsAssignments(
Expand All @@ -265,8 +283,11 @@ export class GraphNode {
)
.toPromise()

const deploymentCount = nodeOnlyResult.data?.indexingStatuses?.length ?? 0
this.logger.debug(
`Fetch subgraph deployment assignments took ${Date.now() - startTimeMs}ms`,
`Fetch subgraph deployment assignments (1/2, node only) took ${
Date.now() - startTimeMs
}ms for ${deploymentCount} deployments`,
)

if (nodeOnlyResult.error) {
Expand Down Expand Up @@ -313,6 +334,12 @@ export class GraphNode {
paused: boolean | undefined
}

const deploymentCount2 = result.data?.indexingStatuses?.length ?? 0
this.logger.debug(
`Fetch subgraph deployment assignments (2/2, with paused) took ${
Date.now() - startTimeMs
}ms and returned ${deploymentCount}/${deploymentCount2} deployments`,
)
const results = result.data.indexingStatuses
.filter((status: QueryResult) => {
if (subgraphStatus === SubgraphStatus.ACTIVE) {
Expand Down Expand Up @@ -557,7 +584,9 @@ export class GraphNode {
try {
const deploymentAssignments =
currentAssignments ??
(await this.subgraphDeploymentsAssignments(SubgraphStatus.ALL))
(await this.subgraphDeploymentAssignmentsByDeploymentID(SubgraphStatus.ALL, [
deployment.ipfsHash,
]))
const matchingAssignment = deploymentAssignments.find(
(deploymentAssignment) => deploymentAssignment.id.ipfsHash == deployment.ipfsHash,
)
Expand Down
4 changes: 2 additions & 2 deletions packages/indexer-common/src/indexer-management/allocations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,9 @@ export class AllocationManager {
allocateActions,
})
const currentAssignments =
await this.graphNode.subgraphDeploymentAssignmentsForAllocateActions(
await this.graphNode.subgraphDeploymentAssignmentsByDeploymentID(
SubgraphStatus.ALL,
actions,
actions.map((action) => action.deploymentID!),
)
await pMap(
allocateActions,
Expand Down
4 changes: 4 additions & 0 deletions packages/indexer-common/src/indexer-management/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ export class NetworkMonitor {
private epochSubgraph: EpochSubgraph,
) {}

poiDisputeMonitoringEnabled(): boolean {
return this.indexerOptions.poiDisputeMonitoring
}

async currentEpochNumber(): Promise<number> {
return (await this.contracts.epochManager.currentEpoch()).toNumber()
}
Expand Down

0 comments on commit f17b402

Please sign in to comment.