Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1927 from LiskHQ/1919-redis-oom-errors
Browse files Browse the repository at this point in the history
Redis OOM errors
  • Loading branch information
priojeetpriyom authored Nov 16, 2023
2 parents 7879da7 + a86ece0 commit bc47bcc
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 22 deletions.
17 changes: 9 additions & 8 deletions services/blockchain-connector/shared/sdk/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ const checkIsClientAlive = () =>
clientCache && clientCache._channel && clientCache._channel.isAlive;

// eslint-disable-next-line consistent-return
const instantiateClient = async (isForceUpdate = false) => {
const instantiateClient = async (isForceReInstantiate = false) => {
try {
if (!isInstantiating || isForceUpdate) {
if (!checkIsClientAlive() || isForceUpdate) {
if (!isInstantiating || isForceReInstantiate) {
if (!checkIsClientAlive() || isForceReInstantiate) {
isInstantiating = true;
instantiationBeginTime = Date.now();
if (clientCache) await clientCache.disconnect();
Expand All @@ -54,7 +54,7 @@ const instantiateClient = async (isForceUpdate = false) => {
? await createIPCClient(config.liskAppDataPath)
: await createWSClient(`${liskAddress}/rpc-ws`);

if (isForceUpdate) logger.info('Re-instantiated the API client forcefully.');
if (isForceReInstantiate) logger.info('Re-instantiated the API client forcefully.');

// Inform listeners about the newly instantiated ApiClient
Signals.get('newApiClient').dispatch();
Expand All @@ -78,10 +78,11 @@ const instantiateClient = async (isForceUpdate = false) => {

logger.error(errMessage);
logger.error(err.message);
if (err.code === 'ECONNREFUSED')
if (err.message.includes('ECONNREFUSED')) {
throw new Error('ECONNREFUSED: Unable to reach a network node.');
}

return clientCache;
return null;
}
};

Expand All @@ -92,10 +93,10 @@ const getApiClient = async () => {

// eslint-disable-next-line consistent-return
const invokeEndpoint = async (endpoint, params = {}, numRetries = NUM_REQUEST_RETRIES) => {
const apiClient = await getApiClient();
let retries = numRetries;
do {
try {
const apiClient = await getApiClient();
const response = await apiClient._channel.invoke(endpoint, params);
return response;
} catch (err) {
Expand All @@ -108,7 +109,7 @@ const invokeEndpoint = async (endpoint, params = {}, numRetries = NUM_REQUEST_RE
} while (retries--);
};

const resetApiClientListener = () => instantiateClient(true);
const resetApiClientListener = async () => instantiateClient(true);
Signals.get('resetApiClient').add(resetApiClientListener);

module.exports = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ const EVENT_TOPIC_MAPPINGS_BY_MODULE = {
[MODULE_NAME_FEE]: {
[EVENT_NAME_FEE_PROCESSED]: ['transactionID', 'senderAddress', 'generatorAddress'],
[EVENT_NAME_INSUFFICIENT_FEE]: ['transactionID'],
[EVENT_NAME_RELAYER_FEE_PROCESSED]: ['transactionID', 'ccmID', 'relayerAddress'],
[EVENT_NAME_RELAYER_FEE_PROCESSED]: ['ccmID', 'relayerAddress'],
},
[MODULE_NAME_INTEROPERABILITY]: {
[EVENT_NAME_INVALID_CERTIFICATE_SIGNATURE]: ['transactionID', 'chainID'],
Expand Down
8 changes: 5 additions & 3 deletions services/blockchain-coordinator/shared/scheduler.js
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,9 @@ const scheduleMissingBlocksIndexing = async () => {
// Skip job scheduling when the jobCount is greater than the threshold
const jobCount = await getLiveIndexingJobCount();
if (jobCount > config.job.indexMissingBlocks.skipThreshold) {
logger.info(`Skipping missing blocks job run. ${jobCount} indexing jobs already in the queue.`);
logger.info(
`Skipping missing blocks job run. ${jobCount} indexing jobs (current threshold: ${config.job.indexMissingBlocks.skipThreshold}) already in the queue.`,
);
return;
}

Expand Down Expand Up @@ -263,9 +265,9 @@ const scheduleMissingBlocksIndexing = async () => {

if (result.length === 0) {
const lastIndexVerifiedHeight = await getIndexVerifiedHeight();
if (batchEndHeight === lastIndexVerifiedHeight + MAX_QUERY_RANGE) {
if (batchEndHeight <= lastIndexVerifiedHeight + MAX_QUERY_RANGE) {
await setIndexVerifiedHeight(batchEndHeight);
logger.debug(
logger.info(
`No missing blocks found in range ${batchStartHeight} - ${batchEndHeight}. Setting index verified height to ${batchEndHeight}.`,
);
}
Expand Down
39 changes: 29 additions & 10 deletions services/blockchain-indexer/shared/indexer/blockchainIndex.js
Original file line number Diff line number Diff line change
Expand Up @@ -349,10 +349,6 @@ const indexBlock = async job => {
await scheduleAddressesBalanceUpdate(addressesToUpdateBalance);
logger.info(`Successfully indexed block ${block.id} at height ${block.height}.`);
} catch (error) {
// Reschedule the block for indexing
// eslint-disable-next-line no-use-before-define
await addHeightToIndexBlocksQueue(blockHeightToIndex);

// Block may not have been initialized when error occurred
const failedBlockInfo = {
id: typeof block === 'undefined' ? undefined : block.id,
Expand Down Expand Up @@ -479,6 +475,22 @@ const deleteIndexedBlocks = async job => {
meta: { offset: 0, count: forkedTransactions.length, total: forkedTransactions.length },
});

// Update generatedBlocks count for the block generator
const validatorsTable = await getValidatorsTable();
logger.trace(
`Decreasing generatedBlocks for validator ${blockFromJob.generatorAddress} by 1.`,
);
await validatorsTable.decrement(
{
decrement: { generatedBlocks: 1 },
where: { address: blockFromJob.generatorAddress },
},
dbTrx,
);
logger.debug(
`Decreased generatedBlocks for validator ${blockFromJob.generatorAddress} by 1.`,
);

// Calculate locked amount change from events and update in key_value_store table
if (events.length) {
const eventsTable = await getEventsTable();
Expand Down Expand Up @@ -514,7 +526,6 @@ const deleteIndexedBlocks = async job => {
commissionAmount,
);

const validatorsTable = await getValidatorsTable();
logger.trace(
`Decreasing commission for validator ${blockFromJob.generatorAddress} by ${commissionAmount}.`,
);
Expand Down Expand Up @@ -646,9 +657,17 @@ const deleteIndexedBlocksQueue = Queue(
);

const getLiveIndexingJobCount = async () => {
const { queue: bullQueue } = indexBlocksQueue;
const jobCount = await bullQueue.getJobCounts();
const count = jobCount.active + jobCount.waiting;
const { queue: indexBlocksBullQueue } = indexBlocksQueue;
const { queue: deleteIndexedBlocksBullQueue } = deleteIndexedBlocksQueue;

const jcIndexBlocksQueue = await indexBlocksBullQueue.getJobCounts();
const jcDeleteIndexedBlocksQueue = await deleteIndexedBlocksBullQueue.getJobCounts();

const count =
jcIndexBlocksQueue.active +
jcIndexBlocksQueue.waiting +
jcDeleteIndexedBlocksQueue.active +
jcDeleteIndexedBlocksQueue.waiting;
return count;
};

Expand All @@ -659,8 +678,8 @@ const getPendingDeleteJobCount = async () => {
};

const scheduleBlockDeletion = async block => {
block = Array.isArray(block) ? block : [block];
deleteIndexedBlocksQueue.add({ blocks: [...block] });
const blocks = Array.isArray(block) ? block : [block];
await deleteIndexedBlocksQueue.add({ blocks });
};

const indexNewBlock = async block => {
Expand Down

0 comments on commit bc47bcc

Please sign in to comment.