From dd85b16dabcca12c80052ec8e91e7889d38c7840 Mon Sep 17 00:00:00 2001 From: Yordan Atanasov Date: Mon, 20 Jan 2025 10:46:20 +0200 Subject: [PATCH] feat: Add queue feature for block handling. --- .../java/com/limechain/babe/BabeService.java | 2 +- .../com/limechain/client/LightClient.java | 2 - .../blockannounce/BlockAnnounceEngine.java | 2 +- .../protocol/grandpa/GrandpaEngine.java | 13 +- .../com/limechain/state/AbstractState.java | 2 +- .../limechain/storage/block/BlockHandler.java | 182 +++++++++++++----- .../java/com/limechain/sync/SyncService.java | 12 +- .../sync/fullsync/FullSyncMachine.java | 5 +- .../sync/warpsync/WarpSyncState.java | 2 +- 9 files changed, 164 insertions(+), 58 deletions(-) diff --git a/src/main/java/com/limechain/babe/BabeService.java b/src/main/java/com/limechain/babe/BabeService.java index ba2cc742..aa0da6ad 100644 --- a/src/main/java/com/limechain/babe/BabeService.java +++ b/src/main/java/com/limechain/babe/BabeService.java @@ -102,7 +102,7 @@ private void handleSlot(Slot slot, BabePreDigest preDigest) { return; } - blockHandler.handleProducedBlock(block); + blockHandler.handleProduced(block); } private Block produceBlock(BlockHeader parentHeader, Slot slot, BabePreDigest preDigest) { diff --git a/src/main/java/com/limechain/client/LightClient.java b/src/main/java/com/limechain/client/LightClient.java index 3991f3b9..f7724ab2 100644 --- a/src/main/java/com/limechain/client/LightClient.java +++ b/src/main/java/com/limechain/client/LightClient.java @@ -2,7 +2,6 @@ import com.limechain.network.NetworkService; import com.limechain.rpc.server.AppBean; -import com.limechain.storage.block.state.BlockState; import com.limechain.sync.SyncService; import com.limechain.sync.state.SyncState; @@ -20,7 +19,6 @@ public LightClient() { Objects.requireNonNull(AppBean.getBean(NetworkService.class)), Objects.requireNonNull(AppBean.getBean(SyncService.class))), List.of( - Objects.requireNonNull(AppBean.getBean(BlockState.class)), Objects.requireNonNull(AppBean.getBean(SyncState.class)))); } } diff --git a/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java b/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java index b23bd1d9..26a5d854 100644 --- a/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java +++ b/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java @@ -95,7 +95,7 @@ private void handleBlockAnnounce(byte[] msg, PeerId peerId) { if (AppBean.getBean(BlockState.class).isInitialized()) { //TODO Network improvements: Block requests should be sent to the peer that announced the block itself. - blockHandler.handleBlockHeader(Instant.now(), announce.getHeader(), peerId); + blockHandler.handleAnnounced(announce.getHeader(), Instant.now(), peerId); } } diff --git a/src/main/java/com/limechain/network/protocol/grandpa/GrandpaEngine.java b/src/main/java/com/limechain/network/protocol/grandpa/GrandpaEngine.java index 49bcedb6..fe357003 100644 --- a/src/main/java/com/limechain/network/protocol/grandpa/GrandpaEngine.java +++ b/src/main/java/com/limechain/network/protocol/grandpa/GrandpaEngine.java @@ -17,6 +17,8 @@ import com.limechain.network.protocol.grandpa.messages.vote.VoteMessageScaleReader; import com.limechain.network.protocol.message.ProtocolMessageBuilder; import com.limechain.rpc.server.AppBean; +import com.limechain.state.AbstractState; +import com.limechain.sync.SyncMode; import com.limechain.sync.warpsync.WarpSyncState; import io.emeraldpay.polkaj.scale.ScaleCodecReader; import io.emeraldpay.polkaj.scale.ScaleCodecWriter; @@ -38,13 +40,15 @@ public class GrandpaEngine { private static final int HANDSHAKE_LENGTH = 1; + private final WarpSyncState warpSyncState; + protected ConnectionManager connectionManager; - protected WarpSyncState warpSyncState; protected BlockAnnounceHandshakeBuilder handshakeBuilder; public GrandpaEngine() { - connectionManager = ConnectionManager.getInstance(); warpSyncState = AppBean.getBean(WarpSyncState.class); + + connectionManager = ConnectionManager.getInstance(); handshakeBuilder = new BlockAnnounceHandshakeBuilder(); } @@ -101,6 +105,11 @@ private void handleResponderStreamMessage(byte[] message, GrandpaMessageType mes return; } + if (!AbstractState.getSyncMode().equals(SyncMode.HEAD)) { + log.fine("Skipping grandpa message before we reach head of chain."); + return; + } + switch (messageType) { case HANDSHAKE -> handleHandshake(message, peerId, stream); case VOTE -> handleVoteMessage(message, peerId); diff --git a/src/main/java/com/limechain/state/AbstractState.java b/src/main/java/com/limechain/state/AbstractState.java index 440b3432..f937ceb9 100644 --- a/src/main/java/com/limechain/state/AbstractState.java +++ b/src/main/java/com/limechain/state/AbstractState.java @@ -12,7 +12,7 @@ public abstract class AbstractState implements ServiceState { protected boolean initialized; public static void setSyncMode(SyncMode mode) { - if (syncMode.ordinal() > mode.ordinal()) { + if (syncMode != null && syncMode.ordinal() > mode.ordinal()) { throw new IllegalStateException(mode + " mode precedes " + syncMode); } diff --git a/src/main/java/com/limechain/storage/block/BlockHandler.java b/src/main/java/com/limechain/storage/block/BlockHandler.java index 372f5245..55f47591 100644 --- a/src/main/java/com/limechain/storage/block/BlockHandler.java +++ b/src/main/java/com/limechain/storage/block/BlockHandler.java @@ -2,9 +2,12 @@ import com.limechain.babe.BlockProductionVerifier; import com.limechain.babe.state.EpochState; +import com.limechain.config.HostConfig; +import com.limechain.exception.storage.BlockStorageGenericException; import com.limechain.grandpa.state.RoundState; import com.limechain.network.PeerMessageCoordinator; import com.limechain.network.PeerRequester; +import com.limechain.network.protocol.blockannounce.NodeRole; import com.limechain.network.protocol.message.ProtocolMessageBuilder; import com.limechain.network.protocol.sync.BlockRequestField; import com.limechain.network.protocol.warp.DigestHelper; @@ -12,58 +15,123 @@ import com.limechain.network.protocol.warp.dto.BlockHeader; import com.limechain.runtime.Runtime; import com.limechain.runtime.RuntimeBuilder; +import com.limechain.state.AbstractState; import com.limechain.storage.block.state.BlockState; +import com.limechain.sync.SyncMode; +import com.limechain.sync.state.SyncState; import com.limechain.transaction.TransactionProcessor; import com.limechain.utils.async.AsyncExecutor; +import io.emeraldpay.polkaj.types.Hash256; import io.libp2p.core.PeerId; import lombok.extern.java.Log; +import org.javatuples.Pair; import org.springframework.stereotype.Component; import java.time.Instant; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.concurrent.CompletableFuture; -@Component @Log +@Component public class BlockHandler { private final BlockState blockState; private final EpochState epochState; private final RoundState roundState; + private final SyncState syncState; private final PeerRequester requester; private final PeerMessageCoordinator messageCoordinator; private final RuntimeBuilder builder; - private final AsyncExecutor asyncExecutor; + private final HostConfig hostConfig; private final TransactionProcessor transactionProcessor; private final BlockProductionVerifier verifier; + private final AsyncExecutor asyncExecutor; + + private final HashMap blockHeaders; + private final ArrayDeque> pendingBlocksQueue; + public BlockHandler(EpochState epochState, BlockState blockState, + RoundState roundState, + SyncState syncState, PeerRequester requester, RuntimeBuilder builder, + HostConfig hostConfig, TransactionProcessor transactionProcessor, - PeerMessageCoordinator messageCoordinator, - RoundState roundState) { + PeerMessageCoordinator messageCoordinator) { this.epochState = epochState; this.blockState = blockState; + this.roundState = roundState; + this.syncState = syncState; + this.requester = requester; this.messageCoordinator = messageCoordinator; + this.builder = builder; + this.hostConfig = hostConfig; this.transactionProcessor = transactionProcessor; this.verifier = new BlockProductionVerifier(); + asyncExecutor = AsyncExecutor.withPoolSize(10); - this.roundState = roundState; + blockHeaders = new HashMap<>(); + pendingBlocksQueue = new ArrayDeque<>(); } - public synchronized void handleBlockHeader(Instant arrivalTime, BlockHeader header, PeerId excluding) { + public synchronized void handleAnnounced(BlockHeader header, Instant arrivalTime, PeerId peerId) { + + if (blockHeaders.containsKey(header.getHash()) || blockState.hasHeader(header.getHash())) { + log.fine("Skipping announced block: " + header.getBlockNumber() + " " + header.getHash()); + return; + } + + if (!AbstractState.getSyncMode().equals(SyncMode.HEAD)) { + addBlockToQueue(header, arrivalTime); + return; + } + + processPendingBlocksFromQueue(); + + Block block = requestBlock(header); + processBlock(block, arrivalTime); + + messageCoordinator.sendBlockAnnounceMessageExcludingPeer( + ProtocolMessageBuilder.buildBlockAnnounceMessage( + header, header.getHash().equals(blockState.bestBlockHash())), + peerId); + } + + public synchronized void handleProduced(Block block) { + + addBlockToTree(block, Instant.now()); + messageCoordinator.sendBlockAnnounceMessageExcludingPeer( + ProtocolMessageBuilder.buildBlockAnnounceMessage(block.getHeader(), true), + null); + } + + private void processBlock(Block block, Instant arrivalTime) { + addBlockToTree(block, arrivalTime); + + if (!hostConfig.getNodeRole().equals(NodeRole.LIGHT)) { + verifyAndExecuteBlock(block); + } + } + + private void verifyAndExecuteBlock(Block block) { + try { + BlockHeader header = block.getHeader(); + Runtime runtime = blockState.getRuntime(header.getParentHash()); Runtime newRuntime = builder.copyRuntime(runtime); - if (epochState.isInitialized() && !verifier.isAuthorshipValid(newRuntime, + if (!verifier.isAuthorshipValid(newRuntime, header, epochState.getCurrentEpochData(), epochState.getCurrentEpochDescriptor(), @@ -71,63 +139,87 @@ public synchronized void handleBlockHeader(Instant arrivalTime, BlockHeader head return; } - if (blockState.hasHeader(header.getHash())) { - log.fine("Skipping announced block: " + header.getBlockNumber() + " " + header.getHash()); - return; - } - - CompletableFuture> responseFuture = requester.requestBlocks( - BlockRequestField.ALL, header.getHash(), 1); - - List blocks = responseFuture.join(); - while (blocks.isEmpty()) { - blocks = requester.requestBlocks( - BlockRequestField.ALL, header.getHash(), 1).join(); - } - - Block block = blocks.getFirst(); - newRuntime.executeBlock(block); log.fine(String.format("Executed block No: %s with hash: %s.", block.getHeader().getBlockNumber(), header.getHash())); blockState.storeRuntime(header.getHash(), runtime); - handleBlock(block, arrivalTime); - - messageCoordinator.sendBlockAnnounceMessageExcludingPeer( - ProtocolMessageBuilder.buildBlockAnnounceMessage( - block.getHeader(), block.getHeader().getHash().equals(blockState.bestBlockHash())), - excluding); + asyncExecutor.executeAndForget(() -> transactionProcessor.maintainTransactionPool(block)); } catch (Exception e) { log.warning("Error while importing announced block: " + e); } } - public void handleProducedBlock(Block block) { - handleBlock(block, Instant.now()); - messageCoordinator.sendBlockAnnounceMessageExcludingPeer( - ProtocolMessageBuilder.buildBlockAnnounceMessage(block.getHeader(), true), - null); - } + private void addBlockToTree(Block block, Instant arrivalTime) { - private void handleBlock(Block block, Instant arrivalTime) { BlockHeader header = block.getHeader(); blockState.addBlockWithArrivalTime(block, arrivalTime); log.fine(String.format("Added block No: %s with hash: %s to block tree.", block.getHeader().getBlockNumber(), header.getHash())); - DigestHelper.getBabeConsensusMessage(header.getDigest()) - .ifPresent(cm -> { - epochState.updateNextEpochConfig(cm); - log.fine(String.format("Updated epoch block config: %s", cm.getFormat().toString())); - }); + if (epochState.isInitialized()) { + asyncExecutor.executeAndForget(() -> DigestHelper.getBabeConsensusMessage(header.getDigest()) + .ifPresent(cm -> { + epochState.updateNextEpochConfig(cm); + log.fine(String.format("Updated epoch block config: %s", cm.getFormat().toString())); + })); + } + + if (roundState.isInitialized()) { + asyncExecutor.executeAndForget(() -> DigestHelper.getGrandpaConsensusMessage(header.getDigest()) + .ifPresent(cm -> + roundState.handleGrandpaConsensusMessage(cm, header.getBlockNumber()) + )); + + roundState.handleAuthoritySetChange(header.getBlockNumber()); + } + } + + private void addBlockToQueue(BlockHeader blockHeader, Instant arrivalTime) { - DigestHelper.getGrandpaConsensusMessage(header.getDigest()) - .ifPresent(cm -> roundState.handleGrandpaConsensusMessage(cm, header.getBlockNumber())); + blockHeaders.put(blockHeader.getHash(), blockHeader); - roundState.handleAuthoritySetChange(header.getBlockNumber()); + asyncExecutor.executeAndForget(() -> { + Block block = requestBlock(blockHeader); + pendingBlocksQueue.add(Pair.with(arrivalTime, block)); + log.fine("Added block to queue " + block.getHeader().getBlockNumber() + " " + block.getHeader().getHash()); + }); + } + + private void processPendingBlocksFromQueue() { + + while (!pendingBlocksQueue.isEmpty()) { + var currentPair = pendingBlocksQueue.poll(); + var block = currentPair.getValue1(); + var arrivalTime = currentPair.getValue0(); + + blockHeaders.remove(block.getHeader().getHash()); + + if (block.getHeader().getBlockNumber().compareTo(syncState.getLastFinalizedBlockNumber()) <= 0) { + continue; + } + + try { + processBlock(block, arrivalTime); + } catch (BlockStorageGenericException ex) { + log.fine(String.format("[%s] %s", block.getHeader().getHash().toString(), ex.getMessage())); + } + } + } + + private Block requestBlock(BlockHeader header) { + + List blocks = new ArrayList<>(); + + while (blocks.isEmpty()) { + CompletableFuture> responseFuture = requester.requestBlocks( + BlockRequestField.ALL, header.getHash(), 1); + + blocks = responseFuture.join(); + } - asyncExecutor.executeAndForget(() -> transactionProcessor.maintainTransactionPool(block)); + log.fine("Request successful " + blocks.getFirst().getHeader().getHash()); + return blocks.getFirst(); } } \ No newline at end of file diff --git a/src/main/java/com/limechain/sync/SyncService.java b/src/main/java/com/limechain/sync/SyncService.java index d6dce5ca..62dd3528 100644 --- a/src/main/java/com/limechain/sync/SyncService.java +++ b/src/main/java/com/limechain/sync/SyncService.java @@ -22,14 +22,20 @@ public class SyncService implements NodeService { @Override public void start() { + SyncMode initSyncMode = arguments.syncMode(); + AbstractState.setSyncMode(initSyncMode); + switch (NodeRole.fromString(arguments.nodeRole())) { case LIGHT -> { - warpSyncMachine.onFinish(() -> AbstractState.setSyncMode(SyncMode.HEAD), - messageCoordinator::handshakeBootNodes); + warpSyncMachine.onFinish(() -> { + AbstractState.setSyncMode(SyncMode.HEAD); + messageCoordinator.handshakeBootNodes(); + messageCoordinator.handshakePeers(); + }); warpSyncMachine.start(); } case FULL, AUTHORING -> { - switch (arguments.syncMode()) { + switch (initSyncMode) { case FULL -> fullSyncMachine.start(); case WARP -> { warpSyncMachine.onFinish(() -> AbstractState.setSyncMode(SyncMode.FULL), diff --git a/src/main/java/com/limechain/sync/fullsync/FullSyncMachine.java b/src/main/java/com/limechain/sync/fullsync/FullSyncMachine.java index ccdce72f..6dc26d83 100644 --- a/src/main/java/com/limechain/sync/fullsync/FullSyncMachine.java +++ b/src/main/java/com/limechain/sync/fullsync/FullSyncMachine.java @@ -115,6 +115,9 @@ public void start() { .add(BigInteger.ONE) .intValueExact(); + messageCoordinator.handshakeBootNodes(); + messageCoordinator.handshakePeers(); + int blocksToFetch = 100; List receivedBlocks = requester.requestBlocks(BlockRequestField.ALL, startNumber, blocksToFetch).join(); @@ -133,8 +136,6 @@ private void finishFullSync() { initializeStates(); AbstractState.setSyncMode(SyncMode.HEAD); - messageCoordinator.handshakeBootNodes(); - messageCoordinator.handshakePeers(); } private void initializeStates() { diff --git a/src/main/java/com/limechain/sync/warpsync/WarpSyncState.java b/src/main/java/com/limechain/sync/warpsync/WarpSyncState.java index 1bb6baae..b0a4b725 100644 --- a/src/main/java/com/limechain/sync/warpsync/WarpSyncState.java +++ b/src/main/java/com/limechain/sync/warpsync/WarpSyncState.java @@ -21,8 +21,8 @@ import com.limechain.runtime.RuntimeBuilder; import com.limechain.storage.DBConstants; import com.limechain.storage.KVRepository; -import com.limechain.sync.state.SyncState; import com.limechain.sync.JustificationVerifier; +import com.limechain.sync.state.SyncState; import com.limechain.trie.decoded.Trie; import com.limechain.trie.decoded.TrieVerifier; import com.limechain.utils.LittleEndianUtils;