From 7192c1a0697cf480012cac22a648b1e844b94ea4 Mon Sep 17 00:00:00 2001 From: Sameer Kumar Subudhi Date: Mon, 8 Apr 2024 12:09:33 +0200 Subject: [PATCH] :gear: Index blocks at a controlled pace --- .../blockchain-connector/shared/sdk/client.js | 22 +++++++++++-------- services/blockchain-indexer/config.js | 18 ++++++++++----- .../shared/indexer/blockchainIndex.js | 2 ++ .../shared/messageProcessor.js | 6 ++--- 4 files changed, 30 insertions(+), 18 deletions(-) diff --git a/services/blockchain-connector/shared/sdk/client.js b/services/blockchain-connector/shared/sdk/client.js index 045d18d240..a5f6bc320d 100644 --- a/services/blockchain-connector/shared/sdk/client.js +++ b/services/blockchain-connector/shared/sdk/client.js @@ -120,19 +120,23 @@ const instantiateNewClient = async () => { } }; +let isClientStatLogIntervalSet = false; let isReInstantiateIntervalRunning = false; const initClientPool = async poolSize => { // Set the intervals only at application init if (clientPool.length === 0) { - setInterval(() => { - const stats = getApiClientStats(); - logger.info(`API client instantiation stats: ${JSON.stringify(stats)}`); - if (stats.activePoolSize < stats.expectedPoolSize) { - logger.warn( - 'activePoolSize should catch up with the expectedPoolSize, once the node is under less stress.', - ); - } - }, 5 * 60 * 1000); + if (!isClientStatLogIntervalSet) { + isClientStatLogIntervalSet = true; + setInterval(() => { + const stats = getApiClientStats(); + logger.info(`API client instantiation stats: ${JSON.stringify(stats)}`); + if (stats.activePoolSize < stats.expectedPoolSize) { + logger.warn( + 'activePoolSize should catch up with the expectedPoolSize, once the node is under less stress.', + ); + } + }, 5 * 60 * 1000); + } // Re-instantiate interval: Replaces nulls in clientPool with new active apiClients // isReInstantiateIntervalRunning is the safety check to skip callback execution if the previous one is already in-progress diff --git a/services/blockchain-indexer/config.js b/services/blockchain-indexer/config.js index fc57b117fd..dac3db7b61 100644 --- a/services/blockchain-indexer/config.js +++ b/services/blockchain-indexer/config.js @@ -69,12 +69,18 @@ config.debug = process.env.SERVICE_LOG_LEVEL === 'debug'; * Message queue options */ config.queue = { - defaultJobOptions: { - attempts: 5, - timeout: 5 * 60 * 1000, // millisecs - removeOnComplete: true, - removeOnFail: true, - stackTraceLimit: 0, + defaultOptions: { + defaultJobOptions: { + attempts: 5, + timeout: 5 * 60 * 1000, // millisecs + removeOnComplete: true, + removeOnFail: true, + stackTraceLimit: 0, + }, + limiter: { + max: 30, + duration: 10 * 1000, // in millisecs + }, }, // Inter-microservice message queues diff --git a/services/blockchain-indexer/shared/indexer/blockchainIndex.js b/services/blockchain-indexer/shared/indexer/blockchainIndex.js index cf6f430f16..b57471809f 100644 --- a/services/blockchain-indexer/shared/indexer/blockchainIndex.js +++ b/services/blockchain-indexer/shared/indexer/blockchainIndex.js @@ -699,6 +699,7 @@ const initBlockProcessingQueues = async () => { config.queue.indexBlocks.name, indexBlock, config.queue.indexBlocks.concurrency, + config.queue.defaultOptions, ); deleteIndexedBlocksQueue = Queue( @@ -706,6 +707,7 @@ const initBlockProcessingQueues = async () => { config.queue.deleteIndexedBlocks.name, deleteIndexedBlocksWrapper, config.queue.deleteIndexedBlocks.concurrency, + config.queue.defaultOptions, ); }; diff --git a/services/blockchain-indexer/shared/messageProcessor.js b/services/blockchain-indexer/shared/messageProcessor.js index e37004c242..688ab12bfc 100644 --- a/services/blockchain-indexer/shared/messageProcessor.js +++ b/services/blockchain-indexer/shared/messageProcessor.js @@ -45,17 +45,17 @@ const STATS_INTERVAL = 1 * 60 * 1000; // ms const accountMessageQueue = new MessageQueue( config.queue.account.name, config.endpoints.messageQueue, - { defaultJobOptions: config.queue.defaultJobOptions }, + { defaultJobOptions: config.queue.defaultOptions.defaultJobOptions }, ); // Missing blocks const blockMessageQueue = new MessageQueue(config.queue.block.name, config.endpoints.messageQueue, { - defaultJobOptions: config.queue.defaultJobOptions, + defaultJobOptions: config.queue.defaultOptions.defaultJobOptions, }); // Newly generated blocks const eventMessageQueue = new MessageQueue(config.queue.event.name, config.endpoints.messageQueue, { - defaultJobOptions: config.queue.defaultJobOptions, + defaultJobOptions: config.queue.defaultOptions.defaultJobOptions, }); const queueStatus = async messageQueue => {