From 23d716d971f463deb31aa40e2a27989dc143310e Mon Sep 17 00:00:00 2001 From: Oleksandr Date: Fri, 17 Jan 2025 16:05:10 +0200 Subject: [PATCH] fixed conflicts --- .../com/limechain/grandpa/GrandpaService.java | 4 +- .../grandpa/state/GrandpaSetState.java | 73 +++++++++++++- .../limechain/storage/block/BlockHandler.java | 12 +-- .../sync/warpsync/WarpSyncState.java | 95 ++----------------- .../action/VerifyJustificationAction.java | 10 +- 5 files changed, 92 insertions(+), 102 deletions(-) diff --git a/src/main/java/com/limechain/grandpa/GrandpaService.java b/src/main/java/com/limechain/grandpa/GrandpaService.java index 7533ad24..68063b6a 100644 --- a/src/main/java/com/limechain/grandpa/GrandpaService.java +++ b/src/main/java/com/limechain/grandpa/GrandpaService.java @@ -122,7 +122,9 @@ public Vote getGrandpaGhost(GrandpaRound grandpaRound) { * @return the best pre-voted block */ public Vote getBestPreVoteCandidate(GrandpaRound grandpaRound) { - Vote previousBestFinalCandidate = grandpaRound.getPrevious().getBestFinalCandidate(); + Vote previousBestFinalCandidate = grandpaRound.getPrevious() != null + ? grandpaRound.getPrevious().getBestFinalCandidate() + : new Vote(null, BigInteger.ZERO); Vote currentVote = getGrandpaGhost(grandpaRound); SignedVote primaryVote = grandpaRound.getPrimaryVote(); diff --git a/src/main/java/com/limechain/grandpa/state/GrandpaSetState.java b/src/main/java/com/limechain/grandpa/state/GrandpaSetState.java index 8237467a..54fd5212 100644 --- a/src/main/java/com/limechain/grandpa/state/GrandpaSetState.java +++ b/src/main/java/com/limechain/grandpa/state/GrandpaSetState.java @@ -5,6 +5,7 @@ import com.limechain.exception.grandpa.GrandpaGenericException; import com.limechain.network.protocol.grandpa.messages.catchup.res.SignedVote; import com.limechain.network.protocol.grandpa.messages.commit.Vote; +import com.limechain.network.protocol.grandpa.messages.consensus.GrandpaConsensusMessage; import com.limechain.network.protocol.grandpa.messages.vote.SignedMessage; import com.limechain.network.protocol.grandpa.messages.vote.Subround; import com.limechain.network.protocol.grandpa.messages.vote.VoteMessage; @@ -12,24 +13,30 @@ import com.limechain.storage.DBConstants; import com.limechain.storage.KVRepository; import com.limechain.storage.StateUtil; +import com.limechain.sync.warpsync.dto.AuthoritySetChange; +import com.limechain.sync.warpsync.dto.ForcedAuthoritySetChange; +import com.limechain.sync.warpsync.dto.ScheduledAuthoritySetChange; import io.emeraldpay.polkaj.types.Hash256; import io.libp2p.core.crypto.PubKey; -import jakarta.annotation.PostConstruct; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.Setter; +import lombok.extern.java.Log; import java.math.BigInteger; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.PriorityQueue; +import java.util.logging.Level; /** * Represents the state information for the current round and authorities that are needed * for block finalization with GRANDPA. * Note: Intended for use only when the host is configured as an Authoring Node. */ +@Log @Getter @Setter //TODO: remove it when initialize() method is implemented @RequiredArgsConstructor @@ -42,8 +49,8 @@ public class GrandpaSetState { private BigInteger setId; private RoundCache roundCache; + private final PriorityQueue authoritySetChanges = new PriorityQueue<>(AuthoritySetChange.getComparator()); - @PostConstruct public void initialize() { loadPersistedState(); roundCache = AppBean.getBean(RoundCache.class); @@ -131,9 +138,14 @@ public void persistState() { savePrevotes(roundCache.getLatestRoundNumber(setId)); } - public BigInteger incrementSetId() { + public void startNewSet(List authorities) { this.setId = setId.add(BigInteger.ONE); - return setId; + GrandpaRound grandpaRound = new GrandpaRound(); + grandpaRound.setRoundNumber(BigInteger.ZERO); + roundCache.addRound(setId, grandpaRound); + this.authorities = authorities; + + log.log(Level.INFO, "Successfully transitioned to authority set id: " + setId); } public void setLightSyncState(LightSyncState initState) { @@ -141,6 +153,59 @@ public void setLightSyncState(LightSyncState initState) { this.authorities = Arrays.asList(initState.getGrandpaAuthoritySet().getCurrentAuthorities()); } + /** + * Apply scheduled or forced authority set changes from the queue if present + * + * @param blockNumber required to determine if it's time to apply the change + */ + public boolean handleAuthoritySetChange(BigInteger blockNumber) { + AuthoritySetChange changeSetData = authoritySetChanges.peek(); + + boolean updated = false; + while (changeSetData != null) { + + if (changeSetData.getApplicationBlockNumber().compareTo(blockNumber) > 0) { + break; + } + + startNewSet(changeSetData.getAuthorities()); + authoritySetChanges.poll(); + updated = true; + + changeSetData = authoritySetChanges.peek(); + } + + return updated; + } + + public void handleGrandpaConsensusMessage(GrandpaConsensusMessage consensusMessage, BigInteger currentBlockNumber) { + switch (consensusMessage.getFormat()) { + case GRANDPA_SCHEDULED_CHANGE -> authoritySetChanges.add(new ScheduledAuthoritySetChange( + consensusMessage.getAuthorities(), + consensusMessage.getDelay(), + currentBlockNumber + )); + case GRANDPA_FORCED_CHANGE -> authoritySetChanges.add(new ForcedAuthoritySetChange( + consensusMessage.getAuthorities(), + consensusMessage.getDelay(), + consensusMessage.getAdditionalOffset(), + currentBlockNumber + )); + //TODO: Implement later + case GRANDPA_ON_DISABLED -> { + log.log(Level.SEVERE, "'ON DISABLED' grandpa message not implemented"); + } + case GRANDPA_PAUSE -> { + log.log(Level.SEVERE, "'PAUSE' grandpa message not implemented"); + } + case GRANDPA_RESUME -> { + log.log(Level.SEVERE, "'RESUME' grandpa message not implemented"); + } + } + + log.fine(String.format("Updated grandpa set config: %s", consensusMessage.getFormat().toString())); + } + public void handleVoteMessage(VoteMessage voteMessage) { BigInteger voteMessageSetId = voteMessage.getSetId(); BigInteger voteMessageRoundNumber = voteMessage.getRound(); diff --git a/src/main/java/com/limechain/storage/block/BlockHandler.java b/src/main/java/com/limechain/storage/block/BlockHandler.java index 07a35e7a..5b0e3177 100644 --- a/src/main/java/com/limechain/storage/block/BlockHandler.java +++ b/src/main/java/com/limechain/storage/block/BlockHandler.java @@ -2,7 +2,7 @@ import com.limechain.babe.BlockProductionVerifier; import com.limechain.babe.state.EpochState; -import com.limechain.grandpa.state.RoundState; +import com.limechain.grandpa.state.GrandpaSetState; import com.limechain.network.PeerMessageCoordinator; import com.limechain.network.PeerRequester; import com.limechain.network.protocol.message.ProtocolMessageBuilder; @@ -28,7 +28,7 @@ public class BlockHandler { private final BlockState blockState; private final EpochState epochState; - private final RoundState roundState; + private final GrandpaSetState grandpaSetState; private final PeerRequester requester; private final PeerMessageCoordinator messageCoordinator; @@ -43,7 +43,7 @@ public BlockHandler(EpochState epochState, RuntimeBuilder builder, TransactionProcessor transactionProcessor, PeerMessageCoordinator messageCoordinator, - RoundState roundState) { + GrandpaSetState grandpaSetState) { this.epochState = epochState; this.requester = requester; @@ -53,7 +53,7 @@ public BlockHandler(EpochState epochState, this.verifier = new BlockProductionVerifier(); blockState = BlockState.getInstance(); asyncExecutor = AsyncExecutor.withPoolSize(10); - this.roundState = roundState; + this.grandpaSetState = grandpaSetState; } public synchronized void handleBlockHeader(Instant arrivalTime, BlockHeader header, PeerId excluding) { @@ -122,9 +122,9 @@ private void handleBlock(Block block, Instant arrivalTime) { }); DigestHelper.getGrandpaConsensusMessage(header.getDigest()) - .ifPresent(cm -> roundState.handleGrandpaConsensusMessage(cm, header.getBlockNumber())); + .ifPresent(cm -> grandpaSetState.handleGrandpaConsensusMessage(cm, header.getBlockNumber())); - roundState.handleAuthoritySetChange(header.getBlockNumber()); + grandpaSetState.handleAuthoritySetChange(header.getBlockNumber()); asyncExecutor.executeAndForget(() -> transactionProcessor.maintainTransactionPool(block)); } diff --git a/src/main/java/com/limechain/sync/warpsync/WarpSyncState.java b/src/main/java/com/limechain/sync/warpsync/WarpSyncState.java index 51b5941e..4927ed5c 100644 --- a/src/main/java/com/limechain/sync/warpsync/WarpSyncState.java +++ b/src/main/java/com/limechain/sync/warpsync/WarpSyncState.java @@ -1,6 +1,5 @@ package com.limechain.sync.warpsync; -import com.limechain.chain.lightsyncstate.Authority; import com.limechain.exception.global.RuntimeCodeException; import com.limechain.exception.trie.TrieDecoderException; import com.limechain.grandpa.state.GrandpaSetState; @@ -12,10 +11,9 @@ import com.limechain.network.protocol.lightclient.pb.LightClientMessage; import com.limechain.network.protocol.sync.BlockRequestField; import com.limechain.network.protocol.sync.pb.SyncMessage.BlockData; +import com.limechain.network.protocol.warp.DigestHelper; import com.limechain.network.protocol.warp.dto.BlockHeader; -import com.limechain.network.protocol.warp.dto.ConsensusEngine; import com.limechain.network.protocol.warp.dto.DigestType; -import com.limechain.network.protocol.warp.dto.HeaderDigest; import com.limechain.network.protocol.warp.dto.Justification; import com.limechain.network.protocol.warp.scale.reader.BlockHeaderReader; import com.limechain.network.protocol.warp.scale.reader.JustificationReader; @@ -25,10 +23,6 @@ import com.limechain.storage.KVRepository; import com.limechain.storage.block.SyncState; import com.limechain.sync.JustificationVerifier; -import com.limechain.sync.warpsync.dto.AuthoritySetChange; -import com.limechain.sync.warpsync.dto.GrandpaDigestMessageType; -import com.limechain.sync.warpsync.scale.ForcedChangeReader; -import com.limechain.sync.warpsync.scale.ScheduledChangeReader; import com.limechain.trie.decoded.Trie; import com.limechain.trie.decoded.TrieVerifier; import com.limechain.utils.LittleEndianUtils; @@ -39,14 +33,11 @@ import lombok.Getter; import lombok.Setter; import lombok.extern.java.Log; -import org.javatuples.Pair; import java.math.BigInteger; import java.util.Arrays; -import java.util.Comparator; import java.util.HashSet; import java.util.List; -import java.util.PriorityQueue; import java.util.Set; import java.util.logging.Level; @@ -78,36 +69,34 @@ public class WarpSyncState { protected final RuntimeBuilder runtimeBuilder; //TODO Yordan: maybe we won't need this anymore. private final Set scheduledRuntimeUpdateBlocks; - private final PriorityQueue> scheduledAuthorityChanges; - public WarpSyncState(SyncState syncState, KVRepository db, RuntimeBuilder runtimeBuilder, PeerRequester requester, PeerMessageCoordinator messageCoordinator, - RoundState roundState) { + GrandpaSetState grandpaSetState) { this(syncState, grandpaSetState, db, runtimeBuilder, new HashSet<>(), requester, - messageCoordinator); + messageCoordinator + ); } public WarpSyncState(SyncState syncState, GrandpaSetState grandpaSetState, KVRepository db, RuntimeBuilder runtimeBuilder, Set scheduledRuntimeUpdateBlocks, - PriorityQueue> scheduledAuthorityChanges, PeerRequester requester, PeerMessageCoordinator messageCoordinator) { + this.syncState = syncState; this.grandpaSetState = grandpaSetState; this.db = db; this.runtimeBuilder = runtimeBuilder; this.scheduledRuntimeUpdateBlocks = scheduledRuntimeUpdateBlocks; - this.scheduledAuthorityChanges = scheduledAuthorityChanges; this.requester = requester; this.messageCoordinator = messageCoordinator; } @@ -280,6 +269,7 @@ public void syncNeighbourMessage(NeighbourMessage neighbourMessage, PeerId peerI } private void updateSetData(BigInteger setChangeBlock) { + List response = requester.requestBlockData( BlockRequestField.ALL, setChangeBlock.intValueExact(), @@ -295,6 +285,7 @@ private void updateSetData(BigInteger setChangeBlock) { Justification justification = new JustificationReader().read( new ScaleCodecReader(block.getJustification().toByteArray())); + boolean verified = justification != null && JustificationVerifier.verify(justification.getPrecommits(), justification.getRound()); @@ -304,7 +295,7 @@ private void updateSetData(BigInteger setChangeBlock) { syncState.finalizeHeader(header); DigestHelper.getGrandpaConsensusMessage(header.getDigest()) - .ifPresent(cm -> roundState.handleGrandpaConsensusMessage(cm, header.getBlockNumber())); + .ifPresent(cm -> grandpaSetState.handleGrandpaConsensusMessage(cm, header.getBlockNumber())); handleScheduledEvents(); } @@ -314,76 +305,10 @@ private void updateSetData(BigInteger setChangeBlock) { * Executes scheduled or forced authority changes for the last finalized block. */ public void handleScheduledEvents() { - Pair data = scheduledAuthorityChanges.peek(); - BigInteger authoritiesSetId = roundState.getSetId(); - boolean updated = false; - while (data != null) { - if (data.getValue0().compareTo(syncState.getLastFinalizedBlockNumber()) < 1) { - authoritiesSetId = roundState.incrementSetId(); - roundState.resetRound(); - roundState.setAuthorities(Arrays.asList(data.getValue1())); - scheduledAuthorityChanges.poll(); - updated = true; - } else break; - data = scheduledAuthorityChanges.peek(); - } + boolean updated = grandpaSetState.handleAuthoritySetChange(syncState.getLastFinalizedBlockNumber()); + if (warpSyncFinished && updated) { - log.log(Level.INFO, "Successfully transitioned to authority set id: " + authoritiesSetId); new Thread(messageCoordinator::sendMessagesToPeers).start(); } } - - /** - * Handles authority changes coming from a block header digest and schedules them. - * - * @param headerDigests digest of the block header - * @param blockNumber block that contains the digest - */ - public void handleAuthorityChanges(HeaderDigest[] headerDigests, BigInteger blockNumber) { - // Update authority set and set id - AuthoritySetChange authorityChanges; - for (HeaderDigest digest : headerDigests) { - if (digest.getId() == ConsensusEngine.GRANDPA) { - ScaleCodecReader reader = new ScaleCodecReader(digest.getMessage()); - GrandpaDigestMessageType type = GrandpaDigestMessageType.fromId(reader.readByte()); - - if (type == null) { - log.log(Level.SEVERE, "Could not get grandpa message type"); - throw new IllegalStateException("Unknown grandpa message type"); - } - - switch (type) { - case SCHEDULED_CHANGE -> { - ScheduledChangeReader authorityChangesReader = new ScheduledChangeReader(); - authorityChanges = authorityChangesReader.read(reader); - scheduledAuthorityChanges - .add(new Pair<>(blockNumber.add(authorityChanges.getDelay()), - authorityChanges.getAuthorities())); - return; - } - case FORCED_CHANGE -> { - ForcedChangeReader authorityForcedChangesReader = new ForcedChangeReader(); - authorityChanges = authorityForcedChangesReader.read(reader); - scheduledAuthorityChanges - .add(new Pair<>(blockNumber.add(authorityChanges.getDelay()), - authorityChanges.getAuthorities())); - return; - } - case ON_DISABLED -> { - log.log(Level.SEVERE, "'ON DISABLED' grandpa message not implemented"); - return; - } - case PAUSE -> { - log.log(Level.SEVERE, "'PAUSE' grandpa message not implemented"); - return; - } - case RESUME -> { - log.log(Level.SEVERE, "'RESUME' grandpa message not implemented"); - return; - } - } - } - } - } - } \ No newline at end of file diff --git a/src/main/java/com/limechain/sync/warpsync/action/VerifyJustificationAction.java b/src/main/java/com/limechain/sync/warpsync/action/VerifyJustificationAction.java index 364a6427..cd8bea64 100644 --- a/src/main/java/com/limechain/sync/warpsync/action/VerifyJustificationAction.java +++ b/src/main/java/com/limechain/sync/warpsync/action/VerifyJustificationAction.java @@ -1,8 +1,7 @@ package com.limechain.sync.warpsync.action; import com.limechain.exception.sync.JustificationVerificationException; -import com.limechain.grandpa.state.RoundState; -import com.limechain.network.protocol.grandpa.messages.consensus.GrandpaConsensusMessage; +import com.limechain.grandpa.state.GrandpaSetState; import com.limechain.network.protocol.warp.DigestHelper; import com.limechain.network.protocol.warp.dto.BlockHeader; import com.limechain.network.protocol.warp.dto.WarpSyncFragment; @@ -13,7 +12,6 @@ import com.limechain.sync.warpsync.WarpSyncState; import lombok.extern.java.Log; -import java.util.Optional; import java.util.logging.Level; // VerifyJustificationState is going to be instantiated a lot of times @@ -23,11 +21,11 @@ public class VerifyJustificationAction implements WarpSyncAction { private final WarpSyncState warpSyncState; private final SyncState syncState; - private final RoundState roundState; + private final GrandpaSetState grandpaSetState; private Exception error; public VerifyJustificationAction() { - this.roundState = AppBean.getBean(RoundState.class); + this.grandpaSetState = AppBean.getBean(GrandpaSetState.class); this.syncState = AppBean.getBean(SyncState.class); this.warpSyncState = AppBean.getBean(WarpSyncState.class); } @@ -78,7 +76,7 @@ private void handleAuthorityChanges(WarpSyncFragment fragment) { BlockHeader header = fragment.getHeader(); DigestHelper.getGrandpaConsensusMessage(header.getDigest()) - .ifPresent(cm -> roundState.handleGrandpaConsensusMessage(cm, header.getBlockNumber())); + .ifPresent(cm -> grandpaSetState.handleGrandpaConsensusMessage(cm, header.getBlockNumber())); log.log(Level.INFO, "Verified justification. Block hash is now at #" + syncState.getLastFinalizedBlockNumber() + ": "