Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Commit

Permalink
🐎 Improve indexing speed
Browse files Browse the repository at this point in the history
- Replace token_getBalances invocation with token_getBalance
  • Loading branch information
sameersubudhi committed Apr 9, 2024
1 parent 0e21e4c commit 32f4649
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 19 deletions.
45 changes: 28 additions & 17 deletions services/blockchain-indexer/shared/indexer/accountBalanceIndex.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ const {
} = require('lisk-service-framework');

const config = require('../../config');
const accountBalancesTableSchema = require('../database/schema/accountBalances');

const { MODULE } = require('../constants');
const { getTokenBalances } = require('../dataService');
const accountBalancesTableSchema = require('../database/schema/accountBalances');

const logger = Logger();

Expand All @@ -34,9 +35,11 @@ const MYSQL_ENDPOINT = config.endpoints.mysql;

const getAccountBalancesTable = () => getTableInstance(accountBalancesTableSchema, MYSQL_ENDPOINT);

const updateAccountBalances = async address => {
const updateAccountBalances = async addressTokenID => {
const accountBalancesTable = await getAccountBalancesTable();
const { data: balanceInfos } = await getTokenBalances({ address });
const [address, cTokenID] = addressTokenID.split('_');
const params = cTokenID ? { address, tokenID: cTokenID } : { address };
const { data: balanceInfos } = await getTokenBalances(params);

const updatedTokenBalances = balanceInfos.map(balanceInfo => {
const { tokenID, availableBalance, lockedBalances } = balanceInfo;
Expand All @@ -56,14 +59,14 @@ const updateAccountBalances = async address => {
await accountBalancesTable.upsert(updatedTokenBalances);
};

const scheduleAddressesBalanceUpdate = async addresses => {
if (addresses.length) {
redis.sadd(config.set.accountBalanceUpdate.name, addresses);
const scheduleAddressesBalanceUpdate = async addressTokens => {
if (addressTokens.length) {
redis.sadd(config.set.accountBalanceUpdate.name, addressTokens);
}
};

const getAddressesFromTokenEvents = events => {
const addressesToUpdate = [];
const getAddressesFromTokenEvents = async events => {
const addressTokensToUpdate = [];
const tokenModuleEvents = events.filter(event => event.module === MODULE.TOKEN);

// eslint-disable-next-line no-restricted-syntax
Expand All @@ -74,33 +77,41 @@ const getAddressesFromTokenEvents = events => {
for (const key of eventDataKeys) {
if (key.toLowerCase().includes('address')) {
const address = eventData[key];
addressesToUpdate.push(address);
const tokenID = eventData.tokenID || eventData.messageFeeTokenID;
if (tokenID) {
addressTokensToUpdate.push(`${address}_${tokenID}`);
} else {
addressTokensToUpdate.push(address);
}
}
}
}

return addressesToUpdate;
return addressTokensToUpdate;
};

const triggerAccountsBalanceUpdate = async () => {
const addresses = await redis.spop(
const addressTokenEntries = await redis.spop(
config.set.accountBalanceUpdate.name,
config.set.accountBalanceUpdate.batchSize,
);

const numAddressesScheduled = addresses.length;
const numAddressesScheduled = addressTokenEntries.length;
try {
// eslint-disable-next-line no-restricted-syntax
while (addresses.length) {
const address = addresses.shift();
await updateAccountBalances(address);
while (addressTokenEntries.length) {
const addressToken = addressTokenEntries.shift();
await updateAccountBalances(addressToken).catch(err => {
addressTokenEntries.push(addressToken);
throw err;
});
}
logger.info(`Successfully updated account balances for ${numAddressesScheduled} account(s).`);
} catch (err) {
// Reschedule accounts balance update on error for remaining addresses
await scheduleAddressesBalanceUpdate(addresses);
await scheduleAddressesBalanceUpdate(addressTokenEntries);

const numPending = addresses.length;
const numPending = addressTokenEntries.length;
const numSuccess = numAddressesScheduled - numPending;
logger.info(
`Successfully updated account balances for ${numSuccess} account(s). Rescheduling updates for ${numPending} account(s).`,
Expand Down
4 changes: 2 additions & 2 deletions services/blockchain-indexer/shared/indexer/blockchainIndex.js
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ const indexBlock = async job => {
await updateTotalLockedAmounts(tokenIDLockedAmountChangeMap, dbTrx);

// Get addresses to schedule account balance updates from token module events
addressesToUpdateBalance = getAddressesFromTokenEvents(events);
addressesToUpdateBalance = await getAddressesFromTokenEvents(events);
}

const blockToIndex = {
Expand Down Expand Up @@ -624,7 +624,7 @@ const deleteIndexedBlocks = async job => {
await updateTotalLockedAmounts(tokenIDLockedAmountChangeMap, dbTrx);

// Get addresses to schedule account balance updates from token module events
addressesToUpdateBalance = getAddressesFromTokenEvents(events);
addressesToUpdateBalance = await getAddressesFromTokenEvents(events);
}

// Invalidate cached events for this block. Must be done after processing all event related calculations
Expand Down

0 comments on commit 32f4649

Please sign in to comment.