From 4404f401fc22f447ff66c13dd5b1c4ad1f66bcb1 Mon Sep 17 00:00:00 2001 From: Georgi Grigorov Date: Fri, 30 Aug 2024 17:40:50 +0300 Subject: [PATCH 01/12] added block buffer for all incoming block that are received before the full sync is complited --- .../blockannounce/BlockAnnounceEngine.java | 34 ++++++++----- .../limechain/storage/block/BlockState.java | 48 +++++++++++++++---- .../sync/fullsync/FullSyncMachine.java | 5 +- .../sync/warpsync/WarpSyncMachine.java | 7 +++ 4 files changed, 71 insertions(+), 23 deletions(-) diff --git a/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java b/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java index 6311cf219..31893fef1 100644 --- a/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java +++ b/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java @@ -21,9 +21,11 @@ import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.extern.java.Log; +import org.javatuples.Pair; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.time.Instant; import java.util.ArrayList; import java.util.logging.Level; @@ -74,7 +76,7 @@ private void handleHandshake(byte[] msg, PeerId peerId, Stream stream, boolean c connectionManager.addBlockAnnounceStream(stream); connectionManager.updatePeer(peerId, handshake); log.log(Level.INFO, "Received handshake from " + peerId + "\n" + - handshake); + handshake); writeHandshakeToStream(stream, peerId); } } @@ -85,19 +87,27 @@ private void handleBlockAnnounce(byte[] msg, PeerId peerId) { connectionManager.updatePeer(peerId, announce); warpSyncState.syncBlockAnnounce(announce); log.log(Level.FINE, "Received block announce for block #" + announce.getHeader().getBlockNumber() + - " from " + peerId + - " with hash:0x" + announce.getHeader().getHash() + - " parentHash:" + announce.getHeader().getParentHash() + - " stateRoot:" + announce.getHeader().getStateRoot()); + " from " + peerId + + " with hash:0x" + announce.getHeader().getHash() + + " parentHash:" + announce.getHeader().getParentHash() + + " stateRoot:" + announce.getHeader().getStateRoot()); if (BlockState.getInstance().isInitialized()) { - try { - BlockState.getInstance().addBlock(new Block(announce.getHeader(), new BlockBody(new ArrayList<>()))); - } catch (BlockNodeNotFoundException ignored) { - // Currently we ignore this exception, because our syncing strategy as full node is not implemented yet. - // And thus when we receive a block announce and try to add it in the BlockState we will get this - // exception because the parent block of the received one is not found in the BlockState. - } + if (BlockState.getInstance().isFullSyncFinished()) { + try { + BlockState.getInstance().addBlock(new Block(announce.getHeader(), new BlockBody(new ArrayList<>()))); + } catch (BlockNodeNotFoundException ignored) { + // Currently we ignore this exception, because our syncing strategy as full node is not implemented yet. + // And thus when we receive a block announce and try to add it in the BlockState we will get this + // exception because the parent block of the received one is not found in the BlockState. + } + } else { + var currentBlock = new Block(announce.getHeader(), new BlockBody(new ArrayList<>())); + BlockState.getInstance().getBlockBuffer().putIfAbsent( + currentBlock.getHeader().getHash(), + new Pair<>(Instant.now(), currentBlock) + ); + } } } diff --git a/src/main/java/com/limechain/storage/block/BlockState.java b/src/main/java/com/limechain/storage/block/BlockState.java index bcacbca90..ad6bccd57 100644 --- a/src/main/java/com/limechain/storage/block/BlockState.java +++ b/src/main/java/com/limechain/storage/block/BlockState.java @@ -16,23 +16,20 @@ import com.limechain.runtime.Runtime; import com.limechain.storage.DBConstants; import com.limechain.storage.KVRepository; +import com.limechain.storage.block.tree.BlockNode; import com.limechain.storage.block.tree.BlockTree; import com.limechain.utils.scale.ScaleUtils; import io.emeraldpay.polkaj.types.Hash256; import lombok.Getter; import lombok.NoArgsConstructor; +import lombok.Setter; import lombok.extern.java.Log; import org.javatuples.Pair; import org.springframework.util.SerializationUtils; import java.math.BigInteger; import java.time.Instant; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; +import java.util.*; /** * Contains the historical block data of the blockchain, including block headers and bodies. @@ -53,6 +50,12 @@ public class BlockState { private Hash256 lastFinalized; @Getter private boolean initialized; + @Getter + @Setter + private boolean fullSyncFinished; + + @Getter + private Map> blockBuffer = new LinkedHashMap<>(); /** * Initializes the BlockState instance from genesis @@ -97,11 +100,21 @@ public void initialize(final KVRepository repository) { this.genesisHash = getHashByNumberFromDb(BigInteger.ZERO); final BlockHeader lastHeader = getHighestFinalizedHeader(); - final Hash256 headerHash = lastHeader.getHash(); - this.lastFinalized = headerHash; + this.lastFinalized = lastHeader.getHash(); this.blockTree = new BlockTree(lastHeader); } + public void initializeWarp(Hash256 lastFinalizedBlockHash, BigInteger lastFinalizedBlockNumber) { + BlockNode parent = new BlockNode( + lastFinalizedBlockHash, + null, + lastFinalizedBlockNumber.longValue() + ); + + this.blockTree = new BlockTree(parent); + this.lastFinalized = lastFinalizedBlockHash; + } + /** * Check if the hash is part of the unfinalized blocks in-memory or persisted in the database. * @@ -589,7 +602,7 @@ public List retrieveRangeFromDatabase(final Hash256 startHash, final Bl // Verify that we ended up with the start hash if (!Objects.equals(inLoopHash, startHash)) { throw new BlockStorageGenericException("Start hash mismatch: expected " + startHash + - ", found: " + inLoopHash); + ", found: " + inLoopHash); } return hashes; @@ -884,7 +897,7 @@ public void handleFinalizedBlock(final Hash256 currentFinalizedHash) { Block block = unfinalizedBlocks.get(subchainHash); if (block == null) { throw new BlockNotFoundException("Failed to find block in unfinalized block map for hash" + - subchainHash); + subchainHash); } setHeader(block.getHeader()); @@ -904,4 +917,19 @@ public void handleFinalizedBlock(final Hash256 currentFinalizedHash) { //TODO: If currentFinalizedHash is not equal to subchain hash, delete subchain state trie } } + + // TODO: The last block from full sync machine has bigger number than the last block from the buffer, which results + // in not finding any parent of the blocks from the buffer in the block tree and it seems pointless to add any of the + // blocks from the buffer to the block tree. + public void mergeBlockStateWithAnnouncedBlocks() { + for (Map.Entry> entry : blockBuffer.entrySet()) { + + try { + this.addBlockWithArrivalTime(entry.getValue().getValue1(), entry.getValue().getValue0()); + } catch (BlockNodeNotFoundException ex) { + log.info(ex.getMessage()); + } + } + blockBuffer.clear(); + } } diff --git a/src/main/java/com/limechain/sync/fullsync/FullSyncMachine.java b/src/main/java/com/limechain/sync/fullsync/FullSyncMachine.java index 1b44907ce..8bebf42d4 100644 --- a/src/main/java/com/limechain/sync/fullsync/FullSyncMachine.java +++ b/src/main/java/com/limechain/sync/fullsync/FullSyncMachine.java @@ -102,6 +102,9 @@ public void start() { startNumber += blocksToFetch; receivedBlocks = requestBlocks(startNumber, blocksToFetch); } + + BlockState.getInstance().mergeBlockStateWithAnnouncedBlocks(); + blockState.setFullSyncFinished(true); } private TrieStructure loadStateAtBlockFromPeer(Hash256 lastFinalizedBlockHash) { @@ -207,7 +210,7 @@ private static Block protobufDecodeBlock(SyncMessage.BlockData blockData) { */ private void executeBlocks(List receivedBlockDatas, TrieAccessor trieAccessor) { for (Block block : receivedBlockDatas) { - log.fine("Block number to be executed is " + block.getHeader().getBlockNumber()); + log.info("Block number to be executed is " + block.getHeader().getBlockNumber()); try { blockState.addBlock(block); diff --git a/src/main/java/com/limechain/sync/warpsync/WarpSyncMachine.java b/src/main/java/com/limechain/sync/warpsync/WarpSyncMachine.java index a3eaecdbf..d26a19554 100644 --- a/src/main/java/com/limechain/sync/warpsync/WarpSyncMachine.java +++ b/src/main/java/com/limechain/sync/warpsync/WarpSyncMachine.java @@ -5,6 +5,7 @@ import com.limechain.chain.lightsyncstate.LightSyncState; import com.limechain.network.Network; import com.limechain.network.protocol.warp.dto.WarpSyncFragment; +import com.limechain.storage.block.BlockState; import com.limechain.storage.block.SyncState; import com.limechain.sync.warpsync.action.FinishedAction; import com.limechain.sync.warpsync.action.RequestFragmentsAction; @@ -101,6 +102,12 @@ private void finishWarpSync() { this.warpState.setWarpSyncFinished(true); this.networkService.handshakeBootNodes(); this.syncState.persistState(); + + BlockState.getInstance().initializeWarp( + syncState.getLastFinalizedBlockHash(), + syncState.getLastFinalizedBlockNumber() + ); + log.info("Warp sync finished."); this.onFinishCallbacks.forEach(executor::submit); } From 3e250ef7fa84fd17a4497cec527d8a20584720ce Mon Sep 17 00:00:00 2001 From: Georgi Grigorov Date: Mon, 2 Sep 2024 16:58:03 +0300 Subject: [PATCH 02/12] Implement initial queuing mechanism for blocks received from blockAnnounce --- .../blockannounce/BlockAnnounceEngine.java | 30 +++++++++-------- .../limechain/storage/block/BlockState.java | 32 +++++++++++++------ .../storage/block/tree/BlockTree.java | 7 +++- .../sync/fullsync/FullSyncMachine.java | 1 - 4 files changed, 45 insertions(+), 25 deletions(-) diff --git a/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java b/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java index 31893fef1..e81bc4e58 100644 --- a/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java +++ b/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java @@ -93,21 +93,23 @@ private void handleBlockAnnounce(byte[] msg, PeerId peerId) { " stateRoot:" + announce.getHeader().getStateRoot()); if (BlockState.getInstance().isInitialized()) { - if (BlockState.getInstance().isFullSyncFinished()) { - try { - BlockState.getInstance().addBlock(new Block(announce.getHeader(), new BlockBody(new ArrayList<>()))); - } catch (BlockNodeNotFoundException ignored) { - // Currently we ignore this exception, because our syncing strategy as full node is not implemented yet. - // And thus when we receive a block announce and try to add it in the BlockState we will get this - // exception because the parent block of the received one is not found in the BlockState. - } - } else { - var currentBlock = new Block(announce.getHeader(), new BlockBody(new ArrayList<>())); - BlockState.getInstance().getBlockBuffer().putIfAbsent( - currentBlock.getHeader().getHash(), - new Pair<>(Instant.now(), currentBlock) - ); + + if (BlockState.getInstance().isFullSyncFinished()) { + BlockState.getInstance().processPendingBlocksFromQueue(); + } + + if (BlockState.getInstance().isFullSyncFinished() && BlockState.getInstance().getPendingBlocksQueue().isEmpty()) { + try { + BlockState.getInstance().addBlock(new Block(announce.getHeader(), new BlockBody(new ArrayList<>()))); + } catch (BlockNodeNotFoundException ignored) { + //TODO: Handle the error + // Currently we ignore this exception, because our syncing strategy as full node is not implemented yet. + // And thus when we receive a block announce and try to add it in the BlockState we will get this + // exception because the parent block of the received one is not found in the BlockState. } + } else { + BlockState.getInstance().addBlockToQueue(announce.getHeader()); + } } } diff --git a/src/main/java/com/limechain/storage/block/BlockState.java b/src/main/java/com/limechain/storage/block/BlockState.java index ad6bccd57..99147f30f 100644 --- a/src/main/java/com/limechain/storage/block/BlockState.java +++ b/src/main/java/com/limechain/storage/block/BlockState.java @@ -55,7 +55,7 @@ public class BlockState { private boolean fullSyncFinished; @Getter - private Map> blockBuffer = new LinkedHashMap<>(); + private final ArrayDeque> pendingBlocksQueue = new ArrayDeque<>(); /** * Initializes the BlockState instance from genesis @@ -918,18 +918,32 @@ public void handleFinalizedBlock(final Hash256 currentFinalizedHash) { } } - // TODO: The last block from full sync machine has bigger number than the last block from the buffer, which results - // in not finding any parent of the blocks from the buffer in the block tree and it seems pointless to add any of the - // blocks from the buffer to the block tree. - public void mergeBlockStateWithAnnouncedBlocks() { - for (Map.Entry> entry : blockBuffer.entrySet()) { + public synchronized void addBlockToQueue(BlockHeader blockHeader) { + var currentBlock = new Block( + blockHeader, + new BlockBody(new ArrayList<>()) + ); + + pendingBlocksQueue.add( + new Pair<>(Instant.now(), currentBlock) + ); + } + + public synchronized void processPendingBlocksFromQueue() { + var rootBlockNumber = BigInteger.valueOf(blockTree.getRoot().getNumber()); + + while (!pendingBlocksQueue.isEmpty()) { + var current = pendingBlocksQueue.poll(); + + if (current.getValue1().getHeader().getBlockNumber().compareTo(rootBlockNumber) <= 0) { + continue; + } try { - this.addBlockWithArrivalTime(entry.getValue().getValue1(), entry.getValue().getValue0()); - } catch (BlockNodeNotFoundException ex) { + this.addBlockWithArrivalTime(current.getValue1(), current.getValue0()); + } catch (BlockStorageGenericException ex) { log.info(ex.getMessage()); } } - blockBuffer.clear(); } } diff --git a/src/main/java/com/limechain/storage/block/tree/BlockTree.java b/src/main/java/com/limechain/storage/block/tree/BlockTree.java index e148e7224..4deffea32 100644 --- a/src/main/java/com/limechain/storage/block/tree/BlockTree.java +++ b/src/main/java/com/limechain/storage/block/tree/BlockTree.java @@ -6,6 +6,7 @@ import com.limechain.exception.storage.LowerThanRootException; import com.limechain.network.protocol.warp.dto.BlockHeader; import com.limechain.runtime.Runtime; +import com.limechain.storage.block.BlockState; import com.limechain.storage.block.map.HashToRuntime; import io.emeraldpay.polkaj.types.Hash256; import lombok.Getter; @@ -84,7 +85,8 @@ public void addBlock(final BlockHeader header, final Instant arrivalTime) { throw new BlockNodeNotFoundException("Parent does not exist in tree"); } if (getNode(header.getHash()) != null) { - throw new BlockAlreadyExistsException("Block already exists in tree"); + //TODO: Remove + throw new BlockAlreadyExistsException("Block already exists in tree -> ["+header.getBlockNumber()+"] ["+header.getHash()+"]"); } long number = parent.getNumber() + 1; @@ -97,6 +99,9 @@ public void addBlock(final BlockHeader header, final Instant arrivalTime) { //TODO: Check if primary } + //TODO: Remove + System.out.println("Queue size: " + BlockState.getInstance().getPendingBlocksQueue().size()); + System.out.println("Block [" + header.getBlockNumber() + "] with hash [" + header.getHash() + "] added to the BlockTree"); BlockNode newBlockNode = new BlockNode(header.getHash(), parent, new ArrayList<>(), number, arrivalTime, isPrimary); parent.addChild(newBlockNode); diff --git a/src/main/java/com/limechain/sync/fullsync/FullSyncMachine.java b/src/main/java/com/limechain/sync/fullsync/FullSyncMachine.java index 8bebf42d4..ef2c41a9c 100644 --- a/src/main/java/com/limechain/sync/fullsync/FullSyncMachine.java +++ b/src/main/java/com/limechain/sync/fullsync/FullSyncMachine.java @@ -103,7 +103,6 @@ public void start() { receivedBlocks = requestBlocks(startNumber, blocksToFetch); } - BlockState.getInstance().mergeBlockStateWithAnnouncedBlocks(); blockState.setFullSyncFinished(true); } From 723809b4c367b6757faf73c0b10b7b2535ea20b8 Mon Sep 17 00:00:00 2001 From: Georgi Grigorov Date: Mon, 2 Sep 2024 17:51:11 +0300 Subject: [PATCH 03/12] Refactored handleBlockAnnounce method --- .../blockannounce/BlockAnnounceEngine.java | 29 ++++++++++--------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java b/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java index e81bc4e58..840430bcd 100644 --- a/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java +++ b/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java @@ -21,18 +21,18 @@ import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.extern.java.Log; -import org.javatuples.Pair; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.time.Instant; import java.util.ArrayList; import java.util.logging.Level; @Log @AllArgsConstructor(access = AccessLevel.PROTECTED) public class BlockAnnounceEngine { + public static final int HANDSHAKE_LENGTH = 69; + protected ConnectionManager connectionManager; protected WarpSyncState warpSyncState; protected BlockAnnounceHandshakeBuilder handshakeBuilder; @@ -93,23 +93,24 @@ private void handleBlockAnnounce(byte[] msg, PeerId peerId) { " stateRoot:" + announce.getHeader().getStateRoot()); if (BlockState.getInstance().isInitialized()) { - if (BlockState.getInstance().isFullSyncFinished()) { + BlockState.getInstance().processPendingBlocksFromQueue(); - } - if (BlockState.getInstance().isFullSyncFinished() && BlockState.getInstance().getPendingBlocksQueue().isEmpty()) { - try { - BlockState.getInstance().addBlock(new Block(announce.getHeader(), new BlockBody(new ArrayList<>()))); - } catch (BlockNodeNotFoundException ignored) { - //TODO: Handle the error - // Currently we ignore this exception, because our syncing strategy as full node is not implemented yet. - // And thus when we receive a block announce and try to add it in the BlockState we will get this - // exception because the parent block of the received one is not found in the BlockState. + if (BlockState.getInstance().getPendingBlocksQueue().isEmpty()) { + try { + BlockState.getInstance().addBlock(new Block(announce.getHeader(), new BlockBody(new ArrayList<>()))); + return; + } catch (BlockNodeNotFoundException ignored) { + //TODO: Handle the error + // Currently we ignore this exception, because our syncing strategy as full node is not implemented yet. + // And thus when we receive a block announce and try to add it in the BlockState we will get this + // exception because the parent block of the received one is not found in the BlockState. + } } - } else { - BlockState.getInstance().addBlockToQueue(announce.getHeader()); } + + BlockState.getInstance().addBlockToQueue(announce.getHeader()); } } From 5c91a04249cc9007cd21c38ee222a94217857bba Mon Sep 17 00:00:00 2001 From: Georgi Grigorov Date: Tue, 3 Sep 2024 09:30:38 +0300 Subject: [PATCH 04/12] Moved block addition logic to BlockState for better encapsulation --- .../blockannounce/BlockAnnounceEngine.java | 21 +------------- .../limechain/storage/block/BlockState.java | 29 +++++++++++++++++-- 2 files changed, 27 insertions(+), 23 deletions(-) diff --git a/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java b/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java index 840430bcd..b7ee7d9a1 100644 --- a/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java +++ b/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java @@ -92,26 +92,7 @@ private void handleBlockAnnounce(byte[] msg, PeerId peerId) { " parentHash:" + announce.getHeader().getParentHash() + " stateRoot:" + announce.getHeader().getStateRoot()); - if (BlockState.getInstance().isInitialized()) { - if (BlockState.getInstance().isFullSyncFinished()) { - - BlockState.getInstance().processPendingBlocksFromQueue(); - - if (BlockState.getInstance().getPendingBlocksQueue().isEmpty()) { - try { - BlockState.getInstance().addBlock(new Block(announce.getHeader(), new BlockBody(new ArrayList<>()))); - return; - } catch (BlockNodeNotFoundException ignored) { - //TODO: Handle the error - // Currently we ignore this exception, because our syncing strategy as full node is not implemented yet. - // And thus when we receive a block announce and try to add it in the BlockState we will get this - // exception because the parent block of the received one is not found in the BlockState. - } - } - } - - BlockState.getInstance().addBlockToQueue(announce.getHeader()); - } + BlockState.getInstance().addBlockToBlockTree(announce.getHeader()); } public void writeHandshakeToStream(Stream stream, PeerId peerId) { diff --git a/src/main/java/com/limechain/storage/block/BlockState.java b/src/main/java/com/limechain/storage/block/BlockState.java index 99147f30f..1b2f4fc84 100644 --- a/src/main/java/com/limechain/storage/block/BlockState.java +++ b/src/main/java/com/limechain/storage/block/BlockState.java @@ -918,7 +918,30 @@ public void handleFinalizedBlock(final Hash256 currentFinalizedHash) { } } - public synchronized void addBlockToQueue(BlockHeader blockHeader) { + public void addBlockToBlockTree(BlockHeader blockHeader) { + if (isInitialized()) { + if (isFullSyncFinished()) { + + processPendingBlocksFromQueue(); + + if (getPendingBlocksQueue().isEmpty()) { + try { + addBlock(new Block(blockHeader, new BlockBody(new ArrayList<>()))); + return; + } catch (BlockNodeNotFoundException ignored) { + //TODO: Handle the error + // Currently we ignore this exception, because our syncing strategy as full node is not implemented yet. + // And thus when we receive a block announce and try to add it in the BlockState we will get this + // exception because the parent block of the received one is not found in the BlockState. + } + } + } + + addBlockToQueue(blockHeader); + } + } + + private synchronized void addBlockToQueue(BlockHeader blockHeader) { var currentBlock = new Block( blockHeader, new BlockBody(new ArrayList<>()) @@ -929,7 +952,7 @@ public synchronized void addBlockToQueue(BlockHeader blockHeader) { ); } - public synchronized void processPendingBlocksFromQueue() { + private synchronized void processPendingBlocksFromQueue() { var rootBlockNumber = BigInteger.valueOf(blockTree.getRoot().getNumber()); while (!pendingBlocksQueue.isEmpty()) { @@ -940,7 +963,7 @@ public synchronized void processPendingBlocksFromQueue() { } try { - this.addBlockWithArrivalTime(current.getValue1(), current.getValue0()); + addBlockWithArrivalTime(current.getValue1(), current.getValue0()); } catch (BlockStorageGenericException ex) { log.info(ex.getMessage()); } From 09ba7c9d36979d9cb8365cc8b8afa252ad35b9a9 Mon Sep 17 00:00:00 2001 From: Georgi Grigorov Date: Tue, 3 Sep 2024 17:15:34 +0300 Subject: [PATCH 05/12] Refactored method and variable names --- .../java/com/limechain/client/FullNode.java | 2 +- .../limechain/storage/block/BlockState.java | 32 +++++++++++-------- .../storage/block/tree/BlockTree.java | 7 +--- .../sync/warpsync/WarpSyncMachine.java | 2 +- 4 files changed, 21 insertions(+), 22 deletions(-) diff --git a/src/main/java/com/limechain/client/FullNode.java b/src/main/java/com/limechain/client/FullNode.java index 117f201f0..be98e517e 100644 --- a/src/main/java/com/limechain/client/FullNode.java +++ b/src/main/java/com/limechain/client/FullNode.java @@ -59,7 +59,7 @@ public void start() { switch (args.syncMode()) { case FULL -> fullSyncMachine.start(); case WARP -> { - warpSyncMachine.onFinish(() -> fullSyncMachine.start()); + warpSyncMachine.onFinish(fullSyncMachine::start); warpSyncMachine.start(); } default -> throw new IllegalStateException("Unexpected value: " + args.syncMode()); diff --git a/src/main/java/com/limechain/storage/block/BlockState.java b/src/main/java/com/limechain/storage/block/BlockState.java index 1b2f4fc84..729e6696a 100644 --- a/src/main/java/com/limechain/storage/block/BlockState.java +++ b/src/main/java/com/limechain/storage/block/BlockState.java @@ -53,7 +53,6 @@ public class BlockState { @Getter @Setter private boolean fullSyncFinished; - @Getter private final ArrayDeque> pendingBlocksQueue = new ArrayDeque<>(); @@ -104,14 +103,14 @@ public void initialize(final KVRepository repository) { this.blockTree = new BlockTree(lastHeader); } - public void initializeWarp(Hash256 lastFinalizedBlockHash, BigInteger lastFinalizedBlockNumber) { - BlockNode parent = new BlockNode( + public void initializeAfterWarpSync(Hash256 lastFinalizedBlockHash, BigInteger lastFinalizedBlockNumber) { + BlockNode parentBlock = new BlockNode( lastFinalizedBlockHash, null, lastFinalizedBlockNumber.longValue() ); - this.blockTree = new BlockTree(parent); + this.blockTree = new BlockTree(parentBlock); this.lastFinalized = lastFinalizedBlockHash; } @@ -927,13 +926,14 @@ public void addBlockToBlockTree(BlockHeader blockHeader) { if (getPendingBlocksQueue().isEmpty()) { try { addBlock(new Block(blockHeader, new BlockBody(new ArrayList<>()))); - return; - } catch (BlockNodeNotFoundException ignored) { - //TODO: Handle the error - // Currently we ignore this exception, because our syncing strategy as full node is not implemented yet. - // And thus when we receive a block announce and try to add it in the BlockState we will get this - // exception because the parent block of the received one is not found in the BlockState. + } catch (BlockStorageGenericException ex) { + log.warning(String.format("Block with hash %s was not added to the block tree. Reason: %s", + blockHeader.getHash().toString(), + ex.getMessage()) + ); } + + return; } } @@ -956,16 +956,20 @@ private synchronized void processPendingBlocksFromQueue() { var rootBlockNumber = BigInteger.valueOf(blockTree.getRoot().getNumber()); while (!pendingBlocksQueue.isEmpty()) { - var current = pendingBlocksQueue.poll(); + var currentPair = pendingBlocksQueue.poll(); + var block = currentPair.getValue1(); + var arrivalTime = currentPair.getValue0(); - if (current.getValue1().getHeader().getBlockNumber().compareTo(rootBlockNumber) <= 0) { + if (block.getHeader().getBlockNumber().compareTo(rootBlockNumber) <= 0) { continue; } try { - addBlockWithArrivalTime(current.getValue1(), current.getValue0()); + addBlockWithArrivalTime(block, arrivalTime); } catch (BlockStorageGenericException ex) { - log.info(ex.getMessage()); + log.warning(String.format("Block with hash %s was not added to the block tree. Reason: %s", + block.getHeader().getHash().toString(), + ex.getMessage())); } } } diff --git a/src/main/java/com/limechain/storage/block/tree/BlockTree.java b/src/main/java/com/limechain/storage/block/tree/BlockTree.java index 4deffea32..e148e7224 100644 --- a/src/main/java/com/limechain/storage/block/tree/BlockTree.java +++ b/src/main/java/com/limechain/storage/block/tree/BlockTree.java @@ -6,7 +6,6 @@ import com.limechain.exception.storage.LowerThanRootException; import com.limechain.network.protocol.warp.dto.BlockHeader; import com.limechain.runtime.Runtime; -import com.limechain.storage.block.BlockState; import com.limechain.storage.block.map.HashToRuntime; import io.emeraldpay.polkaj.types.Hash256; import lombok.Getter; @@ -85,8 +84,7 @@ public void addBlock(final BlockHeader header, final Instant arrivalTime) { throw new BlockNodeNotFoundException("Parent does not exist in tree"); } if (getNode(header.getHash()) != null) { - //TODO: Remove - throw new BlockAlreadyExistsException("Block already exists in tree -> ["+header.getBlockNumber()+"] ["+header.getHash()+"]"); + throw new BlockAlreadyExistsException("Block already exists in tree"); } long number = parent.getNumber() + 1; @@ -99,9 +97,6 @@ public void addBlock(final BlockHeader header, final Instant arrivalTime) { //TODO: Check if primary } - //TODO: Remove - System.out.println("Queue size: " + BlockState.getInstance().getPendingBlocksQueue().size()); - System.out.println("Block [" + header.getBlockNumber() + "] with hash [" + header.getHash() + "] added to the BlockTree"); BlockNode newBlockNode = new BlockNode(header.getHash(), parent, new ArrayList<>(), number, arrivalTime, isPrimary); parent.addChild(newBlockNode); diff --git a/src/main/java/com/limechain/sync/warpsync/WarpSyncMachine.java b/src/main/java/com/limechain/sync/warpsync/WarpSyncMachine.java index d26a19554..a241b1aa5 100644 --- a/src/main/java/com/limechain/sync/warpsync/WarpSyncMachine.java +++ b/src/main/java/com/limechain/sync/warpsync/WarpSyncMachine.java @@ -103,7 +103,7 @@ private void finishWarpSync() { this.networkService.handshakeBootNodes(); this.syncState.persistState(); - BlockState.getInstance().initializeWarp( + BlockState.getInstance().initializeAfterWarpSync( syncState.getLastFinalizedBlockHash(), syncState.getLastFinalizedBlockNumber() ); From a9b0e44864e2b89d4735fd651fbe4ad031287d39 Mon Sep 17 00:00:00 2001 From: Georgi Grigorov Date: Wed, 4 Sep 2024 17:14:14 +0300 Subject: [PATCH 06/12] Extended the block finalization logic --- .../java/com/limechain/network/Network.java | 8 ++++---- .../com/limechain/network/ProtocolUtils.java | 4 ++++ .../limechain/storage/block/BlockState.java | 19 ++++++++----------- .../limechain/storage/block/SyncState.java | 3 +++ .../storage/block/tree/BlockTree.java | 8 ++------ 5 files changed, 21 insertions(+), 21 deletions(-) diff --git a/src/main/java/com/limechain/network/Network.java b/src/main/java/com/limechain/network/Network.java index 3a347134c..fa05159d6 100644 --- a/src/main/java/com/limechain/network/Network.java +++ b/src/main/java/com/limechain/network/Network.java @@ -120,16 +120,16 @@ private void initializeProtocols(ChainService chainService, GenesisBlockHash gen String pingProtocol = ProtocolUtils.PING_PROTOCOL; String chainId = chainService.getChainSpec().getProtocolId(); - String protocolId = cliArgs.noLegacyProtocols() - ? StringUtils.remove0xPrefix(genesisBlockHash.getGenesisHash().toString()) - : chainId; + boolean legacyProtocol = !cliArgs.noLegacyProtocols(); + String protocolId = legacyProtocol ? chainId : + StringUtils.remove0xPrefix(genesisBlockHash.getGenesisHash().toString()); String kadProtocolId = ProtocolUtils.getKadProtocol(chainId); String warpProtocolId = ProtocolUtils.getWarpSyncProtocol(protocolId); String lightProtocolId = ProtocolUtils.getLightMessageProtocol(protocolId); String syncProtocolId = ProtocolUtils.getSyncProtocol(protocolId); String stateProtocolId = ProtocolUtils.getStateProtocol(protocolId); String blockAnnounceProtocolId = ProtocolUtils.getBlockAnnounceProtocol(protocolId); - String grandpaProtocolId = ProtocolUtils.getGrandpaProtocol(protocolId); + String grandpaProtocolId = ProtocolUtils.getGrandpaProtocol(protocolId, legacyProtocol); String transactionsProtocolId = ProtocolUtils.getTransactionsProtocol(protocolId); kademliaService = new KademliaService(kadProtocolId, hostId, isLocalEnabled, clientMode); diff --git a/src/main/java/com/limechain/network/ProtocolUtils.java b/src/main/java/com/limechain/network/ProtocolUtils.java index 26b58cd33..1d51d5004 100644 --- a/src/main/java/com/limechain/network/ProtocolUtils.java +++ b/src/main/java/com/limechain/network/ProtocolUtils.java @@ -31,6 +31,10 @@ public static String getKadProtocol(String chainId) { return String.format("/%s/kad", chainId); } + public static String getGrandpaProtocol(String chainId, boolean legacyProtocol) { + return String.format("/%s/grandpa/1", legacyProtocol ? "paritytech" : chainId); + } + public static String getGrandpaProtocol(String chainId) { return String.format("/%s/grandpa/1", grandpaProtocolChain(chainId)); } diff --git a/src/main/java/com/limechain/storage/block/BlockState.java b/src/main/java/com/limechain/storage/block/BlockState.java index 729e6696a..fb05c061a 100644 --- a/src/main/java/com/limechain/storage/block/BlockState.java +++ b/src/main/java/com/limechain/storage/block/BlockState.java @@ -761,6 +761,8 @@ public Block getUnfinalizedBlockFromHash(final Hash256 hash) { * @throws BlockNodeNotFoundException if the block corresponding to the provided hash is not found. */ public void setFinalizedHash(final Hash256 hash, final BigInteger round, final BigInteger setId) { + if (!fullSyncFinished) return; + if (!hasHeader(hash)) { throw new BlockNodeNotFoundException("Cannot finalise unknown block " + hash); } @@ -895,7 +897,7 @@ public void handleFinalizedBlock(final Hash256 currentFinalizedHash) { Block block = unfinalizedBlocks.get(subchainHash); if (block == null) { - throw new BlockNotFoundException("Failed to find block in unfinalized block map for hash" + + throw new BlockNotFoundException("Failed to find block in unfinalized block map for hash " + subchainHash); } @@ -917,7 +919,7 @@ public void handleFinalizedBlock(final Hash256 currentFinalizedHash) { } } - public void addBlockToBlockTree(BlockHeader blockHeader) { + public synchronized void addBlockToBlockTree(BlockHeader blockHeader) { if (isInitialized()) { if (isFullSyncFinished()) { @@ -927,10 +929,7 @@ public void addBlockToBlockTree(BlockHeader blockHeader) { try { addBlock(new Block(blockHeader, new BlockBody(new ArrayList<>()))); } catch (BlockStorageGenericException ex) { - log.warning(String.format("Block with hash %s was not added to the block tree. Reason: %s", - blockHeader.getHash().toString(), - ex.getMessage()) - ); + log.fine(String.format("[%s] %s", blockHeader.getHash().toString(), ex.getMessage())); } return; @@ -941,7 +940,7 @@ public void addBlockToBlockTree(BlockHeader blockHeader) { } } - private synchronized void addBlockToQueue(BlockHeader blockHeader) { + private void addBlockToQueue(BlockHeader blockHeader) { var currentBlock = new Block( blockHeader, new BlockBody(new ArrayList<>()) @@ -952,7 +951,7 @@ private synchronized void addBlockToQueue(BlockHeader blockHeader) { ); } - private synchronized void processPendingBlocksFromQueue() { + private void processPendingBlocksFromQueue() { var rootBlockNumber = BigInteger.valueOf(blockTree.getRoot().getNumber()); while (!pendingBlocksQueue.isEmpty()) { @@ -967,9 +966,7 @@ private synchronized void processPendingBlocksFromQueue() { try { addBlockWithArrivalTime(block, arrivalTime); } catch (BlockStorageGenericException ex) { - log.warning(String.format("Block with hash %s was not added to the block tree. Reason: %s", - block.getHeader().getHash().toString(), - ex.getMessage())); + log.fine(String.format("[%s] %s", block.getHeader().getHash().toString(), ex.getMessage())); } } } diff --git a/src/main/java/com/limechain/storage/block/SyncState.java b/src/main/java/com/limechain/storage/block/SyncState.java index 320d17720..87922868c 100644 --- a/src/main/java/com/limechain/storage/block/SyncState.java +++ b/src/main/java/com/limechain/storage/block/SyncState.java @@ -76,7 +76,10 @@ public void finalizedCommitMessage(CommitMessage commitMessage) { this.stateRoot = blockByHash.getHeader().getStateRoot(); this.lastFinalizedBlockHash = commitMessage.getVote().getBlockHash(); this.lastFinalizedBlockNumber = commitMessage.getVote().getBlockNumber(); + + BlockState.getInstance().setFinalizedHash(commitMessage.getVote().getBlockHash(), commitMessage.getRoundNumber(), commitMessage.getSetId()); } + } catch (HeaderNotFoundException ignored) { log.fine("Received commit message for a block that is not in the block store"); } diff --git a/src/main/java/com/limechain/storage/block/tree/BlockTree.java b/src/main/java/com/limechain/storage/block/tree/BlockTree.java index e148e7224..2bac7cc04 100644 --- a/src/main/java/com/limechain/storage/block/tree/BlockTree.java +++ b/src/main/java/com/limechain/storage/block/tree/BlockTree.java @@ -178,10 +178,6 @@ public List rangeInMemory(final Hash256 startHash, final Hash256 endHas throw new BlockNodeNotFoundException("Start node not found"); } - if (startBlockNode.getNumber() > endBlockNode.getNumber()) { - throw new BlockStorageGenericException("Start is greater than end"); - } - return accumulateHashesInDescendingOrder(endBlockNode, startBlockNode); } @@ -196,7 +192,7 @@ public List rangeInMemory(final Hash256 startHash, final Hash256 endHas */ public List accumulateHashesInDescendingOrder(final BlockNode endNode, final BlockNode startNode) { if (startNode.getNumber() > endNode.getNumber()) { - throw new IllegalArgumentException("Start is greater than end"); + throw new BlockStorageGenericException("Start is greater than end"); } int blocksInRange = (int) (endNode.getNumber() - startNode.getNumber()); @@ -205,7 +201,7 @@ public List accumulateHashesInDescendingOrder(final BlockNode endNode, BlockNode tempNode = endNode; for (int position = blocksInRange - 1; position >= 0; position--) { hashes.add(tempNode.getHash()); - tempNode = endNode.getParent(); + tempNode = tempNode.getParent(); if (tempNode == null) { throw new BlockStorageGenericException("End node is null"); From eda6ca543bf41ba7c2bbe4ed163153ef680d32ad Mon Sep 17 00:00:00 2001 From: Georgi Grigorov Date: Thu, 5 Sep 2024 11:41:57 +0300 Subject: [PATCH 07/12] Changed log level --- src/main/java/com/limechain/sync/fullsync/FullSyncMachine.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/limechain/sync/fullsync/FullSyncMachine.java b/src/main/java/com/limechain/sync/fullsync/FullSyncMachine.java index ef2c41a9c..04605a593 100644 --- a/src/main/java/com/limechain/sync/fullsync/FullSyncMachine.java +++ b/src/main/java/com/limechain/sync/fullsync/FullSyncMachine.java @@ -209,7 +209,7 @@ private static Block protobufDecodeBlock(SyncMessage.BlockData blockData) { */ private void executeBlocks(List receivedBlockDatas, TrieAccessor trieAccessor) { for (Block block : receivedBlockDatas) { - log.info("Block number to be executed is " + block.getHeader().getBlockNumber()); + log.fine("Block number to be executed is " + block.getHeader().getBlockNumber()); try { blockState.addBlock(block); From 02fbcd555671d9a7c05a7dc772f52e0804a1284d Mon Sep 17 00:00:00 2001 From: Georgi Grigorov Date: Mon, 9 Sep 2024 16:16:37 +0300 Subject: [PATCH 08/12] Refactor after code review --- .../com/limechain/network/ProtocolUtils.java | 4 --- .../limechain/storage/block/BlockState.java | 35 +++++++++++-------- 2 files changed, 20 insertions(+), 19 deletions(-) diff --git a/src/main/java/com/limechain/network/ProtocolUtils.java b/src/main/java/com/limechain/network/ProtocolUtils.java index 1d51d5004..9a272e983 100644 --- a/src/main/java/com/limechain/network/ProtocolUtils.java +++ b/src/main/java/com/limechain/network/ProtocolUtils.java @@ -35,10 +35,6 @@ public static String getGrandpaProtocol(String chainId, boolean legacyProtocol) return String.format("/%s/grandpa/1", legacyProtocol ? "paritytech" : chainId); } - public static String getGrandpaProtocol(String chainId) { - return String.format("/%s/grandpa/1", grandpaProtocolChain(chainId)); - } - //TODO: figure out a more elegant solution private static String grandpaProtocolChain(String chainId) { return chainId.equals("dot") ? "paritytech" : chainId; diff --git a/src/main/java/com/limechain/storage/block/BlockState.java b/src/main/java/com/limechain/storage/block/BlockState.java index fb05c061a..ef535ca89 100644 --- a/src/main/java/com/limechain/storage/block/BlockState.java +++ b/src/main/java/com/limechain/storage/block/BlockState.java @@ -29,7 +29,13 @@ import java.math.BigInteger; import java.time.Instant; -import java.util.*; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; /** * Contains the historical block data of the blockchain, including block headers and bodies. @@ -50,7 +56,6 @@ public class BlockState { private Hash256 lastFinalized; @Getter private boolean initialized; - @Getter @Setter private boolean fullSyncFinished; @Getter @@ -920,23 +925,23 @@ public void handleFinalizedBlock(final Hash256 currentFinalizedHash) { } public synchronized void addBlockToBlockTree(BlockHeader blockHeader) { - if (isInitialized()) { - if (isFullSyncFinished()) { + if (!initialized) { + throw new IllegalStateException("BlockState not initialized"); + } - processPendingBlocksFromQueue(); + if (!fullSyncFinished) { + addBlockToQueue(blockHeader); + return; + } - if (getPendingBlocksQueue().isEmpty()) { - try { - addBlock(new Block(blockHeader, new BlockBody(new ArrayList<>()))); - } catch (BlockStorageGenericException ex) { - log.fine(String.format("[%s] %s", blockHeader.getHash().toString(), ex.getMessage())); - } + processPendingBlocksFromQueue(); - return; - } + if (getPendingBlocksQueue().isEmpty()) { + try { + addBlock(new Block(blockHeader, new BlockBody(new ArrayList<>()))); + } catch (BlockStorageGenericException ex) { + log.fine(String.format("[%s] %s", blockHeader.getHash().toString(), ex.getMessage())); } - - addBlockToQueue(blockHeader); } } From 9eda00123ebd08c67896b0c48c0a344d4935f8d9 Mon Sep 17 00:00:00 2001 From: Georgi Grigorov Date: Mon, 9 Sep 2024 16:34:40 +0300 Subject: [PATCH 09/12] Added check if blockState is initialized before finalization of a block --- src/main/java/com/limechain/network/ProtocolUtils.java | 5 ----- src/main/java/com/limechain/storage/block/BlockState.java | 4 ++++ 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/limechain/network/ProtocolUtils.java b/src/main/java/com/limechain/network/ProtocolUtils.java index 9a272e983..acd865963 100644 --- a/src/main/java/com/limechain/network/ProtocolUtils.java +++ b/src/main/java/com/limechain/network/ProtocolUtils.java @@ -35,11 +35,6 @@ public static String getGrandpaProtocol(String chainId, boolean legacyProtocol) return String.format("/%s/grandpa/1", legacyProtocol ? "paritytech" : chainId); } - //TODO: figure out a more elegant solution - private static String grandpaProtocolChain(String chainId) { - return chainId.equals("dot") ? "paritytech" : chainId; - } - public static String getTransactionsProtocol(String chainId) { return String.format("/%s/transactions/1", chainId); } diff --git a/src/main/java/com/limechain/storage/block/BlockState.java b/src/main/java/com/limechain/storage/block/BlockState.java index ef535ca89..31e00cb8b 100644 --- a/src/main/java/com/limechain/storage/block/BlockState.java +++ b/src/main/java/com/limechain/storage/block/BlockState.java @@ -768,6 +768,10 @@ public Block getUnfinalizedBlockFromHash(final Hash256 hash) { public void setFinalizedHash(final Hash256 hash, final BigInteger round, final BigInteger setId) { if (!fullSyncFinished) return; + if (!initialized) { + throw new IllegalStateException("BlockState not initialized"); + } + if (!hasHeader(hash)) { throw new BlockNodeNotFoundException("Cannot finalise unknown block " + hash); } From 83a736f69aea7720ef8a2d94efc7dd2db70e3080 Mon Sep 17 00:00:00 2001 From: Georgi Grigorov Date: Mon, 9 Sep 2024 17:01:40 +0300 Subject: [PATCH 10/12] Fixed failing test --- .../blockannounce/BlockAnnounceEngine.java | 4 --- .../BlockAnnounceEngineTest.java | 36 +++++++++++++++---- 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java b/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java index b7ee7d9a1..f72c80cd6 100644 --- a/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java +++ b/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java @@ -1,7 +1,6 @@ package com.limechain.network.protocol.blockannounce; import com.limechain.exception.scale.ScaleEncodingException; -import com.limechain.exception.storage.BlockNodeNotFoundException; import com.limechain.network.ConnectionManager; import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceHandshake; import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceHandshakeBuilder; @@ -9,8 +8,6 @@ import com.limechain.network.protocol.blockannounce.scale.BlockAnnounceHandshakeScaleReader; import com.limechain.network.protocol.blockannounce.scale.BlockAnnounceHandshakeScaleWriter; import com.limechain.network.protocol.blockannounce.scale.BlockAnnounceMessageScaleReader; -import com.limechain.network.protocol.warp.dto.Block; -import com.limechain.network.protocol.warp.dto.BlockBody; import com.limechain.rpc.server.AppBean; import com.limechain.storage.block.BlockState; import com.limechain.sync.warpsync.WarpSyncState; @@ -24,7 +21,6 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.util.ArrayList; import java.util.logging.Level; @Log diff --git a/src/test/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngineTest.java b/src/test/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngineTest.java index b1c8d5e59..68a8c05c6 100644 --- a/src/test/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngineTest.java +++ b/src/test/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngineTest.java @@ -1,5 +1,6 @@ package com.limechain.network.protocol.blockannounce; +import com.limechain.exception.global.RuntimeCodeException; import com.limechain.network.ConnectionManager; import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceHandshake; import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceHandshakeBuilder; @@ -7,6 +8,7 @@ import com.limechain.network.protocol.blockannounce.scale.BlockAnnounceHandshakeScaleWriter; import com.limechain.network.protocol.blockannounce.scale.BlockAnnounceMessageScaleReader; import com.limechain.network.protocol.warp.dto.BlockHeader; +import com.limechain.storage.block.BlockState; import com.limechain.sync.warpsync.WarpSyncState; import io.emeraldpay.polkaj.scale.ScaleCodecReader; import io.emeraldpay.polkaj.scale.ScaleCodecWriter; @@ -20,16 +22,13 @@ import org.mockito.junit.jupiter.MockitoExtension; import java.io.IOException; +import java.lang.reflect.Field; import java.util.Arrays; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.mockConstruction; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoInteractions; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; @SuppressWarnings("unused") @ExtendWith(MockitoExtension.class) @@ -134,13 +133,19 @@ void receiveBlockAnnounceWhenConnectedShouldUpdatePeer() { } @Test - void receiveBlockAnnounceWhenConnectedShouldSyncMessage() { + void receiveBlockAnnounceWhenConnectedShouldSyncMessage() throws IllegalAccessException, NoSuchFieldException { byte[] message = new byte[] { 1, 2, 3 }; BlockAnnounceMessage blockAnnounceMessage = mock(BlockAnnounceMessage.class); when(blockAnnounceMessage.getHeader()).thenReturn(mock(BlockHeader.class)); when(stream.remotePeerId()).thenReturn(peerId); when(connectionManager.isBlockAnnounceConnected(peerId)).thenReturn(true); + BlockState blockState = BlockState.getInstance(); + + Field initializedField = BlockState.class.getDeclaredField("initialized"); + initializedField.setAccessible(true); + initializedField.set(blockState, true); + try (MockedConstruction readerMock = mockConstruction(ScaleCodecReader.class, (mock, context) -> when(mock.read(any(BlockAnnounceMessageScaleReader.class))) .thenReturn(blockAnnounceMessage)) @@ -150,4 +155,21 @@ void receiveBlockAnnounceWhenConnectedShouldSyncMessage() { verify(warpSyncState).syncBlockAnnounce(blockAnnounceMessage); } } + + @Test + void receiveBlockAnnounceWithoutInitializedBlockStateShouldThrowException() throws IllegalAccessException, NoSuchFieldException { + byte[] message = new byte[] { 1, 2, 3 }; + BlockAnnounceMessage blockAnnounceMessage = mock(BlockAnnounceMessage.class); + when(blockAnnounceMessage.getHeader()).thenReturn(mock(BlockHeader.class)); + when(stream.remotePeerId()).thenReturn(peerId); + when(connectionManager.isBlockAnnounceConnected(peerId)).thenReturn(true); + + try (MockedConstruction readerMock = mockConstruction(ScaleCodecReader.class, + (mock, context) -> when(mock.read(any(BlockAnnounceMessageScaleReader.class))) + .thenReturn(blockAnnounceMessage)) + ) { + assertThrows(IllegalStateException.class, () -> blockAnnounceEngine.receiveRequest(message, stream)); + verify(warpSyncState).syncBlockAnnounce(blockAnnounceMessage); + } + } } \ No newline at end of file From 99f85a091f4a24d2578bfe86e5da1e79fa89ed8d Mon Sep 17 00:00:00 2001 From: Georgi Grigorov Date: Mon, 9 Sep 2024 17:14:18 +0300 Subject: [PATCH 11/12] Refactored initialized checks --- .../blockannounce/BlockAnnounceEngine.java | 4 ++- .../limechain/storage/block/BlockState.java | 8 ------ .../limechain/storage/block/SyncState.java | 4 ++- .../BlockAnnounceEngineTest.java | 27 ------------------- 4 files changed, 6 insertions(+), 37 deletions(-) diff --git a/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java b/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java index f72c80cd6..ee4a2e26d 100644 --- a/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java +++ b/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java @@ -88,7 +88,9 @@ private void handleBlockAnnounce(byte[] msg, PeerId peerId) { " parentHash:" + announce.getHeader().getParentHash() + " stateRoot:" + announce.getHeader().getStateRoot()); - BlockState.getInstance().addBlockToBlockTree(announce.getHeader()); + if (BlockState.getInstance().isInitialized()) { + BlockState.getInstance().addBlockToBlockTree(announce.getHeader()); + } } public void writeHandshakeToStream(Stream stream, PeerId peerId) { diff --git a/src/main/java/com/limechain/storage/block/BlockState.java b/src/main/java/com/limechain/storage/block/BlockState.java index 31e00cb8b..c83ef5609 100644 --- a/src/main/java/com/limechain/storage/block/BlockState.java +++ b/src/main/java/com/limechain/storage/block/BlockState.java @@ -768,10 +768,6 @@ public Block getUnfinalizedBlockFromHash(final Hash256 hash) { public void setFinalizedHash(final Hash256 hash, final BigInteger round, final BigInteger setId) { if (!fullSyncFinished) return; - if (!initialized) { - throw new IllegalStateException("BlockState not initialized"); - } - if (!hasHeader(hash)) { throw new BlockNodeNotFoundException("Cannot finalise unknown block " + hash); } @@ -929,10 +925,6 @@ public void handleFinalizedBlock(final Hash256 currentFinalizedHash) { } public synchronized void addBlockToBlockTree(BlockHeader blockHeader) { - if (!initialized) { - throw new IllegalStateException("BlockState not initialized"); - } - if (!fullSyncFinished) { addBlockToQueue(blockHeader); return; diff --git a/src/main/java/com/limechain/storage/block/SyncState.java b/src/main/java/com/limechain/storage/block/SyncState.java index 87922868c..aa2123421 100644 --- a/src/main/java/com/limechain/storage/block/SyncState.java +++ b/src/main/java/com/limechain/storage/block/SyncState.java @@ -77,7 +77,9 @@ public void finalizedCommitMessage(CommitMessage commitMessage) { this.lastFinalizedBlockHash = commitMessage.getVote().getBlockHash(); this.lastFinalizedBlockNumber = commitMessage.getVote().getBlockNumber(); - BlockState.getInstance().setFinalizedHash(commitMessage.getVote().getBlockHash(), commitMessage.getRoundNumber(), commitMessage.getSetId()); + if (BlockState.getInstance().isInitialized()) { + BlockState.getInstance().setFinalizedHash(commitMessage.getVote().getBlockHash(), commitMessage.getRoundNumber(), commitMessage.getSetId()); + } } } catch (HeaderNotFoundException ignored) { diff --git a/src/test/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngineTest.java b/src/test/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngineTest.java index 68a8c05c6..0b8e7d724 100644 --- a/src/test/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngineTest.java +++ b/src/test/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngineTest.java @@ -1,6 +1,5 @@ package com.limechain.network.protocol.blockannounce; -import com.limechain.exception.global.RuntimeCodeException; import com.limechain.network.ConnectionManager; import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceHandshake; import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceHandshakeBuilder; @@ -8,7 +7,6 @@ import com.limechain.network.protocol.blockannounce.scale.BlockAnnounceHandshakeScaleWriter; import com.limechain.network.protocol.blockannounce.scale.BlockAnnounceMessageScaleReader; import com.limechain.network.protocol.warp.dto.BlockHeader; -import com.limechain.storage.block.BlockState; import com.limechain.sync.warpsync.WarpSyncState; import io.emeraldpay.polkaj.scale.ScaleCodecReader; import io.emeraldpay.polkaj.scale.ScaleCodecWriter; @@ -22,10 +20,8 @@ import org.mockito.junit.jupiter.MockitoExtension; import java.io.IOException; -import java.lang.reflect.Field; import java.util.Arrays; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.*; @@ -140,12 +136,6 @@ void receiveBlockAnnounceWhenConnectedShouldSyncMessage() throws IllegalAccessEx when(stream.remotePeerId()).thenReturn(peerId); when(connectionManager.isBlockAnnounceConnected(peerId)).thenReturn(true); - BlockState blockState = BlockState.getInstance(); - - Field initializedField = BlockState.class.getDeclaredField("initialized"); - initializedField.setAccessible(true); - initializedField.set(blockState, true); - try (MockedConstruction readerMock = mockConstruction(ScaleCodecReader.class, (mock, context) -> when(mock.read(any(BlockAnnounceMessageScaleReader.class))) .thenReturn(blockAnnounceMessage)) @@ -155,21 +145,4 @@ void receiveBlockAnnounceWhenConnectedShouldSyncMessage() throws IllegalAccessEx verify(warpSyncState).syncBlockAnnounce(blockAnnounceMessage); } } - - @Test - void receiveBlockAnnounceWithoutInitializedBlockStateShouldThrowException() throws IllegalAccessException, NoSuchFieldException { - byte[] message = new byte[] { 1, 2, 3 }; - BlockAnnounceMessage blockAnnounceMessage = mock(BlockAnnounceMessage.class); - when(blockAnnounceMessage.getHeader()).thenReturn(mock(BlockHeader.class)); - when(stream.remotePeerId()).thenReturn(peerId); - when(connectionManager.isBlockAnnounceConnected(peerId)).thenReturn(true); - - try (MockedConstruction readerMock = mockConstruction(ScaleCodecReader.class, - (mock, context) -> when(mock.read(any(BlockAnnounceMessageScaleReader.class))) - .thenReturn(blockAnnounceMessage)) - ) { - assertThrows(IllegalStateException.class, () -> blockAnnounceEngine.receiveRequest(message, stream)); - verify(warpSyncState).syncBlockAnnounce(blockAnnounceMessage); - } - } } \ No newline at end of file From e55ff8e795ad834a7e00d79526b20935850b7db2 Mon Sep 17 00:00:00 2001 From: Georgi Grigorov Date: Mon, 9 Sep 2024 17:17:52 +0300 Subject: [PATCH 12/12] Revert changes made to BlockAnnounceEngineTest --- .../protocol/blockannounce/BlockAnnounceEngineTest.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/test/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngineTest.java b/src/test/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngineTest.java index 0b8e7d724..b1c8d5e59 100644 --- a/src/test/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngineTest.java +++ b/src/test/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngineTest.java @@ -24,7 +24,12 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockConstruction; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; @SuppressWarnings("unused") @ExtendWith(MockitoExtension.class) @@ -129,7 +134,7 @@ void receiveBlockAnnounceWhenConnectedShouldUpdatePeer() { } @Test - void receiveBlockAnnounceWhenConnectedShouldSyncMessage() throws IllegalAccessException, NoSuchFieldException { + void receiveBlockAnnounceWhenConnectedShouldSyncMessage() { byte[] message = new byte[] { 1, 2, 3 }; BlockAnnounceMessage blockAnnounceMessage = mock(BlockAnnounceMessage.class); when(blockAnnounceMessage.getHeader()).thenReturn(mock(BlockHeader.class));