diff --git a/src/main/java/com/limechain/client/FullNode.java b/src/main/java/com/limechain/client/FullNode.java index 117f201f0..be98e517e 100644 --- a/src/main/java/com/limechain/client/FullNode.java +++ b/src/main/java/com/limechain/client/FullNode.java @@ -59,7 +59,7 @@ public void start() { switch (args.syncMode()) { case FULL -> fullSyncMachine.start(); case WARP -> { - warpSyncMachine.onFinish(() -> fullSyncMachine.start()); + warpSyncMachine.onFinish(fullSyncMachine::start); warpSyncMachine.start(); } default -> throw new IllegalStateException("Unexpected value: " + args.syncMode()); diff --git a/src/main/java/com/limechain/network/Network.java b/src/main/java/com/limechain/network/Network.java index 3a347134c..fa05159d6 100644 --- a/src/main/java/com/limechain/network/Network.java +++ b/src/main/java/com/limechain/network/Network.java @@ -120,16 +120,16 @@ private void initializeProtocols(ChainService chainService, GenesisBlockHash gen String pingProtocol = ProtocolUtils.PING_PROTOCOL; String chainId = chainService.getChainSpec().getProtocolId(); - String protocolId = cliArgs.noLegacyProtocols() - ? StringUtils.remove0xPrefix(genesisBlockHash.getGenesisHash().toString()) - : chainId; + boolean legacyProtocol = !cliArgs.noLegacyProtocols(); + String protocolId = legacyProtocol ? chainId : + StringUtils.remove0xPrefix(genesisBlockHash.getGenesisHash().toString()); String kadProtocolId = ProtocolUtils.getKadProtocol(chainId); String warpProtocolId = ProtocolUtils.getWarpSyncProtocol(protocolId); String lightProtocolId = ProtocolUtils.getLightMessageProtocol(protocolId); String syncProtocolId = ProtocolUtils.getSyncProtocol(protocolId); String stateProtocolId = ProtocolUtils.getStateProtocol(protocolId); String blockAnnounceProtocolId = ProtocolUtils.getBlockAnnounceProtocol(protocolId); - String grandpaProtocolId = ProtocolUtils.getGrandpaProtocol(protocolId); + String grandpaProtocolId = ProtocolUtils.getGrandpaProtocol(protocolId, legacyProtocol); String transactionsProtocolId = ProtocolUtils.getTransactionsProtocol(protocolId); kademliaService = new KademliaService(kadProtocolId, hostId, isLocalEnabled, clientMode); diff --git a/src/main/java/com/limechain/network/ProtocolUtils.java b/src/main/java/com/limechain/network/ProtocolUtils.java index 26b58cd33..acd865963 100644 --- a/src/main/java/com/limechain/network/ProtocolUtils.java +++ b/src/main/java/com/limechain/network/ProtocolUtils.java @@ -31,13 +31,8 @@ public static String getKadProtocol(String chainId) { return String.format("/%s/kad", chainId); } - public static String getGrandpaProtocol(String chainId) { - return String.format("/%s/grandpa/1", grandpaProtocolChain(chainId)); - } - - //TODO: figure out a more elegant solution - private static String grandpaProtocolChain(String chainId) { - return chainId.equals("dot") ? "paritytech" : chainId; + public static String getGrandpaProtocol(String chainId, boolean legacyProtocol) { + return String.format("/%s/grandpa/1", legacyProtocol ? "paritytech" : chainId); } public static String getTransactionsProtocol(String chainId) { diff --git a/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java b/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java index 6311cf219..ee4a2e26d 100644 --- a/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java +++ b/src/main/java/com/limechain/network/protocol/blockannounce/BlockAnnounceEngine.java @@ -1,7 +1,6 @@ package com.limechain.network.protocol.blockannounce; import com.limechain.exception.scale.ScaleEncodingException; -import com.limechain.exception.storage.BlockNodeNotFoundException; import com.limechain.network.ConnectionManager; import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceHandshake; import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceHandshakeBuilder; @@ -9,8 +8,6 @@ import com.limechain.network.protocol.blockannounce.scale.BlockAnnounceHandshakeScaleReader; import com.limechain.network.protocol.blockannounce.scale.BlockAnnounceHandshakeScaleWriter; import com.limechain.network.protocol.blockannounce.scale.BlockAnnounceMessageScaleReader; -import com.limechain.network.protocol.warp.dto.Block; -import com.limechain.network.protocol.warp.dto.BlockBody; import com.limechain.rpc.server.AppBean; import com.limechain.storage.block.BlockState; import com.limechain.sync.warpsync.WarpSyncState; @@ -24,13 +21,14 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.util.ArrayList; import java.util.logging.Level; @Log @AllArgsConstructor(access = AccessLevel.PROTECTED) public class BlockAnnounceEngine { + public static final int HANDSHAKE_LENGTH = 69; + protected ConnectionManager connectionManager; protected WarpSyncState warpSyncState; protected BlockAnnounceHandshakeBuilder handshakeBuilder; @@ -74,7 +72,7 @@ private void handleHandshake(byte[] msg, PeerId peerId, Stream stream, boolean c connectionManager.addBlockAnnounceStream(stream); connectionManager.updatePeer(peerId, handshake); log.log(Level.INFO, "Received handshake from " + peerId + "\n" + - handshake); + handshake); writeHandshakeToStream(stream, peerId); } } @@ -85,19 +83,13 @@ private void handleBlockAnnounce(byte[] msg, PeerId peerId) { connectionManager.updatePeer(peerId, announce); warpSyncState.syncBlockAnnounce(announce); log.log(Level.FINE, "Received block announce for block #" + announce.getHeader().getBlockNumber() + - " from " + peerId + - " with hash:0x" + announce.getHeader().getHash() + - " parentHash:" + announce.getHeader().getParentHash() + - " stateRoot:" + announce.getHeader().getStateRoot()); + " from " + peerId + + " with hash:0x" + announce.getHeader().getHash() + + " parentHash:" + announce.getHeader().getParentHash() + + " stateRoot:" + announce.getHeader().getStateRoot()); if (BlockState.getInstance().isInitialized()) { - try { - BlockState.getInstance().addBlock(new Block(announce.getHeader(), new BlockBody(new ArrayList<>()))); - } catch (BlockNodeNotFoundException ignored) { - // Currently we ignore this exception, because our syncing strategy as full node is not implemented yet. - // And thus when we receive a block announce and try to add it in the BlockState we will get this - // exception because the parent block of the received one is not found in the BlockState. - } + BlockState.getInstance().addBlockToBlockTree(announce.getHeader()); } } diff --git a/src/main/java/com/limechain/storage/block/BlockState.java b/src/main/java/com/limechain/storage/block/BlockState.java index bcacbca90..c83ef5609 100644 --- a/src/main/java/com/limechain/storage/block/BlockState.java +++ b/src/main/java/com/limechain/storage/block/BlockState.java @@ -16,17 +16,20 @@ import com.limechain.runtime.Runtime; import com.limechain.storage.DBConstants; import com.limechain.storage.KVRepository; +import com.limechain.storage.block.tree.BlockNode; import com.limechain.storage.block.tree.BlockTree; import com.limechain.utils.scale.ScaleUtils; import io.emeraldpay.polkaj.types.Hash256; import lombok.Getter; import lombok.NoArgsConstructor; +import lombok.Setter; import lombok.extern.java.Log; import org.javatuples.Pair; import org.springframework.util.SerializationUtils; import java.math.BigInteger; import java.time.Instant; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -53,6 +56,10 @@ public class BlockState { private Hash256 lastFinalized; @Getter private boolean initialized; + @Setter + private boolean fullSyncFinished; + @Getter + private final ArrayDeque> pendingBlocksQueue = new ArrayDeque<>(); /** * Initializes the BlockState instance from genesis @@ -97,11 +104,21 @@ public void initialize(final KVRepository repository) { this.genesisHash = getHashByNumberFromDb(BigInteger.ZERO); final BlockHeader lastHeader = getHighestFinalizedHeader(); - final Hash256 headerHash = lastHeader.getHash(); - this.lastFinalized = headerHash; + this.lastFinalized = lastHeader.getHash(); this.blockTree = new BlockTree(lastHeader); } + public void initializeAfterWarpSync(Hash256 lastFinalizedBlockHash, BigInteger lastFinalizedBlockNumber) { + BlockNode parentBlock = new BlockNode( + lastFinalizedBlockHash, + null, + lastFinalizedBlockNumber.longValue() + ); + + this.blockTree = new BlockTree(parentBlock); + this.lastFinalized = lastFinalizedBlockHash; + } + /** * Check if the hash is part of the unfinalized blocks in-memory or persisted in the database. * @@ -589,7 +606,7 @@ public List retrieveRangeFromDatabase(final Hash256 startHash, final Bl // Verify that we ended up with the start hash if (!Objects.equals(inLoopHash, startHash)) { throw new BlockStorageGenericException("Start hash mismatch: expected " + startHash + - ", found: " + inLoopHash); + ", found: " + inLoopHash); } return hashes; @@ -749,6 +766,8 @@ public Block getUnfinalizedBlockFromHash(final Hash256 hash) { * @throws BlockNodeNotFoundException if the block corresponding to the provided hash is not found. */ public void setFinalizedHash(final Hash256 hash, final BigInteger round, final BigInteger setId) { + if (!fullSyncFinished) return; + if (!hasHeader(hash)) { throw new BlockNodeNotFoundException("Cannot finalise unknown block " + hash); } @@ -883,8 +902,8 @@ public void handleFinalizedBlock(final Hash256 currentFinalizedHash) { Block block = unfinalizedBlocks.get(subchainHash); if (block == null) { - throw new BlockNotFoundException("Failed to find block in unfinalized block map for hash" + - subchainHash); + throw new BlockNotFoundException("Failed to find block in unfinalized block map for hash " + + subchainHash); } setHeader(block.getHeader()); @@ -904,4 +923,52 @@ public void handleFinalizedBlock(final Hash256 currentFinalizedHash) { //TODO: If currentFinalizedHash is not equal to subchain hash, delete subchain state trie } } + + public synchronized void addBlockToBlockTree(BlockHeader blockHeader) { + if (!fullSyncFinished) { + addBlockToQueue(blockHeader); + return; + } + + processPendingBlocksFromQueue(); + + if (getPendingBlocksQueue().isEmpty()) { + try { + addBlock(new Block(blockHeader, new BlockBody(new ArrayList<>()))); + } catch (BlockStorageGenericException ex) { + log.fine(String.format("[%s] %s", blockHeader.getHash().toString(), ex.getMessage())); + } + } + } + + private void addBlockToQueue(BlockHeader blockHeader) { + var currentBlock = new Block( + blockHeader, + new BlockBody(new ArrayList<>()) + ); + + pendingBlocksQueue.add( + new Pair<>(Instant.now(), currentBlock) + ); + } + + private void processPendingBlocksFromQueue() { + var rootBlockNumber = BigInteger.valueOf(blockTree.getRoot().getNumber()); + + while (!pendingBlocksQueue.isEmpty()) { + var currentPair = pendingBlocksQueue.poll(); + var block = currentPair.getValue1(); + var arrivalTime = currentPair.getValue0(); + + if (block.getHeader().getBlockNumber().compareTo(rootBlockNumber) <= 0) { + continue; + } + + try { + addBlockWithArrivalTime(block, arrivalTime); + } catch (BlockStorageGenericException ex) { + log.fine(String.format("[%s] %s", block.getHeader().getHash().toString(), ex.getMessage())); + } + } + } } diff --git a/src/main/java/com/limechain/storage/block/SyncState.java b/src/main/java/com/limechain/storage/block/SyncState.java index 320d17720..aa2123421 100644 --- a/src/main/java/com/limechain/storage/block/SyncState.java +++ b/src/main/java/com/limechain/storage/block/SyncState.java @@ -76,7 +76,12 @@ public void finalizedCommitMessage(CommitMessage commitMessage) { this.stateRoot = blockByHash.getHeader().getStateRoot(); this.lastFinalizedBlockHash = commitMessage.getVote().getBlockHash(); this.lastFinalizedBlockNumber = commitMessage.getVote().getBlockNumber(); + + if (BlockState.getInstance().isInitialized()) { + BlockState.getInstance().setFinalizedHash(commitMessage.getVote().getBlockHash(), commitMessage.getRoundNumber(), commitMessage.getSetId()); + } } + } catch (HeaderNotFoundException ignored) { log.fine("Received commit message for a block that is not in the block store"); } diff --git a/src/main/java/com/limechain/storage/block/tree/BlockTree.java b/src/main/java/com/limechain/storage/block/tree/BlockTree.java index e148e7224..2bac7cc04 100644 --- a/src/main/java/com/limechain/storage/block/tree/BlockTree.java +++ b/src/main/java/com/limechain/storage/block/tree/BlockTree.java @@ -178,10 +178,6 @@ public List rangeInMemory(final Hash256 startHash, final Hash256 endHas throw new BlockNodeNotFoundException("Start node not found"); } - if (startBlockNode.getNumber() > endBlockNode.getNumber()) { - throw new BlockStorageGenericException("Start is greater than end"); - } - return accumulateHashesInDescendingOrder(endBlockNode, startBlockNode); } @@ -196,7 +192,7 @@ public List rangeInMemory(final Hash256 startHash, final Hash256 endHas */ public List accumulateHashesInDescendingOrder(final BlockNode endNode, final BlockNode startNode) { if (startNode.getNumber() > endNode.getNumber()) { - throw new IllegalArgumentException("Start is greater than end"); + throw new BlockStorageGenericException("Start is greater than end"); } int blocksInRange = (int) (endNode.getNumber() - startNode.getNumber()); @@ -205,7 +201,7 @@ public List accumulateHashesInDescendingOrder(final BlockNode endNode, BlockNode tempNode = endNode; for (int position = blocksInRange - 1; position >= 0; position--) { hashes.add(tempNode.getHash()); - tempNode = endNode.getParent(); + tempNode = tempNode.getParent(); if (tempNode == null) { throw new BlockStorageGenericException("End node is null"); diff --git a/src/main/java/com/limechain/sync/fullsync/FullSyncMachine.java b/src/main/java/com/limechain/sync/fullsync/FullSyncMachine.java index 1b44907ce..04605a593 100644 --- a/src/main/java/com/limechain/sync/fullsync/FullSyncMachine.java +++ b/src/main/java/com/limechain/sync/fullsync/FullSyncMachine.java @@ -102,6 +102,8 @@ public void start() { startNumber += blocksToFetch; receivedBlocks = requestBlocks(startNumber, blocksToFetch); } + + blockState.setFullSyncFinished(true); } private TrieStructure loadStateAtBlockFromPeer(Hash256 lastFinalizedBlockHash) { diff --git a/src/main/java/com/limechain/sync/warpsync/WarpSyncMachine.java b/src/main/java/com/limechain/sync/warpsync/WarpSyncMachine.java index a3eaecdbf..a241b1aa5 100644 --- a/src/main/java/com/limechain/sync/warpsync/WarpSyncMachine.java +++ b/src/main/java/com/limechain/sync/warpsync/WarpSyncMachine.java @@ -5,6 +5,7 @@ import com.limechain.chain.lightsyncstate.LightSyncState; import com.limechain.network.Network; import com.limechain.network.protocol.warp.dto.WarpSyncFragment; +import com.limechain.storage.block.BlockState; import com.limechain.storage.block.SyncState; import com.limechain.sync.warpsync.action.FinishedAction; import com.limechain.sync.warpsync.action.RequestFragmentsAction; @@ -101,6 +102,12 @@ private void finishWarpSync() { this.warpState.setWarpSyncFinished(true); this.networkService.handshakeBootNodes(); this.syncState.persistState(); + + BlockState.getInstance().initializeAfterWarpSync( + syncState.getLastFinalizedBlockHash(), + syncState.getLastFinalizedBlockNumber() + ); + log.info("Warp sync finished."); this.onFinishCallbacks.forEach(executor::submit); }