Skip to content

Commit

Permalink
chore: add rabbitmq
Browse files Browse the repository at this point in the history
  • Loading branch information
pedronauck committed May 11, 2024
1 parent 49dc59a commit 9a6b514
Show file tree
Hide file tree
Showing 14 changed files with 1,086 additions and 138 deletions.
8 changes: 7 additions & 1 deletion packages/graphql/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,17 @@ DB_PORT="5435"
DB_USER="postgres"
DB_PASS="postgres"
DB_NAME="postgres"
RABBITMQ_HOST="127.0.0.1"
RABBITMQ_USER="guest"
RABBITMQ_PASS="guest"
RABBITMQ_QUEUE="notification"

SYNC_MISSING=
SERVER_BUILD=
IS_DEV_TEST=
SYNC_OFFSET=10
SYNC_LIMIT=10000
QUEUE_CONCURRENCY=1000
QUEUE_CONCURRENCY=40
WATCH_INTERVAL=5000


4 changes: 4 additions & 0 deletions packages/graphql/.env.production
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ DB_PORT="5435"
DB_USER="postgres"
DB_PASS="postgres"
DB_NAME="postgres"
RABBITMQ_HOST="127.0.0.1"
RABBITMQ_USER="guest"
RABBITMQ_PASS="guest"
RABBITMQ_QUEUE="notification"

SYNC_MISSING=
SERVER_BUILD=
Expand Down
3 changes: 3 additions & 0 deletions packages/graphql/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
"@graphql-tools/utils": "^10.1.3",
"@types/cors": "^2.8.17",
"@types/express": "^4.17.21",
"amqplib": "0.10.4",
"cors": "^2.8.5",
"dayjs": "1.11.10",
"dotenv": "16.4.5",
Expand All @@ -69,6 +70,7 @@
"fuels": "0.74.0",
"graphql": "^16.8.1",
"graphql-yoga": "5.3.0",
"kafkajs": "2.2.4",
"lodash": "^4.17.21",
"node-fetch": "3.3.2",
"npm-run-all": "^4.1.5",
Expand All @@ -90,6 +92,7 @@
"@graphql-codegen/typescript": "^4.0.6",
"@graphql-codegen/typescript-graphql-request": "^6.2.0",
"@graphql-codegen/typescript-operations": "^4.2.0",
"@types/amqplib": "0.10.5",
"@types/lodash": "4.17.0",
"@types/node": "^20.12.7",
"@types/pg": "^8.11.5",
Expand Down
16 changes: 5 additions & 11 deletions packages/graphql/src/application/uc/AddBlockRange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,13 @@ import { performance } from 'node:perf_hooks';
import c from 'chalk';
import { BlockRepository } from '~/domain/Block/BlockRepository';
import { db } from '~/infra/database/Db';
import {
type QueueData,
type QueueInputs,
type QueueNames,
queue,
} from '~/infra/queue/Queue';
import type { QueueInputs, QueueNames } from '~/infra/queue/Queue';
import { addTransactions } from './AddTransactions';

type Input = QueueInputs[QueueNames.ADD_BLOCK_RANGE];
type Data = QueueInputs[QueueNames.ADD_BLOCK_RANGE];

