diff --git a/packages/graphql/.env.example b/packages/graphql/.env.example index 138890fc2..01f5bc9db 100644 --- a/packages/graphql/.env.example +++ b/packages/graphql/.env.example @@ -5,11 +5,17 @@ DB_PORT="5435" DB_USER="postgres" DB_PASS="postgres" DB_NAME="postgres" +RABBITMQ_HOST="127.0.0.1" +RABBITMQ_USER="guest" +RABBITMQ_PASS="guest" +RABBITMQ_QUEUE="notification" SYNC_MISSING= SERVER_BUILD= IS_DEV_TEST= SYNC_OFFSET=10 SYNC_LIMIT=10000 -QUEUE_CONCURRENCY=1000 +QUEUE_CONCURRENCY=40 WATCH_INTERVAL=5000 + + diff --git a/packages/graphql/.env.production b/packages/graphql/.env.production index 71f57d560..0c3d67867 100644 --- a/packages/graphql/.env.production +++ b/packages/graphql/.env.production @@ -5,6 +5,10 @@ DB_PORT="5435" DB_USER="postgres" DB_PASS="postgres" DB_NAME="postgres" +RABBITMQ_HOST="127.0.0.1" +RABBITMQ_USER="guest" +RABBITMQ_PASS="guest" +RABBITMQ_QUEUE="notification" SYNC_MISSING= SERVER_BUILD= diff --git a/packages/graphql/package.json b/packages/graphql/package.json index 0701b5cbb..ab6ea083b 100644 --- a/packages/graphql/package.json +++ b/packages/graphql/package.json @@ -60,6 +60,7 @@ "@graphql-tools/utils": "^10.1.3", "@types/cors": "^2.8.17", "@types/express": "^4.17.21", + "amqplib": "0.10.4", "cors": "^2.8.5", "dayjs": "1.11.10", "dotenv": "16.4.5", @@ -69,6 +70,7 @@ "fuels": "0.74.0", "graphql": "^16.8.1", "graphql-yoga": "5.3.0", + "kafkajs": "2.2.4", "lodash": "^4.17.21", "node-fetch": "3.3.2", "npm-run-all": "^4.1.5", @@ -90,6 +92,7 @@ "@graphql-codegen/typescript": "^4.0.6", "@graphql-codegen/typescript-graphql-request": "^6.2.0", "@graphql-codegen/typescript-operations": "^4.2.0", + "@types/amqplib": "0.10.5", "@types/lodash": "4.17.0", "@types/node": "^20.12.7", "@types/pg": "^8.11.5", diff --git a/packages/graphql/src/application/uc/AddBlockRange.ts b/packages/graphql/src/application/uc/AddBlockRange.ts index 447258734..9a0a5ee54 100644 --- a/packages/graphql/src/application/uc/AddBlockRange.ts +++ b/packages/graphql/src/application/uc/AddBlockRange.ts @@ -2,18 +2,13 @@ import { performance } from 'node:perf_hooks'; import c from 'chalk'; import { BlockRepository } from '~/domain/Block/BlockRepository'; import { db } from '~/infra/database/Db'; -import { - type QueueData, - type QueueInputs, - type QueueNames, - queue, -} from '~/infra/queue/Queue'; +import type { QueueInputs, QueueNames } from '~/infra/queue/Queue'; import { addTransactions } from './AddTransactions'; -type Input = QueueInputs[QueueNames.ADD_BLOCK_RANGE]; +type Data = QueueInputs[QueueNames.ADD_BLOCK_RANGE]; export class AddBlockRange { - async execute({ id, data }: QueueData) { + async execute(data: Data) { const { blocks } = data; const from = blocks[0].header.height; const to = blocks[blocks.length - 1].header.height; @@ -27,14 +22,13 @@ export class AddBlockRange { const end = performance.now(); const secs = Number.parseInt(`${(end - start) / 1000}`); console.log(c.green(`โœ… Synced blocks: #${from} - #${to} (${secs}s)`)); - await queue.complete(id); } } -export const addBlockRange = async (input: QueueData) => { +export const addBlockRange = async (data: Data) => { try { const { execute } = new AddBlockRange(); - await execute(input); + await execute(data); } catch (error) { console.error(error); throw new Error('Sync transactions', { diff --git a/packages/graphql/src/application/uc/SyncBlocks.ts b/packages/graphql/src/application/uc/SyncBlocks.ts index 62425f380..32930f46b 100644 --- a/packages/graphql/src/application/uc/SyncBlocks.ts +++ 
b/packages/graphql/src/application/uc/SyncBlocks.ts @@ -1,9 +1,8 @@ -import type { Input } from 'fuels'; -import type { QueueData, QueueInputs, QueueNames } from '~/infra/queue/Queue'; +import type { QueueInputs, QueueNames } from '~/infra/queue/Queue'; import { worker } from '~/infra/worker/Worker'; export type SyncBlocksProps = QueueInputs[QueueNames.SYNC_BLOCKS]; -export const syncBlocks = async ({ data }: QueueData) => { +export const syncBlocks = async (data: SyncBlocksProps) => { worker.run(data); }; diff --git a/packages/graphql/src/application/uc/SyncLastBlocks.ts b/packages/graphql/src/application/uc/SyncLastBlocks.ts index e33dfafdd..5b23f248a 100644 --- a/packages/graphql/src/application/uc/SyncLastBlocks.ts +++ b/packages/graphql/src/application/uc/SyncLastBlocks.ts @@ -1,30 +1,24 @@ import { BlockRepository } from '~/domain/Block/BlockRepository'; -import { - type QueueData, - type QueueInputs, - QueueNames, - queue, -} from '~/infra/queue/Queue'; +import { type QueueInputs, QueueNames, mq } from '~/infra/queue/Queue'; -type Props = QueueInputs[QueueNames.SYNC_LAST]; +type Data = QueueInputs[QueueNames.SYNC_LAST]; export class SyncLastBlocks { - async execute({ last, watch }: Props) { + async execute({ last, watch }: Data) { const repo = new BlockRepository(); const lastBlock = await repo.latestBlockFromNode(); const blockHeight = Number(lastBlock?.header.height ?? '0'); const from = blockHeight - last; console.log(`Syncing last ${last} blocks from ${from} to ${blockHeight}`); - await queue.push(QueueNames.SYNC_BLOCKS, { watch, cursor: from }); + await mq.send(QueueNames.SYNC_BLOCKS, { watch, cursor: from }); } } -export const syncLastBlocks = async ({ id, data }: QueueData) => { +export const syncLastBlocks = async (data: Data) => { try { const syncLastBlocks = new SyncLastBlocks(); await syncLastBlocks.execute(data); - await queue.complete(id); } catch (error) { console.error(error); throw new Error('Sync last', { diff --git a/packages/graphql/src/application/uc/SyncMissingBlocks.ts b/packages/graphql/src/application/uc/SyncMissingBlocks.ts index bbf2f797b..1ea7a0566 100644 --- a/packages/graphql/src/application/uc/SyncMissingBlocks.ts +++ b/packages/graphql/src/application/uc/SyncMissingBlocks.ts @@ -1,12 +1,7 @@ import { BlockRepository } from '~/domain/Block/BlockRepository'; -import { - type QueueData, - type QueueInputs, - QueueNames, - queue, -} from '~/infra/queue/Queue'; +import { type QueueInputs, QueueNames, mq } from '~/infra/queue/Queue'; -type Props = QueueInputs[QueueNames.SYNC_MISSING]; +type Data = QueueInputs[QueueNames.SYNC_MISSING]; export class SyncMissingBlocks { async execute() { @@ -14,16 +9,15 @@ export class SyncMissingBlocks { const latest = await repo.findLatestAdded(); const cursor = latest ? 
Number(latest.data.header.height) : undefined; console.log('Syncing missing blocks from', cursor); - await queue.push(QueueNames.SYNC_BLOCKS, { cursor, watch: true }); + await mq.send(QueueNames.SYNC_BLOCKS, { cursor, watch: true }); } } -export const syncMissingBlocks = async ({ id }: QueueData) => { +export const syncMissingBlocks = async (_: Data) => { try { console.log('Syncing missing blocks'); const syncMissingBlocks = new SyncMissingBlocks(); await syncMissingBlocks.execute(); - await queue.complete(id); } catch (error) { console.error(error); throw new Error('Sync missing', {
diff --git a/packages/graphql/src/config.ts b/packages/graphql/src/config.ts index 57e37377e..fc062be81 100644 --- a/packages/graphql/src/config.ts +++ b/packages/graphql/src/config.ts @@ -11,6 +11,9 @@ const schema = zod.object({ DB_USER: zod.string(), DB_PASS: zod.string(), DB_NAME: zod.string(), + RABBITMQ_HOST: zod.string().default('127.0.0.1'), + RABBITMQ_USER: zod.string().default('guest'), + RABBITMQ_PASS: zod.string().default('guest'), SYNC_MISSING: falsy.optional(), SERVER_BUILD: falsy.optional(), IS_DEV_TEST: falsy.optional(), @@ -28,6 +31,9 @@ export const env = new Env(schema, { DB_USER: 'postgres', DB_PASS: 'postgres', DB_NAME: 'postgres', + RABBITMQ_HOST: 'localhost', + RABBITMQ_USER: 'guest', + RABBITMQ_PASS: 'guest', SERVER_BUILD: false, SYNC_MISSING: false, IS_DEV_TEST: false,
diff --git a/packages/graphql/src/infra/queue/Queue.ts b/packages/graphql/src/infra/queue/Queue.ts index fd22ed898..f9e1c2f71 100644 --- a/packages/graphql/src/infra/queue/Queue.ts +++ b/packages/graphql/src/infra/queue/Queue.ts @@ -1,4 +1,8 @@ -import PgBoss, { type Job } from 'pg-boss'; +import client, { + type Channel, + type Connection, + type ConsumeMessage, +} from 'amqplib'; import { addBlockRange } from '~/application/uc/AddBlockRange'; import { syncBlocks } from '~/application/uc/SyncBlocks'; import { syncLastBlocks } from '~/application/uc/SyncLastBlocks'; @@ -6,11 +10,15 @@ import { syncMissingBlocks } from '~/application/uc/SyncMissingBlocks'; import { env } from '~/config'; import type { GQLBlock } from '~/graphql/generated/sdk'; -const DB_HOST = env.get('DB_HOST'); -const DB_PORT = env.get('DB_PORT'); -const DB_USER = env.get('DB_USER'); -const DB_PASS = env.get('DB_PASS'); -const DB_NAME = env.get('DB_NAME'); +const HOST = env.get('RABBITMQ_HOST'); +const USER = env.get('RABBITMQ_USER'); +const PASS = env.get('RABBITMQ_PASS'); +const MAX_WORKERS = Number(env.get('QUEUE_CONCURRENCY')); + +type Payload<D = unknown> = { + type: QueueNames; + data: D; +}; export enum QueueNames { SYNC_BLOCKS = 'indexer/sync-blocks', @@ -37,80 +45,97 @@ export type QueueInputs = { }; }; -export type QueueData<D = object> = Job<D>; +class RabbitMQConnection { + connection!: Connection; + channel!: Channel; + private connected!: Boolean; -export class Queue extends PgBoss { - private workOpts: PgBoss.WorkOptions = { - teamSize: Number(env.get('QUEUE_CONCURRENCY')), - teamConcurrency: Number(env.get('QUEUE_CONCURRENCY')), - teamRefill: true, - }; + async connect() { + if (this.connected && this.channel) return; + this.connected = true; - static defaultJobOptions = { - retryLimit: 10, - retryDelay: 1, - retryBackoff: false, - expireInSeconds: 120, - }; + try { + console.log('โŒ›๏ธ Connecting to Rabbit-MQ Server'); + const url = `amqp://${USER}:${PASS}@${HOST}:5672`; + this.connection = await client.connect(url); + console.log('โœ… Rabbit MQ Connection is ready'); + this.channel = await this.connection.createChannel(); + await this.channel.prefetch(MAX_WORKERS); + console.log('๐Ÿ›ธ Created RabbitMQ Channel successfully'); + } catch (error) { + console.error(error); + console.error('Not connected to MQ Server'); + } + }
- push<Q extends QueueNames>( - queue: Q, - data?: QueueInputs[Q] | null, - options?: PgBoss.JobOptions, - ) { - // console.log(`Pushing job to queue ${queue}`); - return this.send(queue, data as object, { - ...Queue.defaultJobOptions, - ...options, - }); + async disconnect() { + await this.channel.close(); + await this.connection.close(); + } + + async clean() { + const names = Object.values(QueueNames); + for (const name of names) { + await this.channel.deleteQueue(name); + } + await this.channel.deleteExchange('blocks'); + console.log('๐Ÿงน Cleaned all queues'); } - pushSingleton<Q extends QueueNames>( + async send<Q extends QueueNames, P extends Payload<QueueInputs[Q]>>( queue: Q, - data?: QueueInputs[Q] | null, - options?: PgBoss.JobOptions, + data?: P['data'], ) { - // console.log(`Pushing job to queue ${queue}`); - return this.sendSingleton(queue, data as object, { - ...Queue.defaultJobOptions, - ...options, - }); + try { + if (!this.channel) { + await this.connect(); + } + const payload = { type: queue, data } as P; + const buffer = Buffer.from(JSON.stringify(payload)); + this.channel.sendToQueue(queue, buffer, { persistent: true }); + } catch (error) { + console.error(error); + throw error; + } } - pushBatch<Q extends QueueNames>( + async consume<Q extends QueueNames, P extends Payload<QueueInputs[Q]>>( queue: Q, - data: Array<QueueInputs[Q]>, - opts?: Partial<PgBoss.JobInsert<QueueInputs[Q]>>, + handler: (data: P['data']) => void, ) { - const jobs: Array<PgBoss.JobInsert<QueueInputs[Q]>> = data.map((job) => ({ - ...Queue.defaultJobOptions, - ...opts, - name: queue, - data: job ?? {}, - })); - return this.insert(jobs); + await this.channel.assertQueue(queue, { durable: true }); + await this.channel.consume( + queue, + (msg) => { + if (!msg) return; + const payload = this.parsePayload<P>(msg); + if (payload?.type === queue) { + handler(payload.data); + this.channel.ack(msg); + } + }, + { noAck: false }, + ); + } + + async setup() { + await this.connect(); + await this.consume(QueueNames.ADD_BLOCK_RANGE, addBlockRange); + await this.consume(QueueNames.SYNC_BLOCKS, syncBlocks); + await this.consume(QueueNames.SYNC_MISSING, syncMissingBlocks); + await this.consume(QueueNames.SYNC_LAST, syncLastBlocks); } - async setupWorkers() { - const opts = this.workOpts; - await this.start(); - await this.work(QueueNames.SYNC_BLOCKS, opts, syncBlocks); - await this.work(QueueNames.SYNC_LAST, opts, syncLastBlocks); - await this.work(QueueNames.SYNC_MISSING, opts, syncMissingBlocks); - await this.work(QueueNames.ADD_BLOCK_RANGE, opts, addBlockRange); - console.log('โšก๏ธ Queue running'); + private parsePayload<P extends Payload>
(msg: ConsumeMessage | null) { + const content = msg?.content?.toString(); + if (!content) return null; + return JSON.parse(content) as P; } - async activeJobs() { - return queue.getQueueSize(QueueNames.ADD_BLOCK_RANGE); + async getActive(queue: QueueNames) { + const res = await this.channel.checkQueue(queue); + return res.messageCount; } } -export const queue = new Queue({ - host: DB_HOST, - port: Number(DB_PORT), - user: DB_USER, - password: DB_PASS, - database: DB_NAME, - max: 10, -}); +export const mq = new RabbitMQConnection(); diff --git a/packages/graphql/src/infra/server/Program.ts b/packages/graphql/src/infra/server/Program.ts index a04187b2d..ec360be89 100644 --- a/packages/graphql/src/infra/server/Program.ts +++ b/packages/graphql/src/infra/server/Program.ts @@ -1,7 +1,7 @@ import { sql } from 'drizzle-orm'; import yargs from 'yargs/yargs'; import { db } from '../database/Db'; -import { QueueNames, queue } from '../queue/Queue'; +import { QueueNames, mq } from '../queue/Queue'; type Arguments = { all: boolean; @@ -90,35 +90,37 @@ export class Program { async sync(argv: Arguments) { const { all, missing, clean, from, watch, last } = argv; - await db.connect(); - await queue.start(); + async function start() { + await db.connect(); + await mq.setup(); + } async function finish() { - await queue.stop(); + await mq.disconnect(); await db.close(); } if (clean) { - const query = sql` - DROP SCHEMA pgboss CASCADE; - CREATE SCHEMA pgboss; - `; - await db.connection().execute(query); + await mq.connect(); + await mq.clean(); await finish(); return; } if (missing) { - await queue.push(QueueNames.SYNC_MISSING); + await start(); + await mq.send(QueueNames.SYNC_MISSING); await finish(); return; } if (last) { - await queue.push(QueueNames.SYNC_LAST, { watch, last }); + await start(); + await mq.send(QueueNames.SYNC_LAST, { watch, last }); await finish(); return; } if (all || from) { - await queue.push(QueueNames.SYNC_BLOCKS, { watch, cursor: from ?? 0 }); + await start(); + await mq.send(QueueNames.SYNC_BLOCKS, { watch, cursor: from ?? 
0 }); await finish(); return; } diff --git a/packages/graphql/src/infra/worker/Worker.ts b/packages/graphql/src/infra/worker/Worker.ts index 2bf5a3701..02a0a8607 100644 --- a/packages/graphql/src/infra/worker/Worker.ts +++ b/packages/graphql/src/infra/worker/Worker.ts @@ -1,8 +1,8 @@ import path from 'node:path'; import { Workery } from '~/core/Workery'; -import { type QueueInputs, QueueNames, queue } from '../queue/Queue'; +import { type QueueInputs, QueueNames, mq } from '../queue/Queue'; -type Events = { +type WorkerEvents = { ADD_BLOCK_RANGE: QueueInputs[QueueNames.ADD_BLOCK_RANGE][]; GET_ACTIVE_JOBS: null; ACTIVE_JOBS_RESPONSE: number; @@ -12,19 +12,23 @@ type Events = { }; }; -export const worker = Workery.build({ +export const worker = Workery.build({ handler: path.resolve(__dirname, '../../application/uc/RunSyncMachine.ts'), }); worker.on('ADD_BLOCK_RANGE', async (payloads) => { const from = String(payloads[0].from); const to = String(payloads[payloads.length - 1].to); - await queue.pushBatch(QueueNames.ADD_BLOCK_RANGE, payloads, { priority: 2 }); + await Promise.all( + payloads.map((payload) => { + return mq.send(QueueNames.ADD_BLOCK_RANGE, payload); + }), + ); worker.postMessage('BLOCK_RANGE_ADDED', { from, to }); }); worker.on('GET_ACTIVE_JOBS', async () => { - const jobs = await queue.activeJobs(); + const jobs = await mq.getActive(QueueNames.ADD_BLOCK_RANGE); console.log(`โšก๏ธ Active jobs: ${jobs}`); worker.postMessage('ACTIVE_JOBS_RESPONSE', jobs); }); diff --git a/packages/graphql/src/schemas/fuelcore.graphql b/packages/graphql/src/schemas/fuelcore.graphql new file mode 100644 index 000000000..28084d0fd --- /dev/null +++ b/packages/graphql/src/schemas/fuelcore.graphql @@ -0,0 +1,864 @@ +schema { + query: Query + mutation: Mutation + subscription: Subscription +} + +scalar Address + +scalar AssetId + +type Balance { + amount: U64! + assetId: AssetId! + owner: Address! +} + +type BalanceConnection { + """A list of edges.""" + edges: [BalanceEdge!]! + """A list of nodes.""" + nodes: [Balance!]! + """Information to aid in pagination.""" + pageInfo: PageInfo! +} + +"""An edge in a connection.""" +type BalanceEdge { + """A cursor for use in pagination""" + cursor: String! + """The item at the end of the edge""" + node: Balance! +} + +input BalanceFilterInput { + """Filter coins based on the `owner` field""" + owner: Address! +} + +type Block { + consensus: Consensus! + header: Header! + id: BlockId! + transactions: [Transaction!]! +} + +type BlockConnection { + """A list of edges.""" + edges: [BlockEdge!]! + """A list of nodes.""" + nodes: [Block!]! + """Information to aid in pagination.""" + pageInfo: PageInfo! +} + +"""An edge in a connection.""" +type BlockEdge { + """A cursor for use in pagination""" + cursor: String! + """The item at the end of the edge""" + node: Block! +} + +scalar BlockId + +input Breakpoint { + contract: ContractId! + pc: U64! +} + +scalar Bytes32 + +type ChainInfo { + consensusParameters: ConsensusParameters! + daHeight: U64! + gasCosts: GasCosts! + latestBlock: Block! + name: String! +} + +type ChangeOutput { + amount: U64! + assetId: AssetId! + to: Address! +} + +type Coin { + amount: U64! + assetId: AssetId! + """TxPointer - the height of the block this coin was created in""" + blockCreated: U32! + maturity: U32! + owner: Address! + """TxPointer - the index of the transaction that created this coin""" + txCreatedIdx: U64! + utxoId: UtxoId! +} + +type CoinConnection { + """A list of edges.""" + edges: [CoinEdge!]! 
+ """A list of nodes.""" + nodes: [Coin!]! + """Information to aid in pagination.""" + pageInfo: PageInfo! +} + +"""An edge in a connection.""" +type CoinEdge { + """A cursor for use in pagination""" + cursor: String! + """The item at the end of the edge""" + node: Coin! +} + +input CoinFilterInput { + """Returns coins only with `asset_id`.""" + assetId: AssetId + """Returns coins owned by the `owner`.""" + owner: Address! +} + +type CoinOutput { + amount: U64! + assetId: AssetId! + to: Address! +} + +"""The schema analog of the [`coins::CoinType`].""" +union CoinType = Coin | MessageCoin + +union Consensus = Genesis | PoAConsensus + +type ConsensusParameters { + baseAssetId: AssetId! + chainId: U64! + contractParams: ContractParameters! + feeParams: FeeParameters! + gasCosts: GasCosts! + predicateParams: PredicateParameters! + scriptParams: ScriptParameters! + txParams: TxParameters! +} + +type Contract { + bytecode: HexString! + id: ContractId! + salt: Salt! +} + +type ContractBalance { + amount: U64! + assetId: AssetId! + contract: ContractId! +} + +type ContractBalanceConnection { + """A list of edges.""" + edges: [ContractBalanceEdge!]! + """A list of nodes.""" + nodes: [ContractBalance!]! + """Information to aid in pagination.""" + pageInfo: PageInfo! +} + +"""An edge in a connection.""" +type ContractBalanceEdge { + """A cursor for use in pagination""" + cursor: String! + """The item at the end of the edge""" + node: ContractBalance! +} + +input ContractBalanceFilterInput { + """Filter assets based on the `contractId` field""" + contract: ContractId! +} + +type ContractCreated { + contract: Contract! + stateRoot: Bytes32! +} + +scalar ContractId + +type ContractOutput { + balanceRoot: Bytes32! + inputIndex: Int! + stateRoot: Bytes32! +} + +type ContractParameters { + contractMaxSize: U64! + maxStorageSlots: U64! +} + +union DependentCost = HeavyOperation | LightOperation + +input ExcludeInput { + """Messages to exclude from the selection.""" + messages: [Nonce!]! + """Utxos to exclude from the selection.""" + utxos: [UtxoId!]! +} + +type FailureStatus { + block: Block! + programState: ProgramState + reason: String! + receipts: [Receipt!]! + time: Tai64Timestamp! + transactionId: TransactionId! +} + +type FeeParameters { + gasPerByte: U64! + gasPriceFactor: U64! +} + +type GasCosts { + add: U64! + addi: U64! + aloc: U64! + and: U64! + andi: U64! + bal: U64! + bhei: U64! + bhsh: U64! + burn: U64! + call: DependentCost! + cb: U64! + ccp: DependentCost! + cfei: U64! + cfsi: U64! + contractRoot: DependentCost! + croo: U64! + csiz: DependentCost! + div: U64! + divi: U64! + eck1: U64! + ecr1: U64! + ed19: U64! + eq: U64! + exp: U64! + expi: U64! + flag: U64! + gm: U64! + gt: U64! + gtf: U64! + ji: U64! + jmp: U64! + jmpb: U64! + jmpf: U64! + jne: U64! + jneb: U64! + jnef: U64! + jnei: U64! + jnzb: U64! + jnzf: U64! + jnzi: U64! + k256: DependentCost! + lb: U64! + ldc: DependentCost! + log: U64! + logd: DependentCost! + lt: U64! + lw: U64! + mcl: DependentCost! + mcli: DependentCost! + mcp: DependentCost! + mcpi: DependentCost! + meq: DependentCost! + mint: U64! + mldv: U64! + mlog: U64! + modOp: U64! + modi: U64! + moveOp: U64! + movi: U64! + mroo: U64! + mul: U64! + muli: U64! + newStoragePerByte: U64! + noop: U64! + not: U64! + or: U64! + ori: U64! + poph: U64! + popl: U64! + pshh: U64! + pshl: U64! + ret: U64! + retd: DependentCost! + rvrt: U64! + s256: DependentCost! + sb: U64! + scwq: DependentCost! + sll: U64! + slli: U64! + smo: DependentCost! + srl: U64! + srli: U64! + srw: U64! 
+ srwq: DependentCost! + stateRoot: DependentCost! + sub: U64! + subi: U64! + sw: U64! + sww: U64! + swwq: DependentCost! + time: U64! + tr: U64! + tro: U64! + vmInitialization: DependentCost! + wdam: U64! + wdcm: U64! + wddv: U64! + wdmd: U64! + wdml: U64! + wdmm: U64! + wdop: U64! + wqam: U64! + wqcm: U64! + wqdv: U64! + wqmd: U64! + wqml: U64! + wqmm: U64! + wqop: U64! + xor: U64! + xori: U64! +} + +type Genesis { + """ + The chain configs define what consensus type to use, what settlement layer to use, + rules of block validity, etc. + """ + chainConfigHash: Bytes32! + """The Binary Merkle Tree root of all genesis coins.""" + coinsRoot: Bytes32! + """ + The Binary Merkle Tree root of state, balances, contracts code hash of each contract. + """ + contractsRoot: Bytes32! + """The Binary Merkle Tree root of all genesis messages.""" + messagesRoot: Bytes32! +} + +type Header { + """Hash of the application header.""" + applicationHash: Bytes32! + """ + The layer 1 height of messages and events to include since the last layer 1 block number. + """ + daHeight: U64! + """Fuel block height.""" + height: U32! + """Hash of the header""" + id: BlockId! + """Number of message receipts in this block.""" + messageReceiptCount: U64! + """Merkle root of message receipts in this block.""" + messageReceiptRoot: Bytes32! + """Merkle root of all previous block header hashes.""" + prevRoot: Bytes32! + """The block producer time.""" + time: Tai64Timestamp! + """Number of transactions in this block.""" + transactionsCount: U64! + """Merkle root of transactions.""" + transactionsRoot: Bytes32! +} + +type HeavyOperation { + base: U64! + gasPerUnit: U64! +} + +scalar HexString + +union Input = InputCoin | InputContract | InputMessage + +type InputCoin { + amount: U64! + assetId: AssetId! + maturity: U32! + owner: Address! + predicate: HexString! + predicateData: HexString! + predicateGasUsed: U64! + txPointer: TxPointer! + utxoId: UtxoId! + witnessIndex: Int! +} + +type InputContract { + balanceRoot: Bytes32! + contract: Contract! + stateRoot: Bytes32! + txPointer: TxPointer! + utxoId: UtxoId! +} + +type InputMessage { + amount: U64! + data: HexString! + nonce: Nonce! + predicate: HexString! + predicateData: HexString! + predicateGasUsed: U64! + recipient: Address! + sender: Address! + witnessIndex: Int! +} + +type LightOperation { + base: U64! + unitsPerGas: U64! +} + +type MerkleProof { + proofIndex: U64! + proofSet: [Bytes32!]! +} + +type Message { + amount: U64! + daHeight: U64! + data: HexString! + nonce: Nonce! + recipient: Address! + sender: Address! +} + +type MessageCoin { + amount: U64! + assetId: AssetId! + daHeight: U64! + nonce: Nonce! + recipient: Address! + sender: Address! +} + +type MessageConnection { + """A list of edges.""" + edges: [MessageEdge!]! + """A list of nodes.""" + nodes: [Message!]! + """Information to aid in pagination.""" + pageInfo: PageInfo! +} + +"""An edge in a connection.""" +type MessageEdge { + """A cursor for use in pagination""" + cursor: String! + """The item at the end of the edge""" + node: Message! +} + +type MessageProof { + amount: U64! + blockProof: MerkleProof! + commitBlockHeader: Header! + data: HexString! + messageBlockHeader: Header! + messageProof: MerkleProof! + nonce: Nonce! + recipient: Address! + sender: Address! +} + +enum MessageState { + NOT_FOUND + SPENT + UNSPENT +} + +type MessageStatus { + state: MessageState! +} + +type Mutation { + continueTx(id: ID!): RunResult! 
+ """ + Execute a dry-run of the transaction using a fork of current state, no changes are committed. + """ + dryRun(tx: HexString!, utxoValidation: Boolean): [Receipt!]! + endSession(id: ID!): Boolean! + execute(id: ID!, op: String!): Boolean! + """ + Sequentially produces `blocks_to_produce` blocks. The first block starts with + `start_timestamp`. If the block production in the [`crate::service::Config`] is + `Trigger::Interval { block_time }`, produces blocks with `block_time ` intervals between + them. The `start_timestamp` is the timestamp in seconds. + """ + produceBlocks(blocksToProduce: U32!, startTimestamp: Tai64Timestamp): U32! + reset(id: ID!): Boolean! + setBreakpoint(breakpoint: Breakpoint!, id: ID!): Boolean! + setSingleStepping(enable: Boolean!, id: ID!): Boolean! + startSession: ID! + startTx(id: ID!, txJson: String!): RunResult! + """ + Submits transaction to the `TxPool`. + + Returns submitted transaction if the transaction is included in the `TxPool` without problems. + """ + submit(tx: HexString!): Transaction! +} + +type NodeInfo { + maxDepth: U64! + maxTx: U64! + minGasPrice: U64! + nodeVersion: String! + peers: [PeerInfo!]! + utxoValidation: Boolean! + vmBacktrace: Boolean! +} + +scalar Nonce + +union Output = ChangeOutput | CoinOutput | ContractCreated | ContractOutput | VariableOutput + +""" +A separate `Breakpoint` type to be used as an output, as a single +type cannot act as both input and output type in async-graphql +""" +type OutputBreakpoint { + contract: ContractId! + pc: U64! +} + +"""Information about pagination in a connection""" +type PageInfo { + """When paginating forwards, the cursor to continue.""" + endCursor: String + """When paginating forwards, are there more items?""" + hasNextPage: Boolean! + """When paginating backwards, are there more items?""" + hasPreviousPage: Boolean! + """When paginating backwards, the cursor to continue.""" + startCursor: String +} + +type PeerInfo { + """The advertised multi-addrs that can be used to connect to this peer""" + addresses: [String!]! + """The internal fuel p2p reputation of this peer""" + appScore: Float! + """The last reported height of the peer""" + blockHeight: U32 + """The self-reported version of the client the peer is using""" + clientVersion: String + """The libp2p peer id""" + id: String! + """The last heartbeat from this peer in unix epoch time ms""" + lastHeartbeatMs: U64! +} + +type PoAConsensus { + """Gets the signature of the block produced by `PoA` consensus.""" + signature: Signature! +} + +type Policies { + gasPrice: U64 + maturity: U32 + maxFee: U64 + witnessLimit: U64 +} + +type PredicateParameters { + maxGasPerPredicate: U64! + maxMessageDataLength: U64! + maxPredicateDataLength: U64! + maxPredicateLength: U64! +} + +type ProgramState { + data: HexString! + returnType: ReturnType! +} + +type Query { + balance( + """asset_id of the coin""" + assetId: AssetId! + """address of the owner""" + owner: Address! + ): Balance! + balances(after: String, before: String, filter: BalanceFilterInput!, first: Int, last: Int): BalanceConnection! + block( + """Height of the block""" + height: U32 + """ID of the block""" + id: BlockId + ): Block + blocks(after: String, before: String, first: Int, last: Int): BlockConnection! + chain: ChainInfo! + """Gets the coin by `utxo_id`.""" + coin( + """The ID of the coin""" + utxoId: UtxoId! + ): Coin + """ + Gets all unspent coins of some `owner` maybe filtered with by `asset_id` per page. 
+ """ + coins(after: String, before: String, filter: CoinFilterInput!, first: Int, last: Int): CoinConnection! + """ + For each `query_per_asset`, get some spendable coins(of asset specified by the query) owned by + `owner` that add up at least the query amount. The returned coins can be spent. + The number of coins is optimized to prevent dust accumulation. + + The query supports excluding and maximum the number of coins. + + Returns: + The list of spendable coins per asset from the query. The length of the result is + the same as the length of `query_per_asset`. The ordering of assets and `query_per_asset` + is the same. + """ + coinsToSpend( + """The excluded coins from the selection.""" + excludedIds: ExcludeInput + """The `Address` of the coins owner.""" + owner: Address! + """ + The list of requested assets` coins with asset ids, `target` amount the user wants to reach, and the `max` number of coins in the selection. Several entries with the same asset id are not allowed. + """ + queryPerAsset: [SpendQueryElementInput!]! + ): [[CoinType!]!]! + contract( + """ID of the Contract""" + id: ContractId! + ): Contract + contractBalance(asset: AssetId!, contract: ContractId!): ContractBalance! + contractBalances(after: String, before: String, filter: ContractBalanceFilterInput!, first: Int, last: Int): ContractBalanceConnection! + """Estimate the predicate gas for the provided transaction""" + estimatePredicates(tx: HexString!): Transaction! + """Returns true when the GraphQL API is serving requests.""" + health: Boolean! + memory(id: ID!, size: U32!, start: U32!): String! + messageProof(commitBlockHeight: U32, commitBlockId: BlockId, nonce: Nonce!, transactionId: TransactionId!): MessageProof + messageStatus(nonce: Nonce!): MessageStatus! + messages( + after: String + before: String + first: Int + last: Int + """address of the owner""" + owner: Address + ): MessageConnection! + nodeInfo: NodeInfo! + register(id: ID!, register: U32!): U64! + transaction( + """The ID of the transaction""" + id: TransactionId! + ): Transaction + transactions(after: String, before: String, first: Int, last: Int): TransactionConnection! + transactionsByOwner(after: String, before: String, first: Int, last: Int, owner: Address!): TransactionConnection! +} + +type Receipt { + amount: U64 + assetId: AssetId + contract: Contract + contractId: ContractId + data: HexString + digest: Bytes32 + gas: U64 + gasUsed: U64 + is: U64 + len: U64 + nonce: Nonce + param1: U64 + param2: U64 + pc: U64 + ptr: U64 + ra: U64 + rb: U64 + rc: U64 + rd: U64 + reason: U64 + receiptType: ReceiptType! + recipient: Address + result: U64 + sender: Address + subId: Bytes32 + to: Contract + toAddress: Address + val: U64 +} + +enum ReceiptType { + BURN + CALL + LOG + LOG_DATA + MESSAGE_OUT + MINT + PANIC + RETURN + RETURN_DATA + REVERT + SCRIPT_RESULT + TRANSFER + TRANSFER_OUT +} + +enum ReturnType { + RETURN + RETURN_DATA + REVERT +} + +type RunResult { + breakpoint: OutputBreakpoint + jsonReceipts: [String!]! + state: RunState! +} + +enum RunState { + """Stopped on a breakpoint""" + BREAKPOINT + """All breakpoints have been processed, and the program has terminated""" + COMPLETED +} + +scalar Salt + +type ScriptParameters { + maxScriptDataLength: U64! + maxScriptLength: U64! +} + +scalar Signature + +input SpendQueryElementInput { + """Target amount for the query.""" + amount: U64! + """Identifier of the asset to spend.""" + assetId: AssetId! 
+ """The maximum number of currencies for selection.""" + max: U32 +} + +type SqueezedOutStatus { + reason: String! +} + +type SubmittedStatus { + time: Tai64Timestamp! +} + +type Subscription { + """ + Returns a stream of status updates for the given transaction id. + If the current status is [`TransactionStatus::Success`], [`TransactionStatus::SqueezedOut`] + or [`TransactionStatus::Failed`] the stream will return that and end immediately. + If the current status is [`TransactionStatus::Submitted`] this will be returned + and the stream will wait for a future update. + + This stream will wait forever so it's advised to use within a timeout. + + It is possible for the stream to miss an update if it is polled slower + then the updates arrive. In such a case the stream will close without + a status. If this occurs the stream can simply be restarted to return + the latest status. + """ + statusChange( + """The ID of the transaction""" + id: TransactionId! + ): TransactionStatus! + """ + Submits transaction to the `TxPool` and await either confirmation or failure. + """ + submitAndAwait(tx: HexString!): TransactionStatus! +} + +type SuccessStatus { + block: Block! + programState: ProgramState + receipts: [Receipt!]! + time: Tai64Timestamp! + transactionId: TransactionId! +} + +scalar Tai64Timestamp + +type Transaction { + bytecodeLength: U64 + bytecodeWitnessIndex: Int + gasPrice: U64 + id: TransactionId! + inputAssetIds: [AssetId!] + inputContract: InputContract + inputContracts: [Contract!] + inputs: [Input!] + isCreate: Boolean! + isMint: Boolean! + isScript: Boolean! + maturity: U32 + mintAmount: U64 + mintAssetId: AssetId + outputContract: ContractOutput + outputs: [Output!]! + policies: Policies + """Return the transaction bytes using canonical encoding""" + rawPayload: HexString! + receipts: [Receipt!] + receiptsRoot: Bytes32 + salt: Salt + script: HexString + scriptData: HexString + scriptGasLimit: U64 + status: TransactionStatus + storageSlots: [HexString!] + txPointer: TxPointer + witnesses: [HexString!] +} + +type TransactionConnection { + """A list of edges.""" + edges: [TransactionEdge!]! + """A list of nodes.""" + nodes: [Transaction!]! + """Information to aid in pagination.""" + pageInfo: PageInfo! +} + +"""An edge in a connection.""" +type TransactionEdge { + """A cursor for use in pagination""" + cursor: String! + """The item at the end of the edge""" + node: Transaction! +} + +scalar TransactionId + +union TransactionStatus = FailureStatus | SqueezedOutStatus | SubmittedStatus | SuccessStatus + +type TxParameters { + maxGasPerTx: U64! + maxInputs: U8! + maxOutputs: U8! + maxSize: U64! + maxWitnesses: U32! +} + +scalar TxPointer + +scalar U8 + +scalar U32 + +scalar U64 + +scalar UtxoId + +type VariableOutput { + amount: U64! + assetId: AssetId! + to: Address! 
+} \ No newline at end of file diff --git a/packages/graphql/src/syncer.ts b/packages/graphql/src/syncer.ts index b58972fc4..4c8583215 100644 --- a/packages/graphql/src/syncer.ts +++ b/packages/graphql/src/syncer.ts @@ -1,35 +1,27 @@ import { setTimeout } from 'node:timers/promises'; import { env } from './config'; -import { QueueNames, queue } from './infra/queue/Queue'; +import { QueueNames, mq } from './infra/queue/Queue'; import { Server } from './infra/server/App'; const httpServer = new Server(); const app = httpServer.setup(); httpServer.listen(app, 3001).then(async () => { - await queue.setupWorkers(); + await mq.setup(); console.log('๐Ÿ“Ÿ Sync running on http://localhost:3001'); - if (env.get('SYNC_MISSING')) { - console.log('๐Ÿ• Syncing missing blocks in 5 seconds...'); - await setTimeout(5000); - await queue.push(QueueNames.SYNC_MISSING, undefined); - } + // if (env.get('SYNC_MISSING')) { + // console.log('๐Ÿ• Syncing missing blocks in 5 seconds...'); + // await setTimeout(5000); + // await mq.send(QueueNames.SYNC_MISSING); + // } - const others = [ - 'SIGINT', - 'SIGUSR1', - 'SIGUSR2', - // 'uncaughtException', - 'SIGTERM', - 'exit', - ]; + const others = ['SIGINT', 'SIGUSR1', 'SIGUSR2', 'SIGTERM', 'beforeExit']; //biome-ignore lint/complexity/noForEach: others.forEach((eventType) => { process.on(eventType, async (err) => { - console.log('๐Ÿ›‘ Queue stopped'); + await mq.disconnect(); console.error(err); - await queue.stop(); }); }); }); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index c0b2054c4..3ddd47b45 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -558,6 +558,9 @@ importers: '@types/express': specifier: ^4.17.21 version: 4.17.21 + amqplib: + specifier: 0.10.4 + version: 0.10.4 cors: specifier: ^2.8.5 version: 2.8.5 @@ -585,6 +588,9 @@ importers: graphql-yoga: specifier: 5.3.0 version: 5.3.0(graphql@16.8.1) + kafkajs: + specifier: 2.2.4 + version: 2.2.4 lodash: specifier: ^4.17.21 version: 4.17.21 @@ -643,6 +649,9 @@ importers: '@graphql-codegen/typescript-operations': specifier: ^4.2.0 version: 4.2.0(graphql@16.8.1) + '@types/amqplib': + specifier: 0.10.5 + version: 0.10.5 '@types/lodash': specifier: 4.17.0 version: 4.17.0 @@ -900,6 +909,17 @@ importers: packages: + /@acuminous/bitsyntax@0.1.2: + resolution: {integrity: sha512-29lUK80d1muEQqiUsSo+3A0yP6CdspgC95EnKBMi22Xlwt79i/En4Vr67+cXhU+cZjbti3TgGGC5wy1stIywVQ==} + engines: {node: '>=0.8'} + dependencies: + buffer-more-ints: 1.0.0 + debug: 4.3.4(supports-color@8.1.1) + safe-buffer: 5.1.2 + transitivePeerDependencies: + - supports-color + dev: false + /@adobe/css-tools@4.3.3: resolution: {integrity: sha512-rE0Pygv0sEZ4vBWHlAgJLGDU7Pm8xoO6p3wsEceb7GYAjScrOHpEo8KK/eVkAcnSM+slAEtXjA2JpdjLp4fJQQ==} dev: true @@ -11528,6 +11548,12 @@ packages: /@tsconfig/node16@1.0.4: resolution: {integrity: sha512-vxhUy4J8lyeyinH7Azl1pdd43GJhZH/tP2weN8TntQblOY+A0XbT8DJk1/oCPuOOyg/Ja757rG0CgHcWC8OfMA==} + /@types/amqplib@0.10.5: + resolution: {integrity: sha512-/cSykxROY7BWwDoi4Y4/jLAuZTshZxd8Ey1QYa/VaXriMotBDoou7V/twJiOSHzU6t1Kp1AHAUXGCgqq+6DNeg==} + dependencies: + '@types/node': 20.12.7 + dev: true + /@types/aria-query@4.2.2: resolution: {integrity: sha512-HnYpAE1Y6kRyKM/XkEuiRQhTHvkzMBurTHnpFLYLBGPIylZNPs9jJcuOOYWxPLJCSEtmZT0Y8rHDokKN7rRTig==} dev: true @@ -13164,6 +13190,18 @@ packages: resolution: {integrity: sha512-OwIuC4yZaRogHKiuU5WlMR5Xk/jAcpPtawWL05Gj8Lvm2F6mwoJt4O/bHI+DHwG79vWd+8OFYM4/BzYqyRd3qw==} dev: false + /amqplib@0.10.4: + resolution: {integrity: 
sha512-DMZ4eCEjAVdX1II2TfIUpJhfKAuoCeDIo/YyETbfAqehHTXxxs7WOOd+N1Xxr4cKhx12y23zk8/os98FxlZHrw==} + engines: {node: '>=10'} + dependencies: + '@acuminous/bitsyntax': 0.1.2 + buffer-more-ints: 1.0.0 + readable-stream: 1.1.14 + url-parse: 1.5.10 + transitivePeerDependencies: + - supports-color + dev: false + /ansi-colors@4.1.3: resolution: {integrity: sha512-/6w/C21Pm1A7aZitlI5Ni/2J6FFQN8i1Cvz3kHABAAbw93v/NlvKdVOqz7CCWz/3iv/JplRSEEZ83XION15ovw==} engines: {node: '>=6'} @@ -14109,6 +14147,10 @@ packages: /buffer-from@1.1.2: resolution: {integrity: sha512-E+XQCRwSbaaiChtv6k6Dwgc+bx+Bs6vuKJHHl5kox/BaKbhiXzqQOwK4cO22yElGp2OCmjwVhT3HmxgyPGnJfQ==} + /buffer-more-ints@1.0.0: + resolution: {integrity: sha512-EMetuGFz5SLsT0QTnXzINh4Ksr+oo4i+UGTXEshiGCQWnsgSs7ZhJ8fzlwQ+OzEMs0MpDAMr1hxnblp5a4vcHg==} + dev: false + /buffer-xor@1.0.3: resolution: {integrity: sha512-571s0T7nZWK6vB67HI5dyUF7wXiNcfaPPPTl6zYCNApANjIvYJTg7hlud/+cJpdAhS7dVzqMLmfhfHR3rAcOjQ==} dev: true @@ -19135,6 +19177,10 @@ packages: dependencies: system-architecture: 0.1.0 + /isarray@0.0.1: + resolution: {integrity: sha512-D2S+3GLxWH+uhrNEcoh/fnmYeP8E8/zHl644d/jdA0g2uyXvy3sb0qxotE+ne0LtccHknQzWwZEzhak7oJ0COQ==} + dev: false + /isarray@1.0.0: resolution: {integrity: sha512-VLghIWNM6ELQzo7zwmcg0NmTVyWKYjvIeM83yjp0wRDTmUnrM678fQbcKBo6n2CJEF0szoG//ytg+TKla89ALQ==} @@ -20027,6 +20073,11 @@ packages: resolution: {integrity: sha512-cxQGGUiit6CGUpuuiezY8N4m1wgF4o7127rXEXDFcxeDUFfdV7gSkwA26Fe2wWBiNQq2SZOgN4gSmMxB/StA8Q==} dev: false + /kafkajs@2.2.4: + resolution: {integrity: sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA==} + engines: {node: '>=14.0.0'} + dev: false + /keccak@3.0.4: resolution: {integrity: sha512-3vKuW0jV8J3XNTzvfyicFR5qvxrSAGl7KIhvgOu5cmWwM7tZRj3fMbj/pfIf4be7aznbc+prBWGjywox/g2Y6Q==} engines: {node: '>=10.0.0'} @@ -22620,7 +22671,6 @@ packages: /querystringify@2.2.0: resolution: {integrity: sha512-FIqgj2EUvTa7R50u0rGsyTftzjYmv/a3hO345bZNrqabNqjtgiDMgmo4mkUjd+nzU5oF3dClKqFIPUKybUyqoQ==} - dev: true /queue-microtask@1.2.3: resolution: {integrity: sha512-NuaNSa6flKT5JaSYQzJok04JzTL1CA6aGhv5rfLW3PgqA+M2ChpZQnAC8h8i4ZFkBS8X5RqkDBHA7r4hej3K9A==} @@ -23141,6 +23191,15 @@ packages: mute-stream: 0.0.8 dev: false + /readable-stream@1.1.14: + resolution: {integrity: sha512-+MeVjFf4L44XUkhM1eYbD8fyEsxcV81pqMSR5gblfcLCHfZvbrqy4/qYHE+/R5HoBUT11WV5O08Cr1n3YXkWVQ==} + dependencies: + core-util-is: 1.0.3 + inherits: 2.0.4 + isarray: 0.0.1 + string_decoder: 0.10.31 + dev: false + /readable-stream@2.3.3: resolution: {integrity: sha512-m+qzzcn7KUxEmd1gMbchF+Y2eIUbieUaxkWtptyHywrX0rE8QEYqPC07Vuy4Wm32/xE16NcdBctb8S0Xe/5IeQ==} dependencies: @@ -23378,7 +23437,6 @@ packages: /requires-port@1.0.0: resolution: {integrity: sha512-KigOCHcocU3XODJxsu8i/j8T9tzT4adHiecwORRQ0ZZFcp7ahwXuRU1m+yuO90C5ZUyGeGfocHDI14M3L3yDAQ==} - dev: true /resize-observer-polyfill@1.5.1: resolution: {integrity: sha512-LwZrotdHOo12nQuZlHEmtuXdqGoOD0OhaxopaNFxWzInpEgaLWoVuAMbTzixuosCx2nEG58ngzW3vxdWoxIgdg==} @@ -24458,6 +24516,10 @@ packages: define-properties: 1.2.1 es-object-atoms: 1.0.0 + /string_decoder@0.10.31: + resolution: {integrity: sha512-ev2QzSzWPYmy9GuqfIVildA4OdcGLeFZQrq5ys6RtiuF+RQQiZWr8TZNyAcuVXyQRYfEO+MsoB/1BuQVhOJuoQ==} + dev: false + /string_decoder@1.0.3: resolution: {integrity: sha512-4AH6Z5fzNNBcH+6XDMfA/BTt87skxqJlO0lAh3Dker5zThcAxG6mKz+iGu308UKoPPQ8Dcqx/4JhujzltRa+hQ==} dependencies: @@ -25791,7 +25853,6 @@ packages: dependencies: querystringify: 2.2.0 requires-port: 1.0.0 - dev: true /url-to-options@1.0.1: 
resolution: {integrity: sha512-0kQLIzG4fdk/G5NONku64rSH/x32NOA39LVQqlK8Le6lvTF6GGRJpqaQFGgU+CLwySIqBSMdwYM0sYcW9f6P4A==}
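
Local testing note (not part of this diff; the container name and image tag below are illustrative only): the new defaults — RABBITMQ_HOST=127.0.0.1, RABBITMQ_USER/RABBITMQ_PASS=guest, and the port 5672 hard-coded in the Queue.ts connection URL — line up with a stock broker started from the official RabbitMQ image, assuming the repo does not already ship its own compose service for it:

docker run -d --name indexer-rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

RabbitMQ only accepts the default guest/guest credentials over loopback, which is consistent with the 127.0.0.1 default; port 15672 exposes the management UI, handy for watching the indexer/* queues while a sync runs.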