diff --git a/packages/graphql/.env.example b/packages/graphql/.env.example index d74aa3005..138890fc2 100644 --- a/packages/graphql/.env.example +++ b/packages/graphql/.env.example @@ -12,3 +12,4 @@ IS_DEV_TEST= SYNC_OFFSET=10 SYNC_LIMIT=10000 QUEUE_CONCURRENCY=1000 +WATCH_INTERVAL=5000 diff --git a/packages/graphql/.env.production b/packages/graphql/.env.production index ea1648112..71f57d560 100644 --- a/packages/graphql/.env.production +++ b/packages/graphql/.env.production @@ -12,3 +12,4 @@ IS_DEV_TEST= SYNC_OFFSET=10 SYNC_LIMIT=10000 QUEUE_CONCURRENCY=1000 +WATCH_INTERVAL=5000 diff --git a/packages/graphql/src/application/uc/AddBlockRange.ts b/packages/graphql/src/application/uc/AddBlockRange.ts index 8c862b82f..e00c2c0ed 100644 --- a/packages/graphql/src/application/uc/AddBlockRange.ts +++ b/packages/graphql/src/application/uc/AddBlockRange.ts @@ -14,7 +14,7 @@ export class AddBlockRange { const repo = new BlockRepository(); const { blocks } = await repo.blocksFromNode(to - from, from); await repo.upsertMany(blocks); - await queue.push(QueueNames.SYNC_TRANSACTIONS, { blocks }); + await queue.push(QueueNames.SYNC_TRANSACTIONS, { blocks }, { priority: 1 }); } } @@ -22,6 +22,7 @@ export const addBlockRange = async (input: QueueData) => { try { const { execute } = new AddBlockRange(); await execute(input); + await queue.complete(input.id); } catch (error) { console.error(error); throw new Error('Sync transactions', { diff --git a/packages/graphql/src/application/uc/SyncBlocks.ts b/packages/graphql/src/application/uc/SyncBlocks.ts index 9759e0c75..2efed2f55 100644 --- a/packages/graphql/src/application/uc/SyncBlocks.ts +++ b/packages/graphql/src/application/uc/SyncBlocks.ts @@ -84,7 +84,11 @@ class Syncer { private async syncBlocksRange({ from, to }: { from: number; to: number }) { console.log(c.gray(`🔄 Syncing blocks from ${from} to ${to}`)); if (!env.get('IS_DEV_TEST')) { - await queue.push(QueueNames.ADD_BLOCK_RANGE, { from, to }); + await queue.push( + QueueNames.ADD_BLOCK_RANGE, + { from, to }, + { priority: 1 }, + ); return { endCursor: to }; } return { endCursor: to }; @@ -102,6 +106,8 @@ class Syncer { } const syncer = new Syncer(); +const WATCH_INTERVAL = Number(env.get('WATCH_INTERVAL')); + const machine = setup({ types: {} as { context: Context; @@ -122,6 +128,9 @@ const machine = setup({ return syncer.syncMissingBlocks(context); }, ), + hasActiveJobs: fromPromise(async () => { + return queue.hasActiveJobs(); + }), }, guards: { hasMoreEvents: ({ context }) => { @@ -181,55 +190,93 @@ const machine = setup({ checking: { always: [ { target: 'syncingBlocks', guard: 'hasMoreEvents' }, - { target: 'waiting', guard: 'needToWatch' }, + { target: 'waitingQueueJobs', guard: 'needToWatch' }, { target: 'idle' }, ], }, - waiting: { - after: { - 1000: { - target: 'syncingMissingBlocks', + waitingQueueJobs: { + initial: 'fetching', + states: { + fetching: { + invoke: { + id: 'checkQueue', + src: 'hasActiveJobs', + onDone: [ + { target: 'waiting', guard: ({ event }) => event.output }, + { target: 'finished' }, + ], + }, + }, + waiting: { + after: { + 60000: 'fetching', + }, + }, + finished: { + type: 'final', }, }, + onDone: { + target: 'syncingMissingBlocks', + }, }, syncingMissingBlocks: { - invoke: { - src: 'syncMissingBlocks', - input: ({ context }) => context, - onDone: { - target: 'resettingLastBlock', - actions: assign({ - lastResult: ({ event }) => event.output, - cursor: ({ context, event }) => { - return event.output?.endCursor ?? context.cursor; + initial: 'syncing', + states: { + syncing: { + invoke: { + src: 'syncMissingBlocks', + input: ({ context }) => context, + onDone: { + target: 'resettingLastBlock', + actions: assign({ + lastResult: ({ event }) => event.output, + cursor: ({ context, event }) => { + return event.output?.endCursor ?? context.cursor; + }, + }), }, - }), + }, }, - }, - }, - resettingLastBlock: { - invoke: { - src: 'getLatestBlock', - onDone: { - target: 'waiting', - actions: assign({ - lastBlock: ({ event }) => event.output, - }), + waiting: { + after: { + [WATCH_INTERVAL]: { + target: 'syncing', + }, + }, + }, + resettingLastBlock: { + invoke: { + src: 'getLatestBlock', + onDone: { + target: 'waiting', + actions: assign({ + lastBlock: ({ event }) => event.output, + }), + }, + }, }, }, }, }, }); -export const syncBlocks = async ({ data }: QueueData) => { +export const syncBlocks = async ({ id, data }: QueueData) => { try { const actor = createActor(machine, { input: data }); actor.subscribe((state) => { - console.log(c.yellow(`📟 State: ${state.value}`)); + const val = state.value; + if (typeof val === 'string') { + console.log(c.yellow(`📟 State: ${val}`)); + return; + } + const [key, value] = Object.entries(val)[0]; + console.log(c.yellow(`📟 State: ${key}.${value}`)); }); actor.start(); actor.send({ type: 'START_SYNC' }); + await queue.complete(id); } catch (error) { console.error(error); throw new Error('Sync block attempt failed!', { diff --git a/packages/graphql/src/application/uc/SyncLastBlocks.ts b/packages/graphql/src/application/uc/SyncLastBlocks.ts index d8fa10784..e33dfafdd 100644 --- a/packages/graphql/src/application/uc/SyncLastBlocks.ts +++ b/packages/graphql/src/application/uc/SyncLastBlocks.ts @@ -20,10 +20,11 @@ export class SyncLastBlocks { } } -export const syncLastBlocks = async ({ data }: QueueData) => { +export const syncLastBlocks = async ({ id, data }: QueueData) => { try { const syncLastBlocks = new SyncLastBlocks(); await syncLastBlocks.execute(data); + await queue.complete(id); } catch (error) { console.error(error); throw new Error('Sync last', { diff --git a/packages/graphql/src/application/uc/SyncMissingBlocks.ts b/packages/graphql/src/application/uc/SyncMissingBlocks.ts index 36f909b96..568d0538f 100644 --- a/packages/graphql/src/application/uc/SyncMissingBlocks.ts +++ b/packages/graphql/src/application/uc/SyncMissingBlocks.ts @@ -1,5 +1,12 @@ import { BlockRepository } from '~/domain/Block/BlockRepository'; -import { QueueNames, queue } from '~/infra/queue/Queue'; +import { + type QueueData, + type QueueInputs, + QueueNames, + queue, +} from '~/infra/queue/Queue'; + +type Props = QueueInputs[QueueNames.SYNC_MISSING]; export class SyncMissingBlocks { async execute() { @@ -13,11 +20,12 @@ export class SyncMissingBlocks { } } -export const syncMissingBlocks = async () => { +export const syncMissingBlocks = async ({ id }: QueueData) => { try { console.log('Syncing missing blocks'); const syncMissingBlocks = new SyncMissingBlocks(); await syncMissingBlocks.execute(); + await queue.complete(id); } catch (error) { console.error(error); throw new Error('Sync missing', { diff --git a/packages/graphql/src/application/uc/SyncTransactions.ts b/packages/graphql/src/application/uc/SyncTransactions.ts index 5addc388f..0a191c59f 100644 --- a/packages/graphql/src/application/uc/SyncTransactions.ts +++ b/packages/graphql/src/application/uc/SyncTransactions.ts @@ -1,7 +1,12 @@ import c from 'chalk'; import { uniqBy } from 'lodash'; import type { GQLBlock } from '~/graphql/generated/sdk'; -import type { QueueData, QueueInputs, QueueNames } from '~/infra/queue/Queue'; +import { + type QueueData, + type QueueInputs, + type QueueNames, + queue, +} from '~/infra/queue/Queue'; import { addTransactions } from './AddTransactions'; type Input = QueueInputs[QueueNames.SYNC_TRANSACTIONS]; @@ -37,6 +42,7 @@ export const syncTransactions = async (input: QueueData) => { try { const syncTransactions = new SyncTransactions(); await syncTransactions.execute(input); + await queue.complete(input.id); } catch (error) { console.error(error); throw new Error('Sync transactions', { diff --git a/packages/graphql/src/config.ts b/packages/graphql/src/config.ts index b3d2f6a42..57e37377e 100644 --- a/packages/graphql/src/config.ts +++ b/packages/graphql/src/config.ts @@ -17,6 +17,7 @@ const schema = zod.object({ SYNC_OFFSET: zod.string().optional().default('10'), SYNC_LIMIT: zod.string().optional().default('10000'), QUEUE_CONCURRENCY: zod.string().optional().default('1000'), + WATCH_INTERVAL: zod.string().optional().default('5000'), }); export const env = new Env(schema, { @@ -33,4 +34,5 @@ export const env = new Env(schema, { SYNC_OFFSET: '10', SYNC_LIMIT: '10000', QUEUE_CONCURRENCY: '1000', + WATCH_INTERVAL: '5000', }); diff --git a/packages/graphql/src/infra/queue/Queue.ts b/packages/graphql/src/infra/queue/Queue.ts index c9257d0d7..451ca6f0f 100644 --- a/packages/graphql/src/infra/queue/Queue.ts +++ b/packages/graphql/src/infra/queue/Queue.ts @@ -1,3 +1,4 @@ +import c from 'chalk'; import PgBoss, { type Job } from 'pg-boss'; import { addBlockRange } from '~/application/uc/AddBlockRange'; import { syncBlocks } from '~/application/uc/SyncBlocks'; @@ -93,6 +94,19 @@ export class Queue extends PgBoss { await this.start(); console.log('⚡️ Queue running'); } + + async hasActiveJobs() { + const batchSize = Number(env.get('QUEUE_CONCURRENCY')); + const results = await Promise.all([ + queue.fetch(QueueNames.SYNC_BLOCKS, batchSize), + queue.fetch(QueueNames.ADD_BLOCK_RANGE, batchSize), + queue.fetch(QueueNames.SYNC_TRANSACTIONS, batchSize), + ]); + console.log(c.gray(`⌛️ Active Blocks: ${results[0]?.length ?? 0}`)); + console.log(c.gray(`⌛️ Active Range: ${results[1]?.length ?? 0}`)); + console.log(c.gray(`⌛️ Active Transactions: ${results[2]?.length ?? 0}`)); + return results.some((result) => Boolean(result?.length ?? 0 > 0)); + } } export const queue = new Queue({