Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Redis OOM errors #1927

Merged
merged 6 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions services/blockchain-connector/shared/sdk/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ const checkIsClientAlive = () =>
clientCache && clientCache._channel && clientCache._channel.isAlive;

// eslint-disable-next-line consistent-return
const instantiateClient = async (isForceUpdate = false) => {
const instantiateClient = async (isForceReInstantiate = false) => {
try {
if (!isInstantiating || isForceUpdate) {
if (!checkIsClientAlive() || isForceUpdate) {
if (!isInstantiating || isForceReInstantiate) {
if (!checkIsClientAlive() || isForceReInstantiate) {
isInstantiating = true;
instantiationBeginTime = Date.now();
if (clientCache) await clientCache.disconnect();
Expand All @@ -54,7 +54,7 @@ const instantiateClient = async (isForceUpdate = false) => {
? await createIPCClient(config.liskAppDataPath)
: await createWSClient(`${liskAddress}/rpc-ws`);

if (isForceUpdate) logger.info('Re-instantiated the API client forcefully.');
if (isForceReInstantiate) logger.info('Re-instantiated the API client forcefully.');

// Inform listeners about the newly instantiated ApiClient
Signals.get('newApiClient').dispatch();
Expand All @@ -78,10 +78,11 @@ const instantiateClient = async (isForceUpdate = false) => {

logger.error(errMessage);
logger.error(err.message);
if (err.code === 'ECONNREFUSED')
if (err.message.includes('ECONNREFUSED')) {
throw new Error('ECONNREFUSED: Unable to reach a network node.');
}

return clientCache;
return null;
}
};

Expand All @@ -92,10 +93,10 @@ const getApiClient = async () => {

// eslint-disable-next-line consistent-return
const invokeEndpoint = async (endpoint, params = {}, numRetries = NUM_REQUEST_RETRIES) => {
const apiClient = await getApiClient();
let retries = numRetries;
do {
try {
const apiClient = await getApiClient();
const response = await apiClient._channel.invoke(endpoint, params);
return response;
} catch (err) {
Expand All @@ -108,7 +109,7 @@ const invokeEndpoint = async (endpoint, params = {}, numRetries = NUM_REQUEST_RE
} while (retries--);
};

const resetApiClientListener = () => instantiateClient(true);
const resetApiClientListener = async () => instantiateClient(true);
Signals.get('resetApiClient').add(resetApiClientListener);

module.exports = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ const EVENT_TOPIC_MAPPINGS_BY_MODULE = {
[MODULE_NAME_FEE]: {
[EVENT_NAME_FEE_PROCESSED]: ['transactionID', 'senderAddress', 'generatorAddress'],
[EVENT_NAME_INSUFFICIENT_FEE]: ['transactionID'],
[EVENT_NAME_RELAYER_FEE_PROCESSED]: ['transactionID', 'ccmID', 'relayerAddress'],
[EVENT_NAME_RELAYER_FEE_PROCESSED]: ['ccmID', 'relayerAddress'],
},
[MODULE_NAME_INTEROPERABILITY]: {
[EVENT_NAME_INVALID_CERTIFICATE_SIGNATURE]: ['transactionID', 'chainID'],
Expand Down
8 changes: 5 additions & 3 deletions services/blockchain-coordinator/shared/scheduler.js
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,9 @@ const scheduleMissingBlocksIndexing = async () => {
// Skip job scheduling when the jobCount is greater than the threshold
const jobCount = await getLiveIndexingJobCount();
if (jobCount > config.job.indexMissingBlocks.skipThreshold) {
logger.info(`Skipping missing blocks job run. ${jobCount} indexing jobs already in the queue.`);
logger.info(
`Skipping missing blocks job run. ${jobCount} indexing jobs (current threshold: ${config.job.indexMissingBlocks.skipThreshold}) already in the queue.`,
);
return;
}

Expand Down Expand Up @@ -263,9 +265,9 @@ const scheduleMissingBlocksIndexing = async () => {

if (result.length === 0) {
const lastIndexVerifiedHeight = await getIndexVerifiedHeight();
if (batchEndHeight === lastIndexVerifiedHeight + MAX_QUERY_RANGE) {
sameersubudhi marked this conversation as resolved.
Show resolved Hide resolved
if (batchEndHeight <= lastIndexVerifiedHeight + MAX_QUERY_RANGE) {
await setIndexVerifiedHeight(batchEndHeight);
logger.debug(
logger.info(
`No missing blocks found in range ${batchStartHeight} - ${batchEndHeight}. Setting index verified height to ${batchEndHeight}.`,
);
}
Expand Down
18 changes: 13 additions & 5 deletions services/blockchain-indexer/shared/indexer/blockchainIndex.js
Original file line number Diff line number Diff line change
Expand Up @@ -646,9 +646,17 @@ const deleteIndexedBlocksQueue = Queue(
);

const getLiveIndexingJobCount = async () => {
const { queue: bullQueue } = indexBlocksQueue;
const jobCount = await bullQueue.getJobCounts();
const count = jobCount.active + jobCount.waiting;
const { queue: indexBlocksBullQueue } = indexBlocksQueue;
const { queue: deleteIndexedBlocksBullQueue } = deleteIndexedBlocksQueue;

const jcIndexBlocksQueue = await indexBlocksBullQueue.getJobCounts();
const jcDeleteIndexedBlocksQueue = await deleteIndexedBlocksBullQueue.getJobCounts();

const count =
jcIndexBlocksQueue.active +
jcIndexBlocksQueue.waiting +
jcDeleteIndexedBlocksQueue.active +
jcDeleteIndexedBlocksQueue.waiting;
return count;
};

Expand All @@ -659,8 +667,8 @@ const getPendingDeleteJobCount = async () => {
};

const scheduleBlockDeletion = async block => {
block = Array.isArray(block) ? block : [block];
deleteIndexedBlocksQueue.add({ blocks: [...block] });
const blocks = Array.isArray(block) ? block : [block];
await deleteIndexedBlocksQueue.add({ blocks });
};

const indexNewBlock = async block => {
Expand Down