From 994e2c02c18942a45a52bec077b2b2dd08143ad3 Mon Sep 17 00:00:00 2001 From: EvgenKor Date: Tue, 3 Sep 2024 01:25:24 +0300 Subject: [PATCH] Storing local cache by commit (instead of indexes) --- zp-relayer/pool/RelayPool.ts | 23 ++++++++++------------- zp-relayer/services/relayer/endpoints.ts | 15 ++++----------- zp-relayer/state/TxStore.ts | 12 ++++++------ 3 files changed, 20 insertions(+), 30 deletions(-) diff --git a/zp-relayer/pool/RelayPool.ts b/zp-relayer/pool/RelayPool.ts index f3a3a52..0c9268b 100644 --- a/zp-relayer/pool/RelayPool.ts +++ b/zp-relayer/pool/RelayPool.ts @@ -262,7 +262,7 @@ export class RelayPool extends BasePool { } // cache transaction locally - await this.cacheTxLocally(commitIndex, outCommit, txHash, memo); + await this.cacheTxLocally(outCommit, txHash, memo); // start monitoring local cache against the indexer to cleanup already indexed txs this.startLocalCacheObserver(commitIndex); } @@ -278,7 +278,7 @@ export class RelayPool extends BasePool { poolJob.data.transaction.txHash = txHash; await poolJob.update(poolJob.data); - await this.cacheTxLocally(res.commitIndex, res.outCommit, txHash, res.memo); + await this.cacheTxLocally(res.outCommit, txHash, res.memo); } } } @@ -296,7 +296,7 @@ export class RelayPool extends BasePool { } } - protected async cacheTxLocally(index: number, commit: string, txHash: string, memo: string) { + protected async cacheTxLocally(commit: string, txHash: string, memo: string) { // store or updating local tx store // (we should keep sent transaction until the indexer grab them) const prefixedMemo = buildPrefixedMemo( @@ -304,8 +304,8 @@ export class RelayPool extends BasePool { txHash, memo ); - await this.txStore.add(index, prefixedMemo); - logger.info(`Tx @${index} with commit ${commit} has been CACHED locally`); + await this.txStore.add(commit, prefixedMemo); + logger.info(`Tx with commit ${commit} has been CACHED locally`); } private async getIndexerInfo() { @@ -350,9 +350,7 @@ export class RelayPool extends BasePool { const CACHE_OBSERVE_INTERVAL_MS = 1000; // waiting time between checks const EXTEND_LIMIT_TO_FETCH = 10; // taking into account non-atomic nature of /info and /transactions/v2 requests while (true) { - const localEntries = Object.entries(await this.txStore.getAll()) - .map(([i, v]) => [parseInt(i), v] as [number, string]) - .sort(([i1], [i2]) => i1 - i2) + const localEntries = Object.entries(await this.txStore.getAll()); let localEntriesCnt = localEntries.length; if (localEntries.length == 0) { @@ -367,11 +365,10 @@ export class RelayPool extends BasePool { const indexerCommitments = (await this.getIndexerTxs(fromIndex, limit)).map(tx => tx.slice(65, 129)); // find cached commitments in the indexer's response - for (const [index, memo] of localEntries) { - const commitLocal = memo.slice(0, 64) - if (indexerCommitments.includes(commitLocal)) { - logger.info('Deleting index from optimistic state', { index, commitLocal }) - await this.txStore.remove(index.toString()) + for (const [commit, memo] of localEntries) { + if (indexerCommitments.includes(commit)) { + logger.info('Deleting cached entry', { commit }) + await this.txStore.remove(commit) localEntriesCnt--; } } diff --git a/zp-relayer/services/relayer/endpoints.ts b/zp-relayer/services/relayer/endpoints.ts index 9463791..1850c0f 100644 --- a/zp-relayer/services/relayer/endpoints.ts +++ b/zp-relayer/services/relayer/endpoints.ts @@ -97,23 +97,16 @@ async function getTransactionsV2(req: Request, res: Response, { pool }: PoolInje } const indexerTxs: string[] = await response.json() - const lastIndex = offset + indexerTxs.length * OUTPLUSONE const txStore = (pool as RelayPool).txStore - const indices = await txStore.getAll().then(keys => { - return Object.entries(keys) - .map(([i, v]) => [parseInt(i), v] as [number, string]) - .filter(([i]) => offset <= i && i <= lastIndex) - .sort(([i1], [i2]) => i1 - i2) - }) + const localEntries = Object.entries(await txStore.getAll()); const indexerCommitments = indexerTxs.map(tx => tx.slice(65, 129)); const optimisticTxs: string[] = [] - for (const [index, memo] of indices) { - const commitLocal = memo.slice(0, 64) - if (indexerCommitments.includes(commitLocal)) { + for (const [commit, memo] of localEntries) { + if (indexerCommitments.includes(commit)) { // !!! we shouldn't modify local cache from here. Just filter entries to return correct response //logger.info('Deleting index from optimistic state', { index }) - //await txStore.remove(index.toString()) + //await txStore.remove(commit) } else { optimisticTxs.push(txToV2Format('0', memo)) } diff --git a/zp-relayer/state/TxStore.ts b/zp-relayer/state/TxStore.ts index 2b5f735..6fce219 100644 --- a/zp-relayer/state/TxStore.ts +++ b/zp-relayer/state/TxStore.ts @@ -3,16 +3,16 @@ import type { Redis } from 'ioredis' export class TxStore { constructor(public name: string, private redis: Redis) {} - async add(index: number, memo: string) { - await this.redis.hset(this.name, { [index]: memo }) + async add(commitment: string, memo: string) { + await this.redis.hset(this.name, { [commitment]: memo }) } - async remove(index: string) { - await this.redis.hdel(this.name, index) + async remove(commitment: string) { + await this.redis.hdel(this.name, commitment) } - async get(index: string) { - const memo = await this.redis.hget(this.name, index) + async get(commitment: string) { + const memo = await this.redis.hget(this.name, commitment) return memo }