Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Commit

Permalink
⚙️ Index blocks at a controlled pace
Browse files Browse the repository at this point in the history
  • Loading branch information
sameersubudhi committed Apr 8, 2024
1 parent 2e8cd1f commit 7192c1a
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 18 deletions.
22 changes: 13 additions & 9 deletions services/blockchain-connector/shared/sdk/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -120,19 +120,23 @@ const instantiateNewClient = async () => {
}
};

let isClientStatLogIntervalSet = false;
let isReInstantiateIntervalRunning = false;
const initClientPool = async poolSize => {
// Set the intervals only at application init
if (clientPool.length === 0) {
setInterval(() => {
const stats = getApiClientStats();
logger.info(`API client instantiation stats: ${JSON.stringify(stats)}`);
if (stats.activePoolSize < stats.expectedPoolSize) {
logger.warn(
'activePoolSize should catch up with the expectedPoolSize, once the node is under less stress.',
);
}
}, 5 * 60 * 1000);
if (!isClientStatLogIntervalSet) {
isClientStatLogIntervalSet = true;
setInterval(() => {
const stats = getApiClientStats();
logger.info(`API client instantiation stats: ${JSON.stringify(stats)}`);
if (stats.activePoolSize < stats.expectedPoolSize) {
logger.warn(
'activePoolSize should catch up with the expectedPoolSize, once the node is under less stress.',
);
}
}, 5 * 60 * 1000);
}

// Re-instantiate interval: Replaces nulls in clientPool with new active apiClients
// isReInstantiateIntervalRunning is the safety check to skip callback execution if the previous one is already in-progress
Expand Down
18 changes: 12 additions & 6 deletions services/blockchain-indexer/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,18 @@ config.debug = process.env.SERVICE_LOG_LEVEL === 'debug';
* Message queue options
*/
config.queue = {
defaultJobOptions: {
attempts: 5,
timeout: 5 * 60 * 1000, // millisecs
removeOnComplete: true,
removeOnFail: true,
stackTraceLimit: 0,
defaultOptions: {
defaultJobOptions: {
attempts: 5,
timeout: 5 * 60 * 1000, // millisecs
removeOnComplete: true,
removeOnFail: true,
stackTraceLimit: 0,
},
limiter: {
max: 30,
duration: 10 * 1000, // in millisecs
},
},

// Inter-microservice message queues
Expand Down
2 changes: 2 additions & 0 deletions services/blockchain-indexer/shared/indexer/blockchainIndex.js
Original file line number Diff line number Diff line change
Expand Up @@ -699,13 +699,15 @@ const initBlockProcessingQueues = async () => {
config.queue.indexBlocks.name,
indexBlock,
config.queue.indexBlocks.concurrency,
config.queue.defaultOptions,
);

deleteIndexedBlocksQueue = Queue(
config.endpoints.cache,
config.queue.deleteIndexedBlocks.name,
deleteIndexedBlocksWrapper,
config.queue.deleteIndexedBlocks.concurrency,
config.queue.defaultOptions,
);
};

Expand Down
6 changes: 3 additions & 3 deletions services/blockchain-indexer/shared/messageProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,17 @@ const STATS_INTERVAL = 1 * 60 * 1000; // ms
const accountMessageQueue = new MessageQueue(
config.queue.account.name,
config.endpoints.messageQueue,
{ defaultJobOptions: config.queue.defaultJobOptions },
{ defaultJobOptions: config.queue.defaultOptions.defaultJobOptions },
);

// Missing blocks
const blockMessageQueue = new MessageQueue(config.queue.block.name, config.endpoints.messageQueue, {
defaultJobOptions: config.queue.defaultJobOptions,
defaultJobOptions: config.queue.defaultOptions.defaultJobOptions,
});

// Newly generated blocks
const eventMessageQueue = new MessageQueue(config.queue.event.name, config.endpoints.messageQueue, {
defaultJobOptions: config.queue.defaultJobOptions,
defaultJobOptions: config.queue.defaultOptions.defaultJobOptions,
});

const queueStatus = async messageQueue => {
Expand Down

0 comments on commit 7192c1a

Please sign in to comment.