Skip to content

Commit

Permalink
fix: add block range as queue
Browse files Browse the repository at this point in the history
  • Loading branch information
pedronauck committed May 3, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent feddc26 commit ef4fccf
Showing 4 changed files with 69 additions and 60 deletions.
30 changes: 30 additions & 0 deletions packages/graphql/src/application/uc/AddBlockRange.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { BlockRepository } from '~/domain/Block/BlockRepository';
import {
type QueueData,
type QueueInputs,
QueueNames,
queue,
} from '~/infra/queue/Queue';

type Input = QueueInputs[QueueNames.ADD_BLOCK_RANGE];

export class AddBlockRange {
async execute({ data }: QueueData<Input>) {
const { blocks } = data;
const repo = new BlockRepository();
await repo.upsertMany(blocks);
await queue.push(QueueNames.SYNC_TRANSACTIONS, { blocks });
}
}

export const addBlockRange = async (input: QueueData<Input>) => {
try {
const { execute } = new AddBlockRange();
await execute(input);
} catch (error) {
console.error(error);
throw new Error('Sync transactions', {
cause: error,
});
}
};
18 changes: 6 additions & 12 deletions packages/graphql/src/application/uc/SyncBlocks.ts
Original file line number Diff line number Diff line change
@@ -3,7 +3,6 @@ import { assign, createActor, fromPromise, setup } from 'xstate';
import { env } from '~/config';
import { BlockRepository } from '~/domain/Block/BlockRepository';
import type { GQLBlock } from '~/graphql/generated/sdk';
import { db } from '~/infra/database/Db';
import {
type QueueData,
type QueueInputs,
@@ -88,18 +87,13 @@ class Syncer {
if (!env.get('IS_DEV_TEST')) {
const repo = new BlockRepository();
const { blocks, endCursor } = await repo.blocksFromNode(to - from, from);
await queue.push(QueueNames.ADD_BLOCK_RANGE, { blocks });
const hasBlocks = blocks.length > 0;
return db.connection().transaction(async (trx) => {
await repo.insertMany(blocks, trx);
await queue.push(QueueNames.SYNC_TRANSACTIONS, {
blocks: blocks.filter(Boolean),
});

return {
endCursor,
hasBlocks,
};
});

return {
endCursor,
hasBlocks,
};
}
return {
endCursor: to,
65 changes: 22 additions & 43 deletions packages/graphql/src/domain/Block/BlockRepository.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import c from 'chalk';
import { desc, eq } from 'drizzle-orm';
import { values } from 'lodash';
import { Paginator, type PaginatorParams } from '~/core/Paginator';
import { GraphQLSDK } from '~/graphql/GraphQLSDK';
import type { GQLBlock } from '~/graphql/generated/sdk';
import { type DbTransaction, db } from '~/infra/database/Db';
import { type DbConnection, type DbTransaction, db } from '~/infra/database/Db';
import {
type TransactionItem,
TransactionsTable,
@@ -13,19 +12,6 @@ import { BlockEntity } from './BlockEntity';
import { type BlockItem, BlocksTable } from './BlockModel';

export class BlockRepository {
async findByHash(blockHash: string) {
const first = await db.connection().query.BlocksTable.findFirst({
where: eq(BlocksTable.blockHash, blockHash),
with: {
transactions: true,
},
});

if (!first) return null;
const { transactions, ...block } = first;
return BlockEntity.create(block, transactions);
}

async findByHeight(height: number) {
const first = await db.connection().query.BlocksTable.findFirst({
where: eq(BlocksTable._id, height),
@@ -83,37 +69,16 @@ export class BlockRepository {
return BlockEntity.create(block, transactions);
}

async insertOne(block: GQLBlock) {
const found = await this.findByHash(block.id);
if (found) {
throw new Error(`Block ${block.id} already exists`);
}

const [item] = await db
.connection()
.insert(BlocksTable)
.values(BlockEntity.toDBItem(block))
.returning();

return BlockEntity.create(item, []);
async upsertOne(block: GQLBlock) {
const upsertOne = this.createUpsertOne(db.connection());
return upsertOne(block);
}

async insertMany(blocks: GQLBlock[], trx: DbTransaction) {
const queries = blocks.map(async (block) => {
const found = await this.findByHash(block.id);
if (found) {
console.log(c.red(`Block ${block.header.height} already exists`));
return null;
}

const [item] = await trx
.insert(BlocksTable)
.values(BlockEntity.toDBItem(block))
.returning();

return BlockEntity.create(item, []);
async upsertMany(blocks: GQLBlock[]) {
return db.connection().transaction(async (trx) => {
const queries = blocks.map(this.createUpsertOne(trx));
return Promise.all(queries.filter(Boolean));
});
return Promise.all(queries.filter(Boolean));
}

async blocksFromNode(first: number, after?: number) {
@@ -140,4 +105,18 @@ export class BlockRepository {
const { data } = await sdk.blocks({ last: 1 });
return data.blocks.nodes[0] as GQLBlock;
}

private createUpsertOne(conn: DbConnection | DbTransaction) {
return async (block: GQLBlock) => {
const [item] = await conn
.insert(BlocksTable)
.values(BlockEntity.toDBItem(block))
.onConflictDoUpdate({
target: [BlocksTable._id],
set: { data: block },
})
.returning();
return BlockEntity.create(item, []);
};
}
}
16 changes: 11 additions & 5 deletions packages/graphql/src/infra/queue/Queue.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import PgBoss, { type Job } from 'pg-boss';
import { addBlockRange } from '~/application/uc/AddBlockRange';
import { syncBlocks } from '~/application/uc/SyncBlocks';
import { syncLastBlocks } from '~/application/uc/SyncLastBlocks';
import { syncMissingBlocks } from '~/application/uc/SyncMissingBlocks';
@@ -13,10 +14,11 @@ const DB_PASS = env.get('DB_PASS');
const DB_NAME = env.get('DB_NAME');

export enum QueueNames {
SYNC_BLOCKS = 'indexer/sync:blocks',
SYNC_MISSING = 'indexer/sync:missing',
SYNC_TRANSACTIONS = 'indexer/sync:transactions',
SYNC_LAST = 'indexer/sync:last',
SYNC_BLOCKS = 'indexer/sync-blocks',
ADD_BLOCK_RANGE = 'indexer/add-block-range',
SYNC_MISSING = 'indexer/sync-missing',
SYNC_TRANSACTIONS = 'indexer/sync-transactions',
SYNC_LAST = 'indexer/sync-last',
}

export type QueueInputs = {
@@ -26,6 +28,9 @@ export type QueueInputs = {
offset?: number;
watch?: boolean;
};
[QueueNames.ADD_BLOCK_RANGE]: {
blocks: GQLBlock[];
};
[QueueNames.SYNC_TRANSACTIONS]: {
blocks: GQLBlock[];
};
@@ -79,7 +84,8 @@ export class Queue extends PgBoss {
async setupWorkers() {
const opts = this.workOpts;
await this.start();
await this.work(QueueNames.SYNC_BLOCKS, opts, syncBlocks);
await this.work(QueueNames.ADD_BLOCK_RANGE, opts, syncBlocks);
await this.work(QueueNames.SYNC_BLOCKS, opts, addBlockRange);
await this.work(QueueNames.SYNC_MISSING, opts, syncMissingBlocks);
await this.work(QueueNames.SYNC_TRANSACTIONS, opts, syncTransactions);
await this.work(QueueNames.SYNC_LAST, opts, syncLastBlocks);

0 comments on commit ef4fccf

Please sign in to comment.