Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Commit

Permalink
Fix delete indexed block job bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
priojeetpriyom committed Nov 9, 2023
1 parent fad332a commit 1736d0f
Showing 1 changed file with 39 additions and 30 deletions.
69 changes: 39 additions & 30 deletions services/blockchain-indexer/shared/indexer/blockchainIndex.js
Original file line number Diff line number Diff line change
Expand Up @@ -393,10 +393,35 @@ const indexBlock = async job => {
}
};

// Returns a list of all indexed blocks since the minimum block height from job
const getBlocksToDelete = async (blocksFromJob) => {
if (!blocksFromJob.length) {
return blocksFromJob;
}
const blocksTable = await getBlocksTable();
const minBlockHeight = blocksFromJob.reduce((minHeight, block) => Math.min(minHeight, block.height), blocksFromJob[0].height);

const blocksToRemove = await blocksTable.find(
{
propBetweens: [
{
property: 'height',
from: minBlockHeight,
},
],
sort: 'height:desc',
},
['id', 'height', 'generatorAddress'],
);

return blocksToRemove;
};

const deleteIndexedBlocks = async job => {
let addressesToUpdateBalance = [];
const { blocks } = job.data;
const blockIDs = blocks.map(b => b.id);
const { blocks: blocksFromJob } = job.data;
const blocksToDelete = await getBlocksToDelete(blocksFromJob);
const blockIDs = blocksToDelete.map(b => b.id);

const blocksTable = await getBlocksTable();
const connection = await getDBConnection(MYSQL_ENDPOINT);
Expand All @@ -407,21 +432,13 @@ const deleteIndexedBlocks = async job => {

try {
await BluebirdPromise.map(
blocks,
blocksToDelete,
async blockFromJob => {
// Check if the deleted block is indexed
const [blockFromDB] = await blocksTable.find({ height: blockFromJob.height, limit: 1 });

const [{ height: lastIndexedHeight } = {}] = await blocksTable.find(
{
sort: 'height:desc',
limit: 1,
},
['height'],
);

// Skip deletion if the block was not indexed previously. The fork doesn't have any impact on block indexing in this case.
if (!blockFromDB || blockFromJob.height > lastIndexedHeight) {
if (!blockFromDB) {
logger.info(
`Deleted block ${blockFromJob.id} at height ${blockFromJob.height} was not previously indexed. Nothing to update.`,
);
Expand Down Expand Up @@ -464,9 +481,10 @@ const deleteIndexedBlocks = async job => {
const eventTopicsTable = await getEventTopicsTable();

const { eventsInfo, eventTopicsInfo } = getEventsInfoToIndex(blockFromJob, events);
const eventIDs = eventsInfo.map(e => e.id);

await eventTopicsTable.delete(eventTopicsInfo, dbTrx);
await eventsTable.delete(eventsInfo, dbTrx);
await eventsTable.deleteByPrimaryKey(eventIDs, dbTrx);

// Update block generator's rewards
const blockRewardEvent = events.find(
Expand Down Expand Up @@ -550,7 +568,7 @@ const deleteIndexedBlocks = async job => {
{ concurrency: 1 },
);

await blocksTable.deleteByPrimaryKey(blockIDs, dbTrx);
await blocksTable.delete(blockIDs.map(blockID => ({ id: blockID })), dbTrx);
await commitDBTransaction(dbTrx);

// Add safety check to ensure that the DB transaction is actually committed
Expand Down Expand Up @@ -594,7 +612,7 @@ const deleteIndexedBlocksWrapper = async job => {
await deleteIndexedBlocks(job);
} catch (err) {
if (job.attemptsMade === job.opts.attempts - 1) {
await deleteIndexedBlocksQueue.add(job.data);
await scheduleBlockDeletion(job.data.blocks);
}
} finally {
// Resume indexing once all deletion jobs are processed
Expand Down Expand Up @@ -633,33 +651,24 @@ const getPendingDeleteJobCount = async () => {
return jobCount;
};

const scheduleBlockDeletion = async block => deleteIndexedBlocksQueue.add({ blocks: [block] });
const scheduleBlockDeletion = async block => {
block = Array.isArray(block) ? block : [block];
deleteIndexedBlocksQueue.add({ blocks: [...block] });
}

const indexNewBlock = async block => {
const blocksTable = await getBlocksTable();
logger.info(`Scheduling indexing of new block: ${block.id} at height ${block.height}.`);

const [blockFromDB] = await blocksTable.find({ height: block.height, limit: 1 }, ['id']);
const [blockFromDB] = await blocksTable.find({ height: block.height, limit: 1 }, ['id', 'height', 'generatorAddress']);

// Schedule block deletion incase unprocessed fork found
if (blockFromDB && blockFromDB.id !== block.id) {
logger.info(
`Fork detected while scheduling indexing at height: ${block.height}. Block ID from DB: ${blockFromDB.id} node: ${block.id}.`,
);

const blocksToRemove = await blocksTable.find(
{
propBetweens: [
{
property: 'height',
from: block.height,
},
],
sort: 'height:desc',
},
['id'],
);
await deleteIndexedBlocksQueue.add({ blocks: blocksToRemove });
await scheduleBlockDeletion(blockFromDB);
}

// Schedule indexing of incoming block if it is not indexed before or a fork found
Expand Down

0 comments on commit 1736d0f

Please sign in to comment.