Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Fix mainnet genesis block indexing #1963

Merged
merged 8 commits into from
Dec 2, 2023
2 changes: 1 addition & 1 deletion docker/nats-server.conf
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
max_payload: 4Mb
max_payload: 8MB
http_port: 8222
10 changes: 4 additions & 6 deletions services/blockchain-connector/shared/sdk/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -153,16 +153,14 @@ const invokeEndpoint = async (endpoint, params = {}, numRetries = NUM_REQUEST_RE
} while (retries--);
};

// Checks to ensure that the API Client is always alive
const resetApiClientListener = async () => instantiateClient(true).catch(() => {});
sameersubudhi marked this conversation as resolved.
Show resolved Hide resolved
Signals.get('resetApiClient').add(resetApiClientListener);

if (!config.isUseLiskIPCClient) {
// Check to ensure that the API Client is always alive
const triggerRegularClientLivelinessChecks = () =>
setInterval(async () => {
const isAlive = await checkIsClientAlive();
if (!isAlive) instantiateClient(true).catch(() => {});
}, CLIENT_ALIVE_ASSUMPTION_TIME);
}

Signals.get('genesisBlockDownloaded').add(triggerRegularClientLivelinessChecks);

module.exports = {
timeoutMessage,
Expand Down
33 changes: 0 additions & 33 deletions services/blockchain-connector/shared/sdk/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ const util = require('util');

const { Logger, Signals } = require('lisk-service-framework');

const config = require('../../config');

const { getApiClient } = require('./client');
const { formatEvent } = require('./formatter');
const { getRegisteredEvents, getEventsByHeight, getNodeInfo } = require('./endpoints');
Expand All @@ -40,26 +38,19 @@ const events = [
EVENT_TX_POOL_TRANSACTION_NEW,
];

let eventsCounter;

const logError = (method, err) => {
logger.warn(`Invocation for ${method} failed with error: ${err.message}.`);
logger.debug(err.stack);
};

const subscribeToAllRegisteredEvents = async () => {
// Reset eventsCounter first
eventsCounter = 0;

const apiClient = await getApiClient();
const registeredEvents = await getRegisteredEvents();
const allEvents = registeredEvents.concat(events);
allEvents.forEach(event => {
apiClient.subscribe(event, async payload => {
// Force update necessary caches on new chain events
if (event.startsWith('chain_')) {
eventsCounter++; // Increase counter with every newBlock/deleteBlock

await getNodeInfo(true).catch(err => logError('getNodeInfo', err));
await updateTokenInfo().catch(err => logError('updateTokenInfo', err));
}
Expand All @@ -77,30 +68,6 @@ const getEventsByHeightFormatted = async height => {
return formattedEvents;
};

// To ensure API Client is alive and receiving chain events
const ensureAPIClientLiveness = () =>
setInterval(() => {
if (typeof eventsCounter === 'number' && eventsCounter > 0) {
eventsCounter = 0;
} else {
if (typeof eventsCounter !== 'number') {
logger.warn(
`eventsCounter ended up with non-numeric value: ${JSON.stringify(
eventsCounter,
null,
'\t',
)}.`,
);
eventsCounter = 0;
}

Signals.get('resetApiClient').dispatch();
logger.info("Dispatched 'resetApiClient' signal to re-instantiate the API client.");
}
}, config.clientConnVerifyInterval);

Signals.get('nodeIsSynced').add(ensureAPIClientLiveness);

module.exports = {
events,

Expand Down
6 changes: 5 additions & 1 deletion services/blockchain-connector/shared/sdk/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
* Removal or modification of this copyright notice is prohibited.
*
*/
const { Signals } = require('lisk-service-framework');

const config = require('../../config');

const {
Expand Down Expand Up @@ -128,7 +130,9 @@ const init = async () => {
await getPosConstants();

// Download the genesis block, if applicable
await getGenesisBlock();
await getGenesisBlock().then(() => {
Signals.get('genesisBlockDownloaded').dispatch();
});

if (config.appExitDelay) {
setTimeout(() => process.exit(0), config.appExitDelay);
Expand Down
4 changes: 3 additions & 1 deletion services/blockchain-indexer/jobs/dataService/validators.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ module.exports = [
await reloadValidatorCache();
logger.info('Successfully initialized validators cache.');
} catch (err) {
logger.warn(`Initializing validators cache failed due to: ${err.stack}`);
logger.warn(`Initializing validators cache failed due to: ${err.message}`);
logger.debug(err.stack);
}
}
},
Expand All @@ -43,6 +44,7 @@ module.exports = [
await reloadValidatorCache();
} catch (err) {
logger.warn(`Reloading validators cache failed due to: ${err.message}`);
logger.debug(err.stack);
}
}
},
Expand Down
9 changes: 7 additions & 2 deletions services/blockchain-indexer/shared/indexer/blockchainIndex.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ const { getTransactionExecutionStatus } = require('../utils/transactions');
const { getEventsInfoToIndex } = require('./utils/events');
const { calcCommissionAmount, calcSelfStakeReward } = require('./utils/validator');
const { indexAccountPublicKey } = require('./accountIndex');
const { indexGenesisBlockAssets } = require('./genesisBlock');
const { getGenesisAssetIntervalTimeout, indexGenesisBlockAssets } = require('./genesisBlock');
const { updateTotalLockedAmounts } = require('./utils/blockchainIndex');
const {
getAddressesFromTokenEvents,
Expand Down Expand Up @@ -135,8 +135,8 @@ const indexBlock = async job => {
let dbTrx;
let blockToIndexFromNode;

const genesisHeight = await getGenesisHeight();
try {
const genesisHeight = await getGenesisHeight();
const blocksTable = await getBlocksTable();

const [lastIndexedBlock = {}] = await blocksTable.find(
Expand Down Expand Up @@ -379,6 +379,11 @@ const indexBlock = async job => {
`Successfully indexed block ${blockToIndexFromNode.id} at height ${blockToIndexFromNode.height}.`,
);
} catch (error) {
// Stop genesisAsset index progress logging on errors
if (blockToIndexFromNode.height === genesisHeight) {
clearInterval(getGenesisAssetIntervalTimeout());
}

// Block may not have been initialized when error occurred
const failedBlockInfo = {
id: typeof blockToIndexFromNode === 'undefined' ? undefined : blockToIndexFromNode.id,
Expand Down
6 changes: 5 additions & 1 deletion services/blockchain-indexer/shared/indexer/genesisBlock.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,11 @@ const getStakesTable = () => getTableInstance(stakesTableSchema, MYSQL_ENDPOINT)
const getCommissionsTable = () => getTableInstance(commissionsTableSchema, MYSQL_ENDPOINT);

const allAccountsAddresses = [];
let intervalTimeout;
let isTokensBalanceIndexed = false;

const getGenesisAssetIntervalTimeout = () => intervalTimeout;

const indexTokenModuleAssets = async dbTrx => {
logger.info('Starting to index the genesis assets from the Token module.');
const genesisBlockAssetsLength = await requestConnector('getGenesisAssetsLength', {
Expand Down Expand Up @@ -171,7 +174,7 @@ const indexPosModuleAssets = async dbTrx => {

const indexGenesisBlockAssets = async dbTrx => {
logger.info('Starting to index the genesis assets.');
const intervalTimeout = setInterval(
intervalTimeout = setInterval(
() => logger.info('Genesis assets indexing still in progress...'),
5000,
);
Expand Down Expand Up @@ -204,6 +207,7 @@ const indexTokenBalancesListener = async () => {
Signals.get('chainNewBlock').add(indexTokenBalancesListener);

module.exports = {
getGenesisAssetIntervalTimeout,
indexGenesisBlockAssets,

// For testing
Expand Down