Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Commit

Permalink
🐎 Improve indexing speed
Browse files Browse the repository at this point in the history
- Replace token_getBalances invocation with token_getBalance
  • Loading branch information
sameersubudhi committed Apr 8, 2024
1 parent 0e21e4c commit 6554bb2
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 19 deletions.
48 changes: 31 additions & 17 deletions services/blockchain-indexer/shared/indexer/accountBalanceIndex.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ const {
Logger,
} = require('lisk-service-framework');

const regex = require('../regex');
const config = require('../../config');
const accountBalancesTableSchema = require('../database/schema/accountBalances');

const { MODULE } = require('../constants');
const { getTokenBalances } = require('../dataService');
const accountBalancesTableSchema = require('../database/schema/accountBalances');
const { getPosTokenID } = require('../dataService/business/pos/constants');

const logger = Logger();

Expand All @@ -34,9 +37,10 @@ const MYSQL_ENDPOINT = config.endpoints.mysql;

const getAccountBalancesTable = () => getTableInstance(accountBalancesTableSchema, MYSQL_ENDPOINT);

const updateAccountBalances = async address => {
const updateAccountBalances = async addressTokenID => {
const accountBalancesTable = await getAccountBalancesTable();
const { data: balanceInfos } = await getTokenBalances({ address });
const [address, cTokenID] = addressTokenID.split('_');
const { data: balanceInfos } = await getTokenBalances({ address, tokenID: cTokenID });

const updatedTokenBalances = balanceInfos.map(balanceInfo => {
const { tokenID, availableBalance, lockedBalances } = balanceInfo;
Expand All @@ -56,14 +60,20 @@ const updateAccountBalances = async address => {
await accountBalancesTable.upsert(updatedTokenBalances);
};

const scheduleAddressesBalanceUpdate = async addresses => {
if (addresses.length) {
redis.sadd(config.set.accountBalanceUpdate.name, addresses);
const scheduleAddressesBalanceUpdate = async addressTokens => {
if (addressTokens.length) {
addressTokens.forEach(addressToken => {
if (!regex.ADDRESS_LISK32_TOKEN_ID.test(addressToken)) {
throw new Error(`Invalid address tokenID format: ${addressToken}`);
}
});

redis.sadd(config.set.accountBalanceUpdate.name, addressTokens);
}
};

const getAddressesFromTokenEvents = events => {
const addressesToUpdate = [];
const getAddressesFromTokenEvents = async events => {
const addressTokensToUpdate = [];
const tokenModuleEvents = events.filter(event => event.module === MODULE.TOKEN);

// eslint-disable-next-line no-restricted-syntax
Expand All @@ -74,33 +84,37 @@ const getAddressesFromTokenEvents = events => {
for (const key of eventDataKeys) {
if (key.toLowerCase().includes('address')) {
const address = eventData[key];
addressesToUpdate.push(address);
const tokenID = eventData.tokenID || eventData.messageFeeTokenID || (await getPosTokenID());
addressTokensToUpdate.push(`${address}_${tokenID}`);
}
}
}

return addressesToUpdate;
return addressTokensToUpdate;
};

const triggerAccountsBalanceUpdate = async () => {
const addresses = await redis.spop(
const addressTokenEntries = await redis.spop(
config.set.accountBalanceUpdate.name,
config.set.accountBalanceUpdate.batchSize,
);

const numAddressesScheduled = addresses.length;
const numAddressesScheduled = addressTokenEntries.length;
try {
// eslint-disable-next-line no-restricted-syntax
while (addresses.length) {
const address = addresses.shift();
await updateAccountBalances(address);
while (addressTokenEntries.length) {
const addressToken = addressTokenEntries.shift();
await updateAccountBalances(addressToken).catch(err => {
addressTokenEntries.push(addressToken);
throw err;
});
}
logger.info(`Successfully updated account balances for ${numAddressesScheduled} account(s).`);
} catch (err) {
// Reschedule accounts balance update on error for remaining addresses
await scheduleAddressesBalanceUpdate(addresses);
await scheduleAddressesBalanceUpdate(addressTokenEntries);

const numPending = addresses.length;
const numPending = addressTokenEntries.length;
const numSuccess = numAddressesScheduled - numPending;
logger.info(
`Successfully updated account balances for ${numSuccess} account(s). Rescheduling updates for ${numPending} account(s).`,
Expand Down
4 changes: 2 additions & 2 deletions services/blockchain-indexer/shared/indexer/blockchainIndex.js
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ const indexBlock = async job => {
await updateTotalLockedAmounts(tokenIDLockedAmountChangeMap, dbTrx);

// Get addresses to schedule account balance updates from token module events
addressesToUpdateBalance = getAddressesFromTokenEvents(events);
addressesToUpdateBalance = await getAddressesFromTokenEvents(events);
}

const blockToIndex = {
Expand Down Expand Up @@ -624,7 +624,7 @@ const deleteIndexedBlocks = async job => {
await updateTotalLockedAmounts(tokenIDLockedAmountChangeMap, dbTrx);

// Get addresses to schedule account balance updates from token module events
addressesToUpdateBalance = getAddressesFromTokenEvents(events);
addressesToUpdateBalance = await getAddressesFromTokenEvents(events);
}

// Invalidate cached events for this block. Must be done after processing all event related calculations
Expand Down
2 changes: 2 additions & 0 deletions services/blockchain-indexer/shared/regex.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const NAME = /^[a-z0-9!@$&_.]{1,20}$/;
const TOKEN_ID = /^\b[a-fA-F0-9]{16}\b$/;
const MAINCHAIN_ID = /^[a-fA-F0-9]{2}000000$/;
const CHAIN_ID = /^\b[a-fA-F0-9]{8}\b$/;
const ADDRESS_LISK32_TOKEN_ID = /^lsk[a-hjkm-z2-9]{38}_[a-fA-F0-9]{16}$/;

module.exports = {
ADDRESS_LISK32,
Expand All @@ -32,4 +33,5 @@ module.exports = {
TOKEN_ID,
MAINCHAIN_ID,
CHAIN_ID,
ADDRESS_LISK32_TOKEN_ID,
};

0 comments on commit 6554bb2

Please sign in to comment.