Skip to content

Commit

Permalink
fixed conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
Oleksandr committed Jan 17, 2025
1 parent 0c0c7d9 commit 23d716d
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 102 deletions.
4 changes: 3 additions & 1 deletion src/main/java/com/limechain/grandpa/GrandpaService.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,9 @@ public Vote getGrandpaGhost(GrandpaRound grandpaRound) {
* @return the best pre-voted block
*/
public Vote getBestPreVoteCandidate(GrandpaRound grandpaRound) {
Vote previousBestFinalCandidate = grandpaRound.getPrevious().getBestFinalCandidate();
Vote previousBestFinalCandidate = grandpaRound.getPrevious() != null
? grandpaRound.getPrevious().getBestFinalCandidate()
: new Vote(null, BigInteger.ZERO);
Vote currentVote = getGrandpaGhost(grandpaRound);

SignedVote primaryVote = grandpaRound.getPrimaryVote();
Expand Down
73 changes: 69 additions & 4 deletions src/main/java/com/limechain/grandpa/state/GrandpaSetState.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,38 @@
import com.limechain.exception.grandpa.GrandpaGenericException;
import com.limechain.network.protocol.grandpa.messages.catchup.res.SignedVote;
import com.limechain.network.protocol.grandpa.messages.commit.Vote;
import com.limechain.network.protocol.grandpa.messages.consensus.GrandpaConsensusMessage;
import com.limechain.network.protocol.grandpa.messages.vote.SignedMessage;
import com.limechain.network.protocol.grandpa.messages.vote.Subround;
import com.limechain.network.protocol.grandpa.messages.vote.VoteMessage;
import com.limechain.rpc.server.AppBean;
import com.limechain.storage.DBConstants;
import com.limechain.storage.KVRepository;
import com.limechain.storage.StateUtil;
import com.limechain.sync.warpsync.dto.AuthoritySetChange;
import com.limechain.sync.warpsync.dto.ForcedAuthoritySetChange;
import com.limechain.sync.warpsync.dto.ScheduledAuthoritySetChange;
import io.emeraldpay.polkaj.types.Hash256;
import io.libp2p.core.crypto.PubKey;
import jakarta.annotation.PostConstruct;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.java.Log;

import java.math.BigInteger;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.logging.Level;

/**
* Represents the state information for the current round and authorities that are needed
* for block finalization with GRANDPA.
* Note: Intended for use only when the host is configured as an Authoring Node.
*/
@Log
@Getter
@Setter //TODO: remove it when initialize() method is implemented
@RequiredArgsConstructor
Expand All @@ -42,8 +49,8 @@ public class GrandpaSetState {
private BigInteger setId;
private RoundCache roundCache;

private final PriorityQueue<AuthoritySetChange> authoritySetChanges = new PriorityQueue<>(AuthoritySetChange.getComparator());

@PostConstruct
public void initialize() {
loadPersistedState();
roundCache = AppBean.getBean(RoundCache.class);
Expand Down Expand Up @@ -131,16 +138,74 @@ public void persistState() {
savePrevotes(roundCache.getLatestRoundNumber(setId));
}

public BigInteger incrementSetId() {
public void startNewSet(List<Authority> authorities) {
this.setId = setId.add(BigInteger.ONE);
return setId;
GrandpaRound grandpaRound = new GrandpaRound();
grandpaRound.setRoundNumber(BigInteger.ZERO);
roundCache.addRound(setId, grandpaRound);
this.authorities = authorities;

log.log(Level.INFO, "Successfully transitioned to authority set id: " + setId);
}

public void setLightSyncState(LightSyncState initState) {
this.setId = initState.getGrandpaAuthoritySet().getSetId();
this.authorities = Arrays.asList(initState.getGrandpaAuthoritySet().getCurrentAuthorities());
}

/**
* Apply scheduled or forced authority set changes from the queue if present
*
* @param blockNumber required to determine if it's time to apply the change
*/
public boolean handleAuthoritySetChange(BigInteger blockNumber) {
AuthoritySetChange changeSetData = authoritySetChanges.peek();

boolean updated = false;
while (changeSetData != null) {

if (changeSetData.getApplicationBlockNumber().compareTo(blockNumber) > 0) {
break;
}

startNewSet(changeSetData.getAuthorities());
authoritySetChanges.poll();
updated = true;

changeSetData = authoritySetChanges.peek();
}

return updated;
}

public void handleGrandpaConsensusMessage(GrandpaConsensusMessage consensusMessage, BigInteger currentBlockNumber) {
switch (consensusMessage.getFormat()) {
case GRANDPA_SCHEDULED_CHANGE -> authoritySetChanges.add(new ScheduledAuthoritySetChange(
consensusMessage.getAuthorities(),
consensusMessage.getDelay(),
currentBlockNumber
));
case GRANDPA_FORCED_CHANGE -> authoritySetChanges.add(new ForcedAuthoritySetChange(
consensusMessage.getAuthorities(),
consensusMessage.getDelay(),
consensusMessage.getAdditionalOffset(),
currentBlockNumber
));
//TODO: Implement later
case GRANDPA_ON_DISABLED -> {
log.log(Level.SEVERE, "'ON DISABLED' grandpa message not implemented");
}
case GRANDPA_PAUSE -> {
log.log(Level.SEVERE, "'PAUSE' grandpa message not implemented");
}
case GRANDPA_RESUME -> {
log.log(Level.SEVERE, "'RESUME' grandpa message not implemented");
}
}

log.fine(String.format("Updated grandpa set config: %s", consensusMessage.getFormat().toString()));
}

public void handleVoteMessage(VoteMessage voteMessage) {
BigInteger voteMessageSetId = voteMessage.getSetId();
BigInteger voteMessageRoundNumber = voteMessage.getRound();
Expand Down
12 changes: 6 additions & 6 deletions src/main/java/com/limechain/storage/block/BlockHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import com.limechain.babe.BlockProductionVerifier;
import com.limechain.babe.state.EpochState;
import com.limechain.grandpa.state.RoundState;
import com.limechain.grandpa.state.GrandpaSetState;
import com.limechain.network.PeerMessageCoordinator;
import com.limechain.network.PeerRequester;
import com.limechain.network.protocol.message.ProtocolMessageBuilder;
Expand All @@ -28,7 +28,7 @@ public class BlockHandler {

private final BlockState blockState;
private final EpochState epochState;
private final RoundState roundState;
private final GrandpaSetState grandpaSetState;

private final PeerRequester requester;
private final PeerMessageCoordinator messageCoordinator;
Expand All @@ -43,7 +43,7 @@ public BlockHandler(EpochState epochState,
RuntimeBuilder builder,
TransactionProcessor transactionProcessor,
PeerMessageCoordinator messageCoordinator,
RoundState roundState) {
GrandpaSetState grandpaSetState) {

this.epochState = epochState;
this.requester = requester;
Expand All @@ -53,7 +53,7 @@ public BlockHandler(EpochState epochState,
this.verifier = new BlockProductionVerifier();
blockState = BlockState.getInstance();
asyncExecutor = AsyncExecutor.withPoolSize(10);
this.roundState = roundState;
this.grandpaSetState = grandpaSetState;
}

public synchronized void handleBlockHeader(Instant arrivalTime, BlockHeader header, PeerId excluding) {
Expand Down Expand Up @@ -122,9 +122,9 @@ private void handleBlock(Block block, Instant arrivalTime) {
});

DigestHelper.getGrandpaConsensusMessage(header.getDigest())
.ifPresent(cm -> roundState.handleGrandpaConsensusMessage(cm, header.getBlockNumber()));
.ifPresent(cm -> grandpaSetState.handleGrandpaConsensusMessage(cm, header.getBlockNumber()));

roundState.handleAuthoritySetChange(header.getBlockNumber());
grandpaSetState.handleAuthoritySetChange(header.getBlockNumber());

asyncExecutor.executeAndForget(() -> transactionProcessor.maintainTransactionPool(block));
}
Expand Down
95 changes: 10 additions & 85 deletions src/main/java/com/limechain/sync/warpsync/WarpSyncState.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.limechain.sync.warpsync;

import com.limechain.chain.lightsyncstate.Authority;
import com.limechain.exception.global.RuntimeCodeException;
import com.limechain.exception.trie.TrieDecoderException;
import com.limechain.grandpa.state.GrandpaSetState;
Expand All @@ -12,10 +11,9 @@
import com.limechain.network.protocol.lightclient.pb.LightClientMessage;
import com.limechain.network.protocol.sync.BlockRequestField;
import com.limechain.network.protocol.sync.pb.SyncMessage.BlockData;
import com.limechain.network.protocol.warp.DigestHelper;
import com.limechain.network.protocol.warp.dto.BlockHeader;
import com.limechain.network.protocol.warp.dto.ConsensusEngine;
import com.limechain.network.protocol.warp.dto.DigestType;
import com.limechain.network.protocol.warp.dto.HeaderDigest;
import com.limechain.network.protocol.warp.dto.Justification;
import com.limechain.network.protocol.warp.scale.reader.BlockHeaderReader;
import com.limechain.network.protocol.warp.scale.reader.JustificationReader;
Expand All @@ -25,10 +23,6 @@
import com.limechain.storage.KVRepository;
import com.limechain.storage.block.SyncState;
import com.limechain.sync.JustificationVerifier;
import com.limechain.sync.warpsync.dto.AuthoritySetChange;
import com.limechain.sync.warpsync.dto.GrandpaDigestMessageType;
import com.limechain.sync.warpsync.scale.ForcedChangeReader;
import com.limechain.sync.warpsync.scale.ScheduledChangeReader;
import com.limechain.trie.decoded.Trie;
import com.limechain.trie.decoded.TrieVerifier;
import com.limechain.utils.LittleEndianUtils;
Expand All @@ -39,14 +33,11 @@
import lombok.Getter;
import lombok.Setter;
import lombok.extern.java.Log;
import org.javatuples.Pair;

import java.math.BigInteger;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.logging.Level;

Expand Down Expand Up @@ -78,36 +69,34 @@ public class WarpSyncState {
protected final RuntimeBuilder runtimeBuilder;
//TODO Yordan: maybe we won't need this anymore.
private final Set<BigInteger> scheduledRuntimeUpdateBlocks;
private final PriorityQueue<Pair<BigInteger, Authority[]>> scheduledAuthorityChanges;


public WarpSyncState(SyncState syncState,
KVRepository<String, Object> db,
RuntimeBuilder runtimeBuilder,
PeerRequester requester,
PeerMessageCoordinator messageCoordinator,
RoundState roundState) {
GrandpaSetState grandpaSetState) {
this(syncState,
grandpaSetState,
db,
runtimeBuilder,
new HashSet<>(),
requester,
messageCoordinator);
messageCoordinator
);
}

public WarpSyncState(SyncState syncState, GrandpaSetState grandpaSetState,
KVRepository<String, Object> db,
RuntimeBuilder runtimeBuilder, Set<BigInteger> scheduledRuntimeUpdateBlocks,
PriorityQueue<Pair<BigInteger, Authority[]>> scheduledAuthorityChanges,
PeerRequester requester,
PeerMessageCoordinator messageCoordinator) {

this.syncState = syncState;
this.grandpaSetState = grandpaSetState;
this.db = db;
this.runtimeBuilder = runtimeBuilder;
this.scheduledRuntimeUpdateBlocks = scheduledRuntimeUpdateBlocks;
this.scheduledAuthorityChanges = scheduledAuthorityChanges;
this.requester = requester;
this.messageCoordinator = messageCoordinator;
}
Expand Down Expand Up @@ -280,6 +269,7 @@ public void syncNeighbourMessage(NeighbourMessage neighbourMessage, PeerId peerI
}

private void updateSetData(BigInteger setChangeBlock) {

List<BlockData> response = requester.requestBlockData(
BlockRequestField.ALL,
setChangeBlock.intValueExact(),
Expand All @@ -295,6 +285,7 @@ private void updateSetData(BigInteger setChangeBlock) {

Justification justification = new JustificationReader().read(
new ScaleCodecReader(block.getJustification().toByteArray()));

boolean verified = justification != null
&& JustificationVerifier.verify(justification.getPrecommits(), justification.getRound());

Expand All @@ -304,7 +295,7 @@ private void updateSetData(BigInteger setChangeBlock) {
syncState.finalizeHeader(header);

DigestHelper.getGrandpaConsensusMessage(header.getDigest())
.ifPresent(cm -> roundState.handleGrandpaConsensusMessage(cm, header.getBlockNumber()));
.ifPresent(cm -> grandpaSetState.handleGrandpaConsensusMessage(cm, header.getBlockNumber()));

handleScheduledEvents();
}
Expand All @@ -314,76 +305,10 @@ private void updateSetData(BigInteger setChangeBlock) {
* Executes scheduled or forced authority changes for the last finalized block.
*/
public void handleScheduledEvents() {
Pair<BigInteger, Authority[]> data = scheduledAuthorityChanges.peek();
BigInteger authoritiesSetId = roundState.getSetId();
boolean updated = false;
while (data != null) {
if (data.getValue0().compareTo(syncState.getLastFinalizedBlockNumber()) < 1) {
authoritiesSetId = roundState.incrementSetId();
roundState.resetRound();
roundState.setAuthorities(Arrays.asList(data.getValue1()));
scheduledAuthorityChanges.poll();
updated = true;
} else break;
data = scheduledAuthorityChanges.peek();
}
boolean updated = grandpaSetState.handleAuthoritySetChange(syncState.getLastFinalizedBlockNumber());

if (warpSyncFinished && updated) {
log.log(Level.INFO, "Successfully transitioned to authority set id: " + authoritiesSetId);
new Thread(messageCoordinator::sendMessagesToPeers).start();
}
}

/**
* Handles authority changes coming from a block header digest and schedules them.
*
* @param headerDigests digest of the block header
* @param blockNumber block that contains the digest
*/
public void handleAuthorityChanges(HeaderDigest[] headerDigests, BigInteger blockNumber) {
// Update authority set and set id
AuthoritySetChange authorityChanges;
for (HeaderDigest digest : headerDigests) {
if (digest.getId() == ConsensusEngine.GRANDPA) {
ScaleCodecReader reader = new ScaleCodecReader(digest.getMessage());
GrandpaDigestMessageType type = GrandpaDigestMessageType.fromId(reader.readByte());

if (type == null) {
log.log(Level.SEVERE, "Could not get grandpa message type");
throw new IllegalStateException("Unknown grandpa message type");
}

switch (type) {
case SCHEDULED_CHANGE -> {
ScheduledChangeReader authorityChangesReader = new ScheduledChangeReader();
authorityChanges = authorityChangesReader.read(reader);
scheduledAuthorityChanges
.add(new Pair<>(blockNumber.add(authorityChanges.getDelay()),
authorityChanges.getAuthorities()));
return;
}
case FORCED_CHANGE -> {
ForcedChangeReader authorityForcedChangesReader = new ForcedChangeReader();
authorityChanges = authorityForcedChangesReader.read(reader);
scheduledAuthorityChanges
.add(new Pair<>(blockNumber.add(authorityChanges.getDelay()),
authorityChanges.getAuthorities()));
return;
}
case ON_DISABLED -> {
log.log(Level.SEVERE, "'ON DISABLED' grandpa message not implemented");
return;
}
case PAUSE -> {
log.log(Level.SEVERE, "'PAUSE' grandpa message not implemented");
return;
}
case RESUME -> {
log.log(Level.SEVERE, "'RESUME' grandpa message not implemented");
return;
}
}
}
}
}

}
Loading

0 comments on commit 23d716d

Please sign in to comment.