Skip to content

Commit

Permalink
chore: improve queue handling
Browse files Browse the repository at this point in the history
  • Loading branch information
pedronauck committed May 4, 2024
1 parent b337707 commit 0a79a3a
Show file tree
Hide file tree
Showing 9 changed files with 114 additions and 33 deletions.
1 change: 1 addition & 0 deletions packages/graphql/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ IS_DEV_TEST=
SYNC_OFFSET=10
SYNC_LIMIT=10000
QUEUE_CONCURRENCY=1000
WATCH_INTERVAL=5000
1 change: 1 addition & 0 deletions packages/graphql/.env.production
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ IS_DEV_TEST=
SYNC_OFFSET=10
SYNC_LIMIT=10000
QUEUE_CONCURRENCY=1000
WATCH_INTERVAL=5000
3 changes: 2 additions & 1 deletion packages/graphql/src/application/uc/AddBlockRange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@ export class AddBlockRange {
const repo = new BlockRepository();
const { blocks } = await repo.blocksFromNode(to - from, from);
await repo.upsertMany(blocks);
await queue.push(QueueNames.SYNC_TRANSACTIONS, { blocks });
await queue.push(QueueNames.SYNC_TRANSACTIONS, { blocks }, { priority: 1 });
}
}

export const addBlockRange = async (input: QueueData<Input>) => {
try {
const { execute } = new AddBlockRange();
await execute(input);
await queue.complete(input.id);
} catch (error) {
console.error(error);
throw new Error('Sync transactions', {
Expand Down
103 changes: 75 additions & 28 deletions packages/graphql/src/application/uc/SyncBlocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,11 @@ class Syncer {
private async syncBlocksRange({ from, to }: { from: number; to: number }) {
console.log(c.gray(`🔄 Syncing blocks from ${from} to ${to}`));
if (!env.get('IS_DEV_TEST')) {
await queue.push(QueueNames.ADD_BLOCK_RANGE, { from, to });
await queue.push(
QueueNames.ADD_BLOCK_RANGE,
{ from, to },
{ priority: 1 },
);
return { endCursor: to };
}
return { endCursor: to };
Expand All @@ -102,6 +106,8 @@ class Syncer {
}

const syncer = new Syncer();
const WATCH_INTERVAL = Number(env.get('WATCH_INTERVAL'));

const machine = setup({
types: {} as {
context: Context;
Expand All @@ -122,6 +128,9 @@ const machine = setup({
return syncer.syncMissingBlocks(context);
},
),
hasActiveJobs: fromPromise<boolean>(async () => {
return queue.hasActiveJobs();
}),
},
guards: {
hasMoreEvents: ({ context }) => {
Expand Down Expand Up @@ -181,55 +190,93 @@ const machine = setup({
checking: {
always: [
{ target: 'syncingBlocks', guard: 'hasMoreEvents' },
{ target: 'waiting', guard: 'needToWatch' },
{ target: 'waitingQueueJobs', guard: 'needToWatch' },
{ target: 'idle' },
],
},
waiting: {
after: {
1000: {
target: 'syncingMissingBlocks',
waitingQueueJobs: {
initial: 'fetching',
states: {
fetching: {
invoke: {
id: 'checkQueue',
src: 'hasActiveJobs',
onDone: [
{ target: 'waiting', guard: ({ event }) => event.output },
{ target: 'finished' },
],
},
},
waiting: {
after: {
60000: 'fetching',
},
},
finished: {
type: 'final',
},
},
onDone: {
target: 'syncingMissingBlocks',
},
},
syncingMissingBlocks: {
invoke: {
src: 'syncMissingBlocks',
input: ({ context }) => context,
onDone: {
target: 'resettingLastBlock',
actions: assign({
lastResult: ({ event }) => event.output,
cursor: ({ context, event }) => {
return event.output?.endCursor ?? context.cursor;
initial: 'syncing',
states: {
syncing: {
invoke: {
src: 'syncMissingBlocks',
input: ({ context }) => context,
onDone: {
target: 'resettingLastBlock',
actions: assign({
lastResult: ({ event }) => event.output,
cursor: ({ context, event }) => {
return event.output?.endCursor ?? context.cursor;
},
}),
},
}),
},
},
},
},
resettingLastBlock: {
invoke: {
src: 'getLatestBlock',
onDone: {
target: 'waiting',
actions: assign({
lastBlock: ({ event }) => event.output,
}),
waiting: {
after: {
[WATCH_INTERVAL]: {
target: 'syncing',
},
},
},
resettingLastBlock: {
invoke: {
src: 'getLatestBlock',
onDone: {
target: 'waiting',
actions: assign({
lastBlock: ({ event }) => event.output,
}),
},
},
},
},
},
},
});

export const syncBlocks = async ({ data }: QueueData<Input>) => {
export const syncBlocks = async ({ id, data }: QueueData<Input>) => {
try {
const actor = createActor(machine, { input: data });
actor.subscribe((state) => {
console.log(c.yellow(`📟 State: ${state.value}`));
const val = state.value;
if (typeof val === 'string') {
console.log(c.yellow(`📟 State: ${val}`));
return;
}
const [key, value] = Object.entries(val)[0];
console.log(c.yellow(`📟 State: ${key}.${value}`));
});

actor.start();
actor.send({ type: 'START_SYNC' });
await queue.complete(id);
} catch (error) {
console.error(error);
throw new Error('Sync block attempt failed!', {
Expand Down
3 changes: 2 additions & 1 deletion packages/graphql/src/application/uc/SyncLastBlocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ export class SyncLastBlocks {
}
}

export const syncLastBlocks = async ({ data }: QueueData<Props>) => {
export const syncLastBlocks = async ({ id, data }: QueueData<Props>) => {
try {
const syncLastBlocks = new SyncLastBlocks();
await syncLastBlocks.execute(data);
await queue.complete(id);
} catch (error) {
console.error(error);
throw new Error('Sync last', {
Expand Down
12 changes: 10 additions & 2 deletions packages/graphql/src/application/uc/SyncMissingBlocks.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
import { BlockRepository } from '~/domain/Block/BlockRepository';
import { QueueNames, queue } from '~/infra/queue/Queue';
import {
type QueueData,
type QueueInputs,
QueueNames,
queue,
} from '~/infra/queue/Queue';

type Props = QueueInputs[QueueNames.SYNC_MISSING];

export class SyncMissingBlocks {
async execute() {
Expand All @@ -13,11 +20,12 @@ export class SyncMissingBlocks {
}
}

export const syncMissingBlocks = async () => {
export const syncMissingBlocks = async ({ id }: QueueData<Props>) => {
try {
console.log('Syncing missing blocks');
const syncMissingBlocks = new SyncMissingBlocks();
await syncMissingBlocks.execute();
await queue.complete(id);
} catch (error) {
console.error(error);
throw new Error('Sync missing', {
Expand Down
8 changes: 7 additions & 1 deletion packages/graphql/src/application/uc/SyncTransactions.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import c from 'chalk';
import { uniqBy } from 'lodash';
import type { GQLBlock } from '~/graphql/generated/sdk';
import type { QueueData, QueueInputs, QueueNames } from '~/infra/queue/Queue';
import {
type QueueData,
type QueueInputs,
type QueueNames,
queue,
} from '~/infra/queue/Queue';
import { addTransactions } from './AddTransactions';

type Input = QueueInputs[QueueNames.SYNC_TRANSACTIONS];
Expand Down Expand Up @@ -37,6 +42,7 @@ export const syncTransactions = async (input: QueueData<Input>) => {
try {
const syncTransactions = new SyncTransactions();
await syncTransactions.execute(input);
await queue.complete(input.id);
} catch (error) {
console.error(error);
throw new Error('Sync transactions', {
Expand Down
2 changes: 2 additions & 0 deletions packages/graphql/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const schema = zod.object({
SYNC_OFFSET: zod.string().optional().default('10'),
SYNC_LIMIT: zod.string().optional().default('10000'),
QUEUE_CONCURRENCY: zod.string().optional().default('1000'),
WATCH_INTERVAL: zod.string().optional().default('5000'),
});

export const env = new Env(schema, {
Expand All @@ -33,4 +34,5 @@ export const env = new Env(schema, {
SYNC_OFFSET: '10',
SYNC_LIMIT: '10000',
QUEUE_CONCURRENCY: '1000',
WATCH_INTERVAL: '5000',
});
14 changes: 14 additions & 0 deletions packages/graphql/src/infra/queue/Queue.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import c from 'chalk';
import PgBoss, { type Job } from 'pg-boss';
import { addBlockRange } from '~/application/uc/AddBlockRange';
import { syncBlocks } from '~/application/uc/SyncBlocks';
Expand Down Expand Up @@ -93,6 +94,19 @@ export class Queue extends PgBoss {
await this.start();
console.log('⚡️ Queue running');
}

async hasActiveJobs() {
const batchSize = Number(env.get('QUEUE_CONCURRENCY'));
const results = await Promise.all([
queue.fetch(QueueNames.SYNC_BLOCKS, batchSize),
queue.fetch(QueueNames.ADD_BLOCK_RANGE, batchSize),
queue.fetch(QueueNames.SYNC_TRANSACTIONS, batchSize),
]);
console.log(c.gray(`⌛️ Active Blocks: ${results[0]?.length ?? 0}`));
console.log(c.gray(`⌛️ Active Range: ${results[1]?.length ?? 0}`));
console.log(c.gray(`⌛️ Active Transactions: ${results[2]?.length ?? 0}`));
return results.some((result) => Boolean(result?.length ?? 0 > 0));
}
}

export const queue = new Queue({
Expand Down

0 comments on commit 0a79a3a

Please sign in to comment.