export class AddBlockRange {
async execute({ id, data }: QueueData<Input>) {
async execute(data: Data) {
const { blocks } = data;
const from = blocks[0].header.height;
const to = blocks[blocks.length - 1].header.height;
Expand All @@ -27,14 +22,13 @@ export class AddBlockRange {
const end = performance.now();
const secs = Number.parseInt(`${(end - start) / 1000}`);
console.log(c.green(`✅ Synced blocks: #${from} - #${to} (${secs}s)`));
await queue.complete(id);
}
}

export const addBlockRange = async (input: QueueData<Input>) => {
export const addBlockRange = async (data: Data) => {
try {
const { execute } = new AddBlockRange();
await execute(input);
await execute(data);
} catch (error) {
console.error(error);
throw new Error('Sync transactions', {
Expand Down
5 changes: 2 additions & 3 deletions packages/graphql/src/application/uc/SyncBlocks.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import type { Input } from 'fuels';
import type { QueueData, QueueInputs, QueueNames } from '~/infra/queue/Queue';
import type { QueueInputs, QueueNames } from '~/infra/queue/Queue';
import { worker } from '~/infra/worker/Worker';

export type SyncBlocksProps = QueueInputs[QueueNames.SYNC_BLOCKS];

export const syncBlocks = async ({ data }: QueueData<Input>) => {
export const syncBlocks = async (data: SyncBlocksProps) => {
worker.run(data);
};
16 changes: 5 additions & 11 deletions packages/graphql/src/application/uc/SyncLastBlocks.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,24 @@
import { BlockRepository } from '~/domain/Block/BlockRepository';
import {
type QueueData,
type QueueInputs,
QueueNames,
queue,
} from '~/infra/queue/Queue';
import { type QueueInputs, QueueNames, mq } from '~/infra/queue/Queue';

type Props = QueueInputs[QueueNames.SYNC_LAST];
type Data = QueueInputs[QueueNames.SYNC_LAST];

export class SyncLastBlocks {
async execute({ last, watch }: Props) {
async execute({ last, watch }: Data) {
const repo = new BlockRepository();
const lastBlock = await repo.latestBlockFromNode();
const blockHeight = Number(lastBlock?.header.height ?? '0');
const from = blockHeight - last;

console.log(`Syncing last ${last} blocks from ${from} to ${blockHeight}`);
await queue.push(QueueNames.SYNC_BLOCKS, { watch, cursor: from });
await mq.send(QueueNames.SYNC_BLOCKS, { watch, cursor: from });
}
}

export const syncLastBlocks = async ({ id, data }: QueueData<Props>) => {
export const syncLastBlocks = async (data: Data) => {
try {
const syncLastBlocks = new SyncLastBlocks();
await syncLastBlocks.execute(data);
await queue.complete(id);
} catch (error) {
console.error(error);
throw new Error('Sync last', {
Expand Down
14 changes: 4 additions & 10 deletions packages/graphql/src/application/uc/SyncMissingBlocks.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,23 @@
import { BlockRepository } from '~/domain/Block/BlockRepository';
import {
type QueueData,
type QueueInputs,
QueueNames,
queue,
} from '~/infra/queue/Queue';
import { type QueueInputs, QueueNames, mq } from '~/infra/queue/Queue';

type Props = QueueInputs[QueueNames.SYNC_MISSING];
type Data = QueueInputs[QueueNames.SYNC_MISSING];

export class SyncMissingBlocks {
async execute() {
const repo = new BlockRepository();
const latest = await repo.findLatestAdded();
const cursor = latest ? Number(latest.data.header.height) : undefined;
console.log('Syncing missing blocks from', cursor);
await queue.push(QueueNames.SYNC_BLOCKS, { cursor, watch: true });
await mq.send(QueueNames.SYNC_BLOCKS, { cursor, watch: true });
}
}

export const syncMissingBlocks = async ({ id }: QueueData<Props>) => {
export const syncMissingBlocks = async (_: Data) => {
try {
console.log('Syncing missing blocks');
const syncMissingBlocks = new SyncMissingBlocks();
await syncMissingBlocks.execute();
await queue.complete(id);
} catch (error) {
console.error(error);
throw new Error('Sync missing', {
Expand Down
6 changes: 6 additions & 0 deletions packages/graphql/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ const schema = zod.object({
DB_USER: zod.string(),
DB_PASS: zod.string(),
DB_NAME: zod.string(),
RABBITMQ_HOST: zod.string().default('127.0.0.1'),
RABBITMQ_USER: zod.string().default('guest'),
RABBITMQ_PASS: zod.string().default('guest'),
SYNC_MISSING: falsy.optional(),
SERVER_BUILD: falsy.optional(),
IS_DEV_TEST: falsy.optional(),
Expand All @@ -28,6 +31,9 @@ export const env = new Env(schema, {
DB_USER: 'postgres',
DB_PASS: 'postgres',
DB_NAME: 'postgres',
RABBITMQ_HOST: 'localhost',
RABBITMQ_USER: 'guest',
RABBITMQ_PASS: 'guest',
SERVER_BUILD: false,
SYNC_MISSING: false,
IS_DEV_TEST: false,
Expand Down
155 changes: 90 additions & 65 deletions packages/graphql/src/infra/queue/Queue.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
import PgBoss, { type Job } from 'pg-boss';
import client, {
type Channel,
type Connection,
type ConsumeMessage,
} from 'amqplib';
import { addBlockRange } from '~/application/uc/AddBlockRange';
import { syncBlocks } from '~/application/uc/SyncBlocks';
import { syncLastBlocks } from '~/application/uc/SyncLastBlocks';
import { syncMissingBlocks } from '~/application/uc/SyncMissingBlocks';
import { env } from '~/config';
import type { GQLBlock } from '~/graphql/generated/sdk';

const DB_HOST = env.get('DB_HOST');
const DB_PORT = env.get('DB_PORT');
const DB_USER = env.get('DB_USER');
const DB_PASS = env.get('DB_PASS');
const DB_NAME = env.get('DB_NAME');
const HOST = env.get('RABBITMQ_HOST');
const USER = env.get('RABBITMQ_USER');
const PASS = env.get('RABBITMQ_PASS');
const MAX_WORKERS = Number(env.get('QUEUE_CONCURRENCY'));

type Payload<D = unknown> = {
type: QueueNames;
data: D;
};

export enum QueueNames {
SYNC_BLOCKS = 'indexer/sync-blocks',
Expand All @@ -37,80 +45,97 @@ export type QueueInputs = {
};
};

export type QueueData<T = unknown> = Job<T>;
class RabbitMQConnection {
connection!: Connection;
channel!: Channel;
private connected!: Boolean;

export class Queue extends PgBoss {
private workOpts: PgBoss.WorkOptions = {
teamSize: Number(env.get('QUEUE_CONCURRENCY')),
teamConcurrency: Number(env.get('QUEUE_CONCURRENCY')),
teamRefill: true,
};
async connect() {
if (this.connected && this.channel) return;
this.connected = true;

static defaultJobOptions = {
retryLimit: 10,
retryDelay: 1,
retryBackoff: false,
expireInSeconds: 120,
};
try {
console.log('⌛️ Connecting to Rabbit-MQ Server');
const url = `amqp://${USER}:${PASS}@${HOST}:5672`;
this.connection = await client.connect(url);
console.log('✅ Rabbit MQ Connection is ready');
this.channel = await this.connection.createChannel();
await this.channel.prefetch(MAX_WORKERS);
console.log('🛸 Created RabbitMQ Channel successfully');
} catch (error) {
console.error(error);
console.error('Not connected to MQ Server');
}
}

push<Q extends QueueNames>(
queue: Q,
data?: QueueInputs[Q] | null,
options?: PgBoss.JobOptions,
) {
// console.log(`Pushing job to queue ${queue}`);
return this.send(queue, data as object, {
...Queue.defaultJobOptions,
...options,
});
async disconnect() {
await this.channel.close();
await this.connection.close();
}

async clean() {
const names = Object.values(QueueNames);
for (const name of names) {
await this.channel.deleteQueue(name);
}
await this.channel.deleteExchange('blocks');
console.log('🧹 Cleaned all queues');
}

pushSingleton<Q extends QueueNames>(
async send<Q extends QueueNames, P extends Payload<QueueInputs[Q]>>(
queue: Q,
data?: QueueInputs[Q] | null,
options?: PgBoss.JobOptions,
data?: P['data'],
) {
// console.log(`Pushing job to queue ${queue}`);
return this.sendSingleton(queue, data as object, {
...Queue.defaultJobOptions,
...options,
});
try {
if (!this.channel) {
await this.connect();
}
const payload = { type: queue, data } as P;
const buffer = Buffer.from(JSON.stringify(payload));
this.channel.sendToQueue(queue, buffer, { persistent: true });
} catch (error) {
console.error(error);
throw error;
}
}

pushBatch<Q extends QueueNames>(
async consume<Q extends QueueNames, P extends Payload<QueueInputs[Q]>>(
queue: Q,
data: Array<QueueInputs[Q]>,
opts?: Partial<PgBoss.JobInsert<object>>,
handler: (data: P['data']) => void,
) {
const jobs: Array<PgBoss.JobInsert<object>> = data.map((job) => ({
...Queue.defaultJobOptions,
...opts,
name: queue,
data: job ?? {},
}));
return this.insert(jobs);
await this.channel.assertQueue(queue, { durable: true });
await this.channel.consume(
queue,
(msg) => {
if (!msg) return;
const payload = this.parsePayload<P>(msg);
if (payload?.type === queue) {
handler(payload.data);
this.channel.ack(msg);
}
},
{ noAck: false },
);
}

async setup() {
await this.connect();
await this.consume(QueueNames.ADD_BLOCK_RANGE, addBlockRange);
await this.consume(QueueNames.SYNC_BLOCKS, syncBlocks);
await this.consume(QueueNames.SYNC_MISSING, syncMissingBlocks);
await this.consume(QueueNames.SYNC_LAST, syncLastBlocks);
}

async setupWorkers() {
const opts = this.workOpts;
await this.start();
await this.work(QueueNames.SYNC_BLOCKS, opts, syncBlocks);
await this.work(QueueNames.SYNC_LAST, opts, syncLastBlocks);
await this.work(QueueNames.SYNC_MISSING, opts, syncMissingBlocks);
await this.work(QueueNames.ADD_BLOCK_RANGE, opts, addBlockRange);
console.log('⚡️ Queue running');
private parsePayload<P extends Payload>(msg: ConsumeMessage | null) {
const content = msg?.content?.toString();
if (!content) return null;
return JSON.parse(content) as P;
}

async activeJobs() {
return queue.getQueueSize(QueueNames.ADD_BLOCK_RANGE);
async getActive(queue: QueueNames) {
const res = await this.channel.checkQueue(queue);
return res.messageCount;
}
}

export const queue = new Queue({
host: DB_HOST,
port: Number(DB_PORT),
user: DB_USER,
password: DB_PASS,
database: DB_NAME,
max: 10,
});
export const mq = new RabbitMQConnection();
Loading

0 comments on commit 9a6b514

Please sign in to comment.