Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Redis OOM errors #1927

Merged
merged 6 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions services/blockchain-connector/shared/sdk/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ const checkIsClientAlive = () =>
clientCache && clientCache._channel && clientCache._channel.isAlive;

// eslint-disable-next-line consistent-return
const instantiateClient = async (isForceUpdate = false) => {
const instantiateClient = async (isForceReInstantiate = false) => {
try {
if (!isInstantiating || isForceUpdate) {
if (!checkIsClientAlive() || isForceUpdate) {
if (!isInstantiating || isForceReInstantiate) {
if (!checkIsClientAlive() || isForceReInstantiate) {
isInstantiating = true;
instantiationBeginTime = Date.now();
if (clientCache) await clientCache.disconnect();
Expand All @@ -54,7 +54,7 @@ const instantiateClient = async (isForceUpdate = false) => {
? await createIPCClient(config.liskAppDataPath)
: await createWSClient(`${liskAddress}/rpc-ws`);

if (isForceUpdate) logger.info('Re-instantiated the API client forcefully.');
if (isForceReInstantiate) logger.info('Re-instantiated the API client forcefully.');

// Inform listeners about the newly instantiated ApiClient
Signals.get('newApiClient').dispatch();
Expand All @@ -78,10 +78,11 @@ const instantiateClient = async (isForceUpdate = false) => {

logger.error(errMessage);
logger.error(err.message);
if (err.code === 'ECONNREFUSED')
if (err.message.includes('ECONNREFUSED')) {
throw new Error('ECONNREFUSED: Unable to reach a network node.');
}

return clientCache;
return null;
}
};

Expand All @@ -92,10 +93,10 @@ const getApiClient = async () => {

// eslint-disable-next-line consistent-return
const invokeEndpoint = async (endpoint, params = {}, numRetries = NUM_REQUEST_RETRIES) => {
const apiClient = await getApiClient();
let retries = numRetries;
do {
try {
const apiClient = await getApiClient();
const response = await apiClient._channel.invoke(endpoint, params);
return response;
} catch (err) {
Expand All @@ -108,7 +109,7 @@ const invokeEndpoint = async (endpoint, params = {}, numRetries = NUM_REQUEST_RE
} while (retries--);
};

const resetApiClientListener = () => instantiateClient(true);
const resetApiClientListener = async () => instantiateClient(true);
Signals.get('resetApiClient').add(resetApiClientListener);

module.exports = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ const EVENT_TOPIC_MAPPINGS_BY_MODULE = {
[MODULE_NAME_FEE]: {
[EVENT_NAME_FEE_PROCESSED]: ['transactionID', 'senderAddress', 'generatorAddress'],
[EVENT_NAME_INSUFFICIENT_FEE]: ['transactionID'],
[EVENT_NAME_RELAYER_FEE_PROCESSED]: ['transactionID', 'ccmID', 'relayerAddress'],
[EVENT_NAME_RELAYER_FEE_PROCESSED]: ['ccmID', 'relayerAddress'],
},
[MODULE_NAME_INTEROPERABILITY]: {
[EVENT_NAME_INVALID_CERTIFICATE_SIGNATURE]: ['transactionID', 'chainID'],
Expand Down
8 changes: 5 additions & 3 deletions services/blockchain-coordinator/shared/scheduler.js
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,9 @@ const scheduleMissingBlocksIndexing = async () => {
// Skip job scheduling when the jobCount is greater than the threshold
const jobCount = await getLiveIndexingJobCount();
if (jobCount > config.job.indexMissingBlocks.skipThreshold) {
logger.info(`Skipping missing blocks job run. ${jobCount} indexing jobs already in the queue.`);
logger.info(
`Skipping missing blocks job run. ${jobCount} indexing jobs (current threshold: ${config.job.indexMissingBlocks.skipThreshold}) already in the queue.`,
);
return;
}

Expand Down Expand Up @@ -263,9 +265,9 @@ const scheduleMissingBlocksIndexing = async () => {

if (result.length === 0) {
const lastIndexVerifiedHeight = await getIndexVerifiedHeight();
if (batchEndHeight === lastIndexVerifiedHeight + MAX_QUERY_RANGE) {
sameersubudhi marked this conversation as resolved.
Show resolved Hide resolved
if (batchEndHeight <= lastIndexVerifiedHeight + MAX_QUERY_RANGE) {
await setIndexVerifiedHeight(batchEndHeight);
logger.debug(
logger.info(
`No missing blocks found in range ${batchStartHeight} - ${batchEndHeight}. Setting index verified height to ${batchEndHeight}.`,
);
}
Expand Down
39 changes: 29 additions & 10 deletions services/blockchain-indexer/shared/indexer/blockchainIndex.js
Original file line number Diff line number Diff line change
Expand Up @@ -349,10 +349,6 @@ const indexBlock = async job => {
await scheduleAddressesBalanceUpdate(addressesToUpdateBalance);
logger.info(`Successfully indexed block ${block.id} at height ${block.height}.`);
} catch (error) {
// Reschedule the block for indexing
// eslint-disable-next-line no-use-before-define
await addHeightToIndexBlocksQueue(blockHeightToIndex);

// Block may not have been initialized when error occurred
const failedBlockInfo = {
id: typeof block === 'undefined' ? undefined : block.id,
Expand Down Expand Up @@ -479,6 +475,22 @@ const deleteIndexedBlocks = async job => {
meta: { offset: 0, count: forkedTransactions.length, total: forkedTransactions.length },
});

// Update generatedBlocks count for the block generator
const validatorsTable = await getValidatorsTable();
logger.trace(
`Decreasing generatedBlocks for validator ${blockFromJob.generatorAddress} by 1.`,
);
await validatorsTable.decrement(
{
decrement: { generatedBlocks: 1 },
where: { address: blockFromJob.generatorAddress },
},
dbTrx,
);
logger.debug(
`Decreased generatedBlocks for validator ${blockFromJob.generatorAddress} by 1.`,
);

// Calculate locked amount change from events and update in key_value_store table
if (events.length) {
const eventsTable = await getEventsTable();
Expand Down Expand Up @@ -514,7 +526,6 @@ const deleteIndexedBlocks = async job => {
commissionAmount,
);

const validatorsTable = await getValidatorsTable();
logger.trace(
`Decreasing commission for validator ${blockFromJob.generatorAddress} by ${commissionAmount}.`,
);
Expand Down Expand Up @@ -646,9 +657,17 @@ const deleteIndexedBlocksQueue = Queue(
);

const getLiveIndexingJobCount = async () => {
const { queue: bullQueue } = indexBlocksQueue;
const jobCount = await bullQueue.getJobCounts();
const count = jobCount.active + jobCount.waiting;
const { queue: indexBlocksBullQueue } = indexBlocksQueue;
const { queue: deleteIndexedBlocksBullQueue } = deleteIndexedBlocksQueue;

const jcIndexBlocksQueue = await indexBlocksBullQueue.getJobCounts();
const jcDeleteIndexedBlocksQueue = await deleteIndexedBlocksBullQueue.getJobCounts();

const count =
jcIndexBlocksQueue.active +
jcIndexBlocksQueue.waiting +
jcDeleteIndexedBlocksQueue.active +
jcDeleteIndexedBlocksQueue.waiting;
return count;
};

Expand All @@ -659,8 +678,8 @@ const getPendingDeleteJobCount = async () => {
};

const scheduleBlockDeletion = async block => {
block = Array.isArray(block) ? block : [block];
deleteIndexedBlocksQueue.add({ blocks: [...block] });
const blocks = Array.isArray(block) ? block : [block];
await deleteIndexedBlocksQueue.add({ blocks });
};

const indexNewBlock = async block => {
Expand Down