Skip to content

Commit

Permalink
feat: Add queue feature for block handling.
Browse files Browse the repository at this point in the history
  • Loading branch information
Zurcusa committed Jan 20, 2025
1 parent df4f627 commit dd85b16
Show file tree
Hide file tree
Showing 9 changed files with 164 additions and 58 deletions.
2 changes: 1 addition & 1 deletion src/main/java/com/limechain/babe/BabeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ private void handleSlot(Slot slot, BabePreDigest preDigest) {
return;
}

blockHandler.handleProducedBlock(block);
blockHandler.handleProduced(block);
}

private Block produceBlock(BlockHeader parentHeader, Slot slot, BabePreDigest preDigest) {
Expand Down
2 changes: 0 additions & 2 deletions src/main/java/com/limechain/client/LightClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.limechain.network.NetworkService;
import com.limechain.rpc.server.AppBean;
import com.limechain.storage.block.state.BlockState;
import com.limechain.sync.SyncService;
import com.limechain.sync.state.SyncState;

Expand All @@ -20,7 +19,6 @@ public LightClient() {
Objects.requireNonNull(AppBean.getBean(NetworkService.class)),
Objects.requireNonNull(AppBean.getBean(SyncService.class))),
List.of(
Objects.requireNonNull(AppBean.getBean(BlockState.class)),
Objects.requireNonNull(AppBean.getBean(SyncState.class))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ private void handleBlockAnnounce(byte[] msg, PeerId peerId) {

if (AppBean.getBean(BlockState.class).isInitialized()) {
//TODO Network improvements: Block requests should be sent to the peer that announced the block itself.
blockHandler.handleBlockHeader(Instant.now(), announce.getHeader(), peerId);
blockHandler.handleAnnounced(announce.getHeader(), Instant.now(), peerId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import com.limechain.network.protocol.grandpa.messages.vote.VoteMessageScaleReader;
import com.limechain.network.protocol.message.ProtocolMessageBuilder;
import com.limechain.rpc.server.AppBean;
import com.limechain.state.AbstractState;
import com.limechain.sync.SyncMode;
import com.limechain.sync.warpsync.WarpSyncState;
import io.emeraldpay.polkaj.scale.ScaleCodecReader;
import io.emeraldpay.polkaj.scale.ScaleCodecWriter;
Expand All @@ -38,13 +40,15 @@
public class GrandpaEngine {
private static final int HANDSHAKE_LENGTH = 1;

private final WarpSyncState warpSyncState;

protected ConnectionManager connectionManager;
protected WarpSyncState warpSyncState;
protected BlockAnnounceHandshakeBuilder handshakeBuilder;

public GrandpaEngine() {
connectionManager = ConnectionManager.getInstance();
warpSyncState = AppBean.getBean(WarpSyncState.class);

connectionManager = ConnectionManager.getInstance();
handshakeBuilder = new BlockAnnounceHandshakeBuilder();
}

Expand Down Expand Up @@ -101,6 +105,11 @@ private void handleResponderStreamMessage(byte[] message, GrandpaMessageType mes
return;
}

if (!AbstractState.getSyncMode().equals(SyncMode.HEAD)) {
log.fine("Skipping grandpa message before we reach head of chain.");
return;
}

switch (messageType) {
case HANDSHAKE -> handleHandshake(message, peerId, stream);
case VOTE -> handleVoteMessage(message, peerId);
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/limechain/state/AbstractState.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public abstract class AbstractState implements ServiceState {
protected boolean initialized;

public static void setSyncMode(SyncMode mode) {
if (syncMode.ordinal() > mode.ordinal()) {
if (syncMode != null && syncMode.ordinal() > mode.ordinal()) {
throw new IllegalStateException(mode + " mode precedes " + syncMode);
}

Expand Down
182 changes: 137 additions & 45 deletions src/main/java/com/limechain/storage/block/BlockHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,132 +2,224 @@

import com.limechain.babe.BlockProductionVerifier;
import com.limechain.babe.state.EpochState;
import com.limechain.config.HostConfig;
import com.limechain.exception.storage.BlockStorageGenericException;
import com.limechain.grandpa.state.RoundState;
import com.limechain.network.PeerMessageCoordinator;
import com.limechain.network.PeerRequester;
import com.limechain.network.protocol.blockannounce.NodeRole;
import com.limechain.network.protocol.message.ProtocolMessageBuilder;
import com.limechain.network.protocol.sync.BlockRequestField;
import com.limechain.network.protocol.warp.DigestHelper;
import com.limechain.network.protocol.warp.dto.Block;
import com.limechain.network.protocol.warp.dto.BlockHeader;
import com.limechain.runtime.Runtime;
import com.limechain.runtime.RuntimeBuilder;
import com.limechain.state.AbstractState;
import com.limechain.storage.block.state.BlockState;
import com.limechain.sync.SyncMode;
import com.limechain.sync.state.SyncState;
import com.limechain.transaction.TransactionProcessor;
import com.limechain.utils.async.AsyncExecutor;
import io.emeraldpay.polkaj.types.Hash256;
import io.libp2p.core.PeerId;
import lombok.extern.java.Log;
import org.javatuples.Pair;
import org.springframework.stereotype.Component;

import java.time.Instant;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CompletableFuture;

@Component
@Log
@Component
public class BlockHandler {

private final BlockState blockState;
private final EpochState epochState;
private final RoundState roundState;
private final SyncState syncState;

private final PeerRequester requester;
private final PeerMessageCoordinator messageCoordinator;

private final RuntimeBuilder builder;
private final AsyncExecutor asyncExecutor;
private final HostConfig hostConfig;
private final TransactionProcessor transactionProcessor;
private final BlockProductionVerifier verifier;

private final AsyncExecutor asyncExecutor;

private final HashMap<Hash256, BlockHeader> blockHeaders;
private final ArrayDeque<Pair<Instant, Block>> pendingBlocksQueue;

public BlockHandler(EpochState epochState,
BlockState blockState,
RoundState roundState,
SyncState syncState,
PeerRequester requester,
RuntimeBuilder builder,
HostConfig hostConfig,
TransactionProcessor transactionProcessor,
PeerMessageCoordinator messageCoordinator,
RoundState roundState) {
PeerMessageCoordinator messageCoordinator) {

this.epochState = epochState;
this.blockState = blockState;
this.roundState = roundState;
this.syncState = syncState;

this.requester = requester;
this.messageCoordinator = messageCoordinator;

this.builder = builder;
this.hostConfig = hostConfig;
this.transactionProcessor = transactionProcessor;
this.verifier = new BlockProductionVerifier();

asyncExecutor = AsyncExecutor.withPoolSize(10);
this.roundState = roundState;
blockHeaders = new HashMap<>();
pendingBlocksQueue = new ArrayDeque<>();
}

public synchronized void handleBlockHeader(Instant arrivalTime, BlockHeader header, PeerId excluding) {
public synchronized void handleAnnounced(BlockHeader header, Instant arrivalTime, PeerId peerId) {

if (blockHeaders.containsKey(header.getHash()) || blockState.hasHeader(header.getHash())) {
log.fine("Skipping announced block: " + header.getBlockNumber() + " " + header.getHash());
return;
}

if (!AbstractState.getSyncMode().equals(SyncMode.HEAD)) {
addBlockToQueue(header, arrivalTime);
return;
}

processPendingBlocksFromQueue();

Block block = requestBlock(header);
processBlock(block, arrivalTime);

messageCoordinator.sendBlockAnnounceMessageExcludingPeer(
ProtocolMessageBuilder.buildBlockAnnounceMessage(
header, header.getHash().equals(blockState.bestBlockHash())),
peerId);
}

public synchronized void handleProduced(Block block) {

addBlockToTree(block, Instant.now());
messageCoordinator.sendBlockAnnounceMessageExcludingPeer(
ProtocolMessageBuilder.buildBlockAnnounceMessage(block.getHeader(), true),
null);
}

private void processBlock(Block block, Instant arrivalTime) {
addBlockToTree(block, arrivalTime);

if (!hostConfig.getNodeRole().equals(NodeRole.LIGHT)) {
verifyAndExecuteBlock(block);
}
}

private void verifyAndExecuteBlock(Block block) {

try {
BlockHeader header = block.getHeader();

Runtime runtime = blockState.getRuntime(header.getParentHash());
Runtime newRuntime = builder.copyRuntime(runtime);

if (epochState.isInitialized() && !verifier.isAuthorshipValid(newRuntime,
if (!verifier.isAuthorshipValid(newRuntime,
header,
epochState.getCurrentEpochData(),
epochState.getCurrentEpochDescriptor(),
epochState.getCurrentEpochIndex())) {
return;
}

if (blockState.hasHeader(header.getHash())) {
log.fine("Skipping announced block: " + header.getBlockNumber() + " " + header.getHash());
return;
}

CompletableFuture<List<Block>> responseFuture = requester.requestBlocks(
BlockRequestField.ALL, header.getHash(), 1);

List<Block> blocks = responseFuture.join();
while (blocks.isEmpty()) {
blocks = requester.requestBlocks(
BlockRequestField.ALL, header.getHash(), 1).join();
}

Block block = blocks.getFirst();

newRuntime.executeBlock(block);
log.fine(String.format("Executed block No: %s with hash: %s.",
block.getHeader().getBlockNumber(), header.getHash()));
blockState.storeRuntime(header.getHash(), runtime);

handleBlock(block, arrivalTime);

messageCoordinator.sendBlockAnnounceMessageExcludingPeer(
ProtocolMessageBuilder.buildBlockAnnounceMessage(
block.getHeader(), block.getHeader().getHash().equals(blockState.bestBlockHash())),
excluding);
asyncExecutor.executeAndForget(() -> transactionProcessor.maintainTransactionPool(block));
} catch (Exception e) {
log.warning("Error while importing announced block: " + e);
}
}

public void handleProducedBlock(Block block) {
handleBlock(block, Instant.now());
messageCoordinator.sendBlockAnnounceMessageExcludingPeer(
ProtocolMessageBuilder.buildBlockAnnounceMessage(block.getHeader(), true),
null);
}
private void addBlockToTree(Block block, Instant arrivalTime) {

private void handleBlock(Block block, Instant arrivalTime) {
BlockHeader header = block.getHeader();

blockState.addBlockWithArrivalTime(block, arrivalTime);
log.fine(String.format("Added block No: %s with hash: %s to block tree.",
block.getHeader().getBlockNumber(), header.getHash()));

DigestHelper.getBabeConsensusMessage(header.getDigest())
.ifPresent(cm -> {
epochState.updateNextEpochConfig(cm);
log.fine(String.format("Updated epoch block config: %s", cm.getFormat().toString()));
});
if (epochState.isInitialized()) {
asyncExecutor.executeAndForget(() -> DigestHelper.getBabeConsensusMessage(header.getDigest())
.ifPresent(cm -> {
epochState.updateNextEpochConfig(cm);
log.fine(String.format("Updated epoch block config: %s", cm.getFormat().toString()));
}));
}

if (roundState.isInitialized()) {
asyncExecutor.executeAndForget(() -> DigestHelper.getGrandpaConsensusMessage(header.getDigest())
.ifPresent(cm ->
roundState.handleGrandpaConsensusMessage(cm, header.getBlockNumber())
));

roundState.handleAuthoritySetChange(header.getBlockNumber());
}
}

private void addBlockToQueue(BlockHeader blockHeader, Instant arrivalTime) {

DigestHelper.getGrandpaConsensusMessage(header.getDigest())
.ifPresent(cm -> roundState.handleGrandpaConsensusMessage(cm, header.getBlockNumber()));
blockHeaders.put(blockHeader.getHash(), blockHeader);

roundState.handleAuthoritySetChange(header.getBlockNumber());
asyncExecutor.executeAndForget(() -> {
Block block = requestBlock(blockHeader);
pendingBlocksQueue.add(Pair.with(arrivalTime, block));
log.fine("Added block to queue " + block.getHeader().getBlockNumber() + " " + block.getHeader().getHash());
});
}

private void processPendingBlocksFromQueue() {

while (!pendingBlocksQueue.isEmpty()) {
var currentPair = pendingBlocksQueue.poll();
var block = currentPair.getValue1();
var arrivalTime = currentPair.getValue0();

blockHeaders.remove(block.getHeader().getHash());

if (block.getHeader().getBlockNumber().compareTo(syncState.getLastFinalizedBlockNumber()) <= 0) {
continue;
}

try {
processBlock(block, arrivalTime);
} catch (BlockStorageGenericException ex) {
log.fine(String.format("[%s] %s", block.getHeader().getHash().toString(), ex.getMessage()));
}
}
}

private Block requestBlock(BlockHeader header) {

List<Block> blocks = new ArrayList<>();

while (blocks.isEmpty()) {
CompletableFuture<List<Block>> responseFuture = requester.requestBlocks(
BlockRequestField.ALL, header.getHash(), 1);

blocks = responseFuture.join();
}

asyncExecutor.executeAndForget(() -> transactionProcessor.maintainTransactionPool(block));
log.fine("Request successful " + blocks.getFirst().getHeader().getHash());
return blocks.getFirst();
}
}
12 changes: 9 additions & 3 deletions src/main/java/com/limechain/sync/SyncService.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,20 @@ public class SyncService implements NodeService {

@Override
public void start() {
SyncMode initSyncMode = arguments.syncMode();
AbstractState.setSyncMode(initSyncMode);

switch (NodeRole.fromString(arguments.nodeRole())) {
case LIGHT -> {
warpSyncMachine.onFinish(() -> AbstractState.setSyncMode(SyncMode.HEAD),
messageCoordinator::handshakeBootNodes);
warpSyncMachine.onFinish(() -> {
AbstractState.setSyncMode(SyncMode.HEAD);
messageCoordinator.handshakeBootNodes();
messageCoordinator.handshakePeers();
});
warpSyncMachine.start();
}
case FULL, AUTHORING -> {
switch (arguments.syncMode()) {
switch (initSyncMode) {
case FULL -> fullSyncMachine.start();
case WARP -> {
warpSyncMachine.onFinish(() -> AbstractState.setSyncMode(SyncMode.FULL),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ public void start() {
.add(BigInteger.ONE)
.intValueExact();

messageCoordinator.handshakeBootNodes();
messageCoordinator.handshakePeers();

int blocksToFetch = 100;
List<Block> receivedBlocks = requester.requestBlocks(BlockRequestField.ALL, startNumber, blocksToFetch).join();

Expand All @@ -133,8 +136,6 @@ private void finishFullSync() {
initializeStates();

AbstractState.setSyncMode(SyncMode.HEAD);
messageCoordinator.handshakeBootNodes();
messageCoordinator.handshakePeers();
}

private void initializeStates() {
Expand Down
Loading

0 comments on commit dd85b16

Please sign in to comment.