diff --git a/services/blockchain-indexer/shared/dataService/business/transactions.js b/services/blockchain-indexer/shared/dataService/business/transactions.js index 2fa3cee817..d0edc6cf7d 100644 --- a/services/blockchain-indexer/shared/dataService/business/transactions.js +++ b/services/blockchain-indexer/shared/dataService/business/transactions.js @@ -44,7 +44,7 @@ const getTransactionIDsByBlockID = async blockID => { const transactions = await transactionsTable.find( { whereIn: { - property: 'blockId', + property: 'blockID', values: [blockID], }, }, diff --git a/services/blockchain-indexer/shared/indexer/blockchainIndex.js b/services/blockchain-indexer/shared/indexer/blockchainIndex.js index ad31d4ece8..1600a24c85 100644 --- a/services/blockchain-indexer/shared/indexer/blockchainIndex.js +++ b/services/blockchain-indexer/shared/indexer/blockchainIndex.js @@ -394,27 +394,25 @@ const indexBlock = async job => { }; // Returns a list of all indexed blocks since the minimum block height from job -const getBlocksToDelete = async blocksFromJob => { - if (!blocksFromJob.length) { - return blocksFromJob; +const getBlocksToDelete = async blocks => { + if (!blocks.length) { + return blocks; } + const blocksTable = await getBlocksTable(); - const minBlockHeight = blocksFromJob.reduce( - (minHeight, block) => Math.min(minHeight, block.height), - blocksFromJob[0].height, - ); + const minBlockHeight = Math.min(...blocks.map(b => b.height)); const blocksToRemove = await blocksTable.find( { propBetweens: [ { property: 'height', - from: minBlockHeight, + greaterThanEqualTo: minBlockHeight, }, ], sort: 'height:desc', }, - ['id', 'height', 'generatorAddress'], + ['id', 'height', 'generatorAddress', 'timestamp', 'isFinal'], ); return blocksToRemove; @@ -473,20 +471,26 @@ const deleteIndexedBlocks = async job => { }, { concurrency: 25 }, ); - forkedTransactions = transactionsToDelete.map(t => t !== null); + forkedTransactions = transactionsToDelete.filter(t => ![null, undefined].includes(t)); } await transactionsTable.deleteByPrimaryKey(forkedTransactionIDs, dbTrx); - Signals.get('deleteTransactions').dispatch({ data: forkedTransactions }); + Signals.get('deleteTransactions').dispatch({ + data: forkedTransactions, + meta: { offset: 0, count: forkedTransactions.length, total: forkedTransactions.length }, + }); // Calculate locked amount change from events and update in key_value_store table if (events.length) { const eventsTable = await getEventsTable(); const eventTopicsTable = await getEventTopicsTable(); - const { eventsInfo, eventTopicsInfo } = getEventsInfoToIndex(blockFromJob, events); + const { eventsInfo } = getEventsInfoToIndex(blockFromJob, events); const eventIDs = eventsInfo.map(e => e.id); - await eventTopicsTable.delete(eventTopicsInfo, dbTrx); + await eventTopicsTable.delete( + { whereIn: { property: 'eventID', values: eventIDs } }, + dbTrx, + ); await eventsTable.deleteByPrimaryKey(eventIDs, dbTrx); // Update block generator's rewards @@ -571,15 +575,7 @@ const deleteIndexedBlocks = async job => { { concurrency: 1 }, ); - await blocksTable.delete( - { - whereIn: { - property: 'id', - values: blockIDs, - }, - }, - dbTrx, - ); + await blocksTable.delete({ whereIn: { property: 'id', values: blockIDs } }, dbTrx); await commitDBTransaction(dbTrx); // Add safety check to ensure that the DB transaction is actually committed @@ -675,18 +671,20 @@ const indexNewBlock = async block => { 'id', 'height', 'generatorAddress', + 'timestamp', + 'isFinal', ]); - // Schedule block deletion incase unprocessed fork found + // Schedule block deletion in case of an unprocessed fork detection if (blockFromDB && blockFromDB.id !== block.id) { logger.info( - `Fork detected while scheduling indexing at height: ${block.height}. Block ID from DB: ${blockFromDB.id} node: ${block.id}.`, + `Fork detected while scheduling indexing at height: ${block.height}. Actual blockID: ${block.id}, indexed blockID: ${blockFromDB.id}.`, ); await scheduleBlockDeletion(blockFromDB); } - // Schedule indexing of incoming block if it is not indexed before or a fork found + // Schedule indexing of the incoming block if not already indexed or a fork was detected if (!blockFromDB || blockFromDB.id !== block.id) { await indexBlocksQueue.add({ height: block.height }); } @@ -699,7 +697,7 @@ const indexNewBlock = async block => { propBetweens: [ { property: 'height', - to: finalizedBlockHeight, + lowerThanEqualTo: finalizedBlockHeight, }, ], },