Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Commit

Permalink
🔨 Add durability safety checks in deleteIndexedBlocks job
Browse files Browse the repository at this point in the history
  • Loading branch information
sameersubudhi committed Oct 27, 2023
1 parent f887a5a commit 571cbe6
Showing 1 changed file with 33 additions and 0 deletions.
33 changes: 33 additions & 0 deletions services/blockchain-indexer/shared/indexer/blockchainIndex.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,26 @@ const checkBlockHeightIndexStatusInDB = async (blockHeight, status) => {
);
};

// eslint-disable-next-line consistent-return
const checkBlockIDsDeleteStatusInDB = async (blockIDs, status) => {
const blocksTable = await getBlocksTable();
const resultSet = await blocksTable.find({ whereIn: { property: 'id', values: blockIDs } });
const numResults = resultSet.length;

const isCommit = DB_STATUS.COMMIT === status;
const isRollback = DB_STATUS.ROLLBACK === status;

const blockIDString = blockIDs.join(' ,');
if ((isCommit && numResults === 0) || (isRollback && numResults !== 0)) {
logger.debug(`Deleting block(s) ${blockIDString} ${isCommit ? 'committed' : 'rolled back'}.`);
return true;
}

logger.debug(
`Deleting block(s) ${blockIDString} not yet ${isCommit ? 'committed' : 'rolled back'}.`,
);
};

const indexBlock = async job => {
const { height: blockHeightFromJobData } = job.data;
let blockHeightToIndex = blockHeightFromJobData;
Expand Down Expand Up @@ -327,6 +347,7 @@ const indexBlock = async job => {

// Only schedule address balance updates if the block is indexed successfully
await scheduleAddressesBalanceUpdate(addressesToUpdateBalance);
logger.info(`Successfully indexed block ${block.id} at height ${block.height}.`);
} catch (error) {
// Reschedule the block for indexing
// eslint-disable-next-line no-use-before-define
Expand Down Expand Up @@ -527,13 +548,25 @@ const deleteIndexedBlocks = async job => {
await blocksTable.deleteByPrimaryKey(blockIDs, dbTrx);
await commitDBTransaction(dbTrx);

// Add safety check to ensure that the DB transaction is actually committed
await waitForIt(
checkBlockIDsDeleteStatusInDB.bind(null, blockIDs, DB_STATUS.COMMIT),
config.db.durabilityVerifyFrequency,
);

// Only schedule address balance updates if the block is deleted successfully
await scheduleAddressesBalanceUpdate(addressesToUpdateBalance);
logger.debug(`Committed MySQL transaction to delete block(s) with ID(s): ${blockIDs}.`);
} catch (error) {
logger.debug(`Rolled back MySQL transaction to delete block(s) with ID(s): ${blockIDs}.`);
await rollbackDBTransaction(dbTrx);

// Add safety check to ensure that the DB transaction is actually rolled back
await waitForIt(
checkBlockIDsDeleteStatusInDB.bind(null, blockIDs, DB_STATUS.ROLLBACK),
config.db.durabilityVerifyFrequency,
);

if (error.message.includes('ER_LOCK_DEADLOCK')) {
const errMessage = `Deadlock encountered while deleting block(s) with ID(s): ${blockIDs}. Will retry later.`;
logger.warn(errMessage);
Expand Down

0 comments on commit 571cbe6

Please sign in to comment.