From 5089dd7bd52a6c4c782b484c2672c41ff0397827 Mon Sep 17 00:00:00 2001 From: Hristiyan Mitov <67628947+hMitov@users.noreply.github.com> Date: Thu, 23 Jan 2025 08:30:28 +0200 Subject: [PATCH] feat: Implement broadcasting commit messages and added unit tests. (#707) # Description Implement broadcasting commit messages. The logic is being used in Play Grandpa Round when we are the primary validator in the current round. A commit message is constructed, based on the best final candidate determined from the previous round. Commit message is broadcasted to network peers. Fixes #693 > --------- Co-authored-by: Hristiyan Mitov --- .../com/limechain/grandpa/GrandpaService.java | 41 ++++++++++++++- .../grandpa/state/GrandpaSetState.java | 3 +- .../network/PeerMessageCoordinator.java | 11 ++++ .../protocol/grandpa/GrandpaController.java | 7 +++ .../protocol/grandpa/GrandpaEngine.java | 12 +++++ .../protocol/grandpa/GrandpaService.java | 17 +++++++ .../limechain/grandpa/GrandpaServiceTest.java | 51 ++++++++++++++++++- .../grandpa/GrandpaControllerTest.java | 10 +++- .../protocol/grandpa/GrandpaEngineTest.java | 17 ++++++- .../protocol/grandpa/GrandpaServiceTest.java | 43 +++++++++++++--- 10 files changed, 200 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/limechain/grandpa/GrandpaService.java b/src/main/java/com/limechain/grandpa/GrandpaService.java index 8f09fb626..57593adf1 100644 --- a/src/main/java/com/limechain/grandpa/GrandpaService.java +++ b/src/main/java/com/limechain/grandpa/GrandpaService.java @@ -4,10 +4,13 @@ import com.limechain.exception.storage.BlockStorageGenericException; import com.limechain.grandpa.state.GrandpaRound; import com.limechain.grandpa.state.GrandpaSetState; +import com.limechain.network.PeerMessageCoordinator; import com.limechain.network.protocol.grandpa.messages.catchup.res.SignedVote; +import com.limechain.network.protocol.grandpa.messages.commit.CommitMessage; import com.limechain.network.protocol.grandpa.messages.commit.Vote; import com.limechain.network.protocol.grandpa.messages.vote.Subround; import com.limechain.network.protocol.warp.dto.BlockHeader; +import com.limechain.network.protocol.warp.dto.PreCommit; import com.limechain.storage.block.BlockState; import io.emeraldpay.polkaj.types.Hash256; import lombok.extern.java.Log; @@ -25,10 +28,14 @@ public class GrandpaService { private final GrandpaSetState grandpaSetState; private final BlockState blockState; + private final PeerMessageCoordinator peerMessageCoordinator; - public GrandpaService(GrandpaSetState grandpaSetState) { + + public GrandpaService(GrandpaSetState grandpaSetState, + PeerMessageCoordinator peerMessageCoordinator) { this.grandpaSetState = grandpaSetState; this.blockState = BlockState.getInstance(); + this.peerMessageCoordinator = peerMessageCoordinator; } /** @@ -422,4 +429,36 @@ private Vote getLastFinalizedBlockAsVote() { lastFinalizedBlockHeader.getBlockNumber() ); } + + /** + * Broadcasts a commit message to network peers for the given round as part of the GRANDPA consensus process. + *

+ * This method is used in two scenarios: + * 1. As the primary validator, broadcasting a commit message for the best candidate block of the previous round. + * 2. During attempt-to-finalize, broadcasting a commit message for the best candidate block of the current round. + */ + public void broadcastCommitMessage(GrandpaRound grandpaRound) { + Vote bestCandidate = getBestFinalCandidate(grandpaRound); + PreCommit[] precommits = transformToCompactJustificationFormat(grandpaRound.getPreCommits()); + + CommitMessage commitMessage = new CommitMessage(); + commitMessage.setSetId(grandpaSetState.getSetId()); + commitMessage.setRoundNumber(grandpaRound.getRoundNumber()); + commitMessage.setVote(bestCandidate); + commitMessage.setPreCommits(precommits); + + peerMessageCoordinator.sendCommitMessageToPeers(commitMessage); + } + + private PreCommit[] transformToCompactJustificationFormat(Map signedVotes) { + return signedVotes.values().stream() + .map(signedVote -> { + PreCommit precommit = new PreCommit(); + precommit.setTargetHash(signedVote.getVote().getBlockHash()); + precommit.setTargetNumber(signedVote.getVote().getBlockNumber()); + precommit.setSignature(signedVote.getSignature()); + precommit.setAuthorityPublicKey(signedVote.getAuthorityPublicKey()); + return precommit; + }).toArray(PreCommit[]::new); + } } diff --git a/src/main/java/com/limechain/grandpa/state/GrandpaSetState.java b/src/main/java/com/limechain/grandpa/state/GrandpaSetState.java index 0dd9f51a1..7515d3bb2 100644 --- a/src/main/java/com/limechain/grandpa/state/GrandpaSetState.java +++ b/src/main/java/com/limechain/grandpa/state/GrandpaSetState.java @@ -227,5 +227,4 @@ public void handleVoteMessage(VoteMessage voteMessage) { default -> throw new GrandpaGenericException("Unknown subround: " + subround); } } - -} \ No newline at end of file +} diff --git a/src/main/java/com/limechain/network/PeerMessageCoordinator.java b/src/main/java/com/limechain/network/PeerMessageCoordinator.java index 4a6797364..fadb95c2b 100644 --- a/src/main/java/com/limechain/network/PeerMessageCoordinator.java +++ b/src/main/java/com/limechain/network/PeerMessageCoordinator.java @@ -4,6 +4,8 @@ import com.limechain.network.protocol.blockannounce.NodeRole; import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceMessage; import com.limechain.network.protocol.blockannounce.scale.BlockAnnounceMessageScaleWriter; +import com.limechain.network.protocol.grandpa.messages.commit.CommitMessage; +import com.limechain.network.protocol.grandpa.messages.commit.CommitMessageScaleWriter; import com.limechain.network.protocol.transaction.scale.TransactionWriter; import com.limechain.transaction.dto.Extrinsic; import com.limechain.transaction.dto.ExtrinsicArray; @@ -99,4 +101,13 @@ public void handshakeBootNodes() { public void sendNeighbourMessageToPeer(PeerId peerId) { network.getGrandpaService().sendNeighbourMessage(network.getHost(), peerId); } + + public void sendCommitMessageToPeers(CommitMessage commitMessage) { + byte[] scaleMessage = ScaleUtils.Encode.encode(CommitMessageScaleWriter.getInstance(), commitMessage); + sendMessageToActivePeers(peerId -> { + asyncExecutor.executeAndForget(() -> network.getGrandpaService().sendCommitMessage( + network.getHost(), peerId, scaleMessage + )); + }); + } } diff --git a/src/main/java/com/limechain/network/protocol/grandpa/GrandpaController.java b/src/main/java/com/limechain/network/protocol/grandpa/GrandpaController.java index d68e0c3c8..381fe1566 100644 --- a/src/main/java/com/limechain/network/protocol/grandpa/GrandpaController.java +++ b/src/main/java/com/limechain/network/protocol/grandpa/GrandpaController.java @@ -26,4 +26,11 @@ public void sendHandshake() { public void sendNeighbourMessage() { engine.writeNeighbourMessage(stream, stream.remotePeerId()); } + + /** + * Sends a commit message over the controller stream. + */ + public void sendCommitMessage(byte[] encodedCommitMessage) { + engine.writeCommitMessage(stream, encodedCommitMessage); + } } diff --git a/src/main/java/com/limechain/network/protocol/grandpa/GrandpaEngine.java b/src/main/java/com/limechain/network/protocol/grandpa/GrandpaEngine.java index 9c585c43e..9f01b1a69 100644 --- a/src/main/java/com/limechain/network/protocol/grandpa/GrandpaEngine.java +++ b/src/main/java/com/limechain/network/protocol/grandpa/GrandpaEngine.java @@ -1,6 +1,7 @@ package com.limechain.network.protocol.grandpa; import com.limechain.exception.scale.ScaleEncodingException; +import com.limechain.grandpa.GrandpaService; import com.limechain.grandpa.state.GrandpaSetState; import com.limechain.network.ConnectionManager; import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceHandshakeBuilder; @@ -201,4 +202,15 @@ public void writeNeighbourMessage(Stream stream, PeerId peerId) { log.log(Level.FINE, "Sending neighbour message to peer " + peerId); stream.writeAndFlush(buf.toByteArray()); } + + /** + * Send our GRANDPA commit message from {@link GrandpaService} on a given responder stream. + * + * @param stream responder stream to write the message to + * @param encodedCommitMessage scale encoded CommitMessage object + */ + public void writeCommitMessage(Stream stream, byte[] encodedCommitMessage) { + log.log(Level.FINE, "Sending commit message to peer " + stream.remotePeerId()); + stream.writeAndFlush(encodedCommitMessage); + } } diff --git a/src/main/java/com/limechain/network/protocol/grandpa/GrandpaService.java b/src/main/java/com/limechain/network/protocol/grandpa/GrandpaService.java index 38bc941c0..84b24c345 100644 --- a/src/main/java/com/limechain/network/protocol/grandpa/GrandpaService.java +++ b/src/main/java/com/limechain/network/protocol/grandpa/GrandpaService.java @@ -37,6 +37,23 @@ public void sendNeighbourMessage(Host us, PeerId peerId) { ); } + /** + * Sends a commit message to a peer. If there is no initiator stream opened with the peer, + * sends a handshake instead. + * + * @param us our host object + * @param peerId message receiver + * @param encodedCommitMessage scale encoded representation of the CommitMessage object + */ + public void sendCommitMessage(Host us, PeerId peerId, byte[] encodedCommitMessage) { + Optional.ofNullable(connectionManager.getPeerInfo(peerId)) + .map(p -> p.getGrandpaStreams().getInitiator()) + .ifPresentOrElse( + stream -> new GrandpaController(stream).sendCommitMessage(encodedCommitMessage), + () -> sendHandshake(us, peerId) + ); + } + private void sendNeighbourMessage(Stream stream) { GrandpaController controller = new GrandpaController(stream); controller.sendNeighbourMessage(); diff --git a/src/test/java/com/limechain/grandpa/GrandpaServiceTest.java b/src/test/java/com/limechain/grandpa/GrandpaServiceTest.java index a9d1753c4..000a87160 100644 --- a/src/test/java/com/limechain/grandpa/GrandpaServiceTest.java +++ b/src/test/java/com/limechain/grandpa/GrandpaServiceTest.java @@ -3,7 +3,9 @@ import com.limechain.exception.grandpa.GhostExecutionException; import com.limechain.grandpa.state.GrandpaRound; import com.limechain.grandpa.state.GrandpaSetState; +import com.limechain.network.PeerMessageCoordinator; import com.limechain.network.protocol.grandpa.messages.catchup.res.SignedVote; +import com.limechain.network.protocol.grandpa.messages.commit.CommitMessage; import com.limechain.network.protocol.grandpa.messages.commit.Vote; import com.limechain.network.protocol.grandpa.messages.vote.SignedMessage; import com.limechain.network.protocol.grandpa.messages.vote.Subround; @@ -12,12 +14,14 @@ import com.limechain.network.protocol.warp.dto.ConsensusEngine; import com.limechain.network.protocol.warp.dto.DigestType; import com.limechain.network.protocol.warp.dto.HeaderDigest; +import com.limechain.network.protocol.warp.dto.PreCommit; import com.limechain.storage.block.BlockState; import io.emeraldpay.polkaj.types.Hash256; import io.emeraldpay.polkaj.types.Hash512; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; import org.mockito.MockedStatic; import java.lang.reflect.Method; @@ -34,6 +38,8 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; class GrandpaServiceTest { @@ -51,6 +57,7 @@ class GrandpaServiceTest { private MockedStatic mockedBlockState; private GrandpaService grandpaService; private GrandpaRound grandpaRound; + private PeerMessageCoordinator peerMessageCoordinator; @BeforeEach void setUp() { @@ -58,7 +65,8 @@ void setUp() { blockState = mock(BlockState.class); mockedBlockState = mockStatic(BlockState.class); mockedBlockState.when(BlockState::getInstance).thenReturn(blockState); - grandpaService = new GrandpaService(grandpaSetState); + peerMessageCoordinator = mock(PeerMessageCoordinator.class); + grandpaService = new GrandpaService(grandpaSetState, peerMessageCoordinator); grandpaRound = mock(GrandpaRound.class); when(grandpaRound.getPrevious()).thenReturn(new GrandpaRound()); } @@ -723,6 +731,47 @@ void testSelectBlockWithMostVotesWhereLastFinalizedBlockIsWithGreaterBlockNumber assertEquals(blockHeader.getBlockNumber(), result.getBlockNumber()); } + @Test + void testBroadcastCommitMessageWhenPrimaryValidator() { + Hash256 authorityPublicKey = new Hash256(THREES_ARRAY); + Map signedVotes = new HashMap<>(); + Vote vote = new Vote(new Hash256(ONES_ARRAY), BigInteger.valueOf(123L)); + SignedVote signedVote = new SignedVote(vote, Hash512.empty(), authorityPublicKey); + signedVotes.put(authorityPublicKey, signedVote); + + GrandpaRound previousRound = new GrandpaRound(); + previousRound.setRoundNumber(BigInteger.ZERO); + previousRound.setPreCommits(signedVotes); + BlockHeader blockHeader = createBlockHeader(); + + when(grandpaSetState.getThreshold()).thenReturn(BigInteger.ONE); + when(blockState.getHighestFinalizedHeader()).thenReturn(blockHeader); + when(grandpaSetState.getSetId()).thenReturn(BigInteger.valueOf(42L)); + + grandpaService.broadcastCommitMessage(previousRound); + + ArgumentCaptor commitMessageCaptor = ArgumentCaptor.forClass(CommitMessage.class); + verify(peerMessageCoordinator).sendCommitMessageToPeers(commitMessageCaptor.capture()); + CommitMessage commitMessage = commitMessageCaptor.getValue(); + + assertEquals(BigInteger.valueOf(42L), commitMessage.getSetId()); + assertEquals(BigInteger.valueOf(0L), commitMessage.getRoundNumber()); + assertEquals(blockHeader.getBlockNumber(), commitMessage.getVote().getBlockNumber()); + assertEquals(blockHeader.getHash(), commitMessage.getVote().getBlockHash()); + assertEquals(1, commitMessage.getPreCommits().length); + + PreCommit precommit = commitMessage.getPreCommits()[0]; + assertEquals(vote.getBlockHash(), precommit.getTargetHash()); + assertEquals(BigInteger.valueOf(123L), precommit.getTargetNumber()); + assertEquals(Hash512.empty(), precommit.getSignature()); + assertEquals(signedVotes.get(authorityPublicKey).getAuthorityPublicKey(), + precommit.getAuthorityPublicKey() + ); + + verify(peerMessageCoordinator, times(1)) + .sendCommitMessageToPeers(any(CommitMessage.class)); + } + private BlockHeader createBlockHeader() { HeaderDigest headerDigest = new HeaderDigest(); headerDigest.setType(DigestType.CONSENSUS_MESSAGE); diff --git a/src/test/java/com/limechain/network/protocol/grandpa/GrandpaControllerTest.java b/src/test/java/com/limechain/network/protocol/grandpa/GrandpaControllerTest.java index ec35e4fda..a0fabd434 100644 --- a/src/test/java/com/limechain/network/protocol/grandpa/GrandpaControllerTest.java +++ b/src/test/java/com/limechain/network/protocol/grandpa/GrandpaControllerTest.java @@ -25,19 +25,27 @@ class GrandpaControllerTest { @BeforeEach void setup() { - when(stream.remotePeerId()).thenReturn(peerId); grandpaController.engine = engine; } @Test void sendHandshake() { + when(stream.remotePeerId()).thenReturn(peerId); grandpaController.sendHandshake(); verify(engine).writeHandshakeToStream(stream, peerId); } @Test void sendNeighbourMessage() { + when(stream.remotePeerId()).thenReturn(peerId); grandpaController.sendNeighbourMessage(); verify(engine).writeNeighbourMessage(stream, peerId); } + + @Test + void sendCommitMessage() { + byte[] encodedCommitMessage = {1, 0, 0, 0, 2, 0, 1, 1, 1, 1, 0, 0, 0, 1, 2, 0}; + grandpaController.sendCommitMessage(encodedCommitMessage); + verify(engine).writeCommitMessage(stream, encodedCommitMessage); + } } \ No newline at end of file diff --git a/src/test/java/com/limechain/network/protocol/grandpa/GrandpaEngineTest.java b/src/test/java/com/limechain/network/protocol/grandpa/GrandpaEngineTest.java index 79fedd303..434b1579c 100644 --- a/src/test/java/com/limechain/network/protocol/grandpa/GrandpaEngineTest.java +++ b/src/test/java/com/limechain/network/protocol/grandpa/GrandpaEngineTest.java @@ -33,7 +33,13 @@ import java.math.BigInteger; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockConstruction; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; @SuppressWarnings("unused") @ExtendWith(MockitoExtension.class) @@ -58,6 +64,9 @@ class GrandpaEngineTest { private final byte[] encodedNeighbourMessage = new byte[]{2, 1, 1, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 10, 0, 0, 0}; + private final byte[] encodedCommitMessage + = new byte[]{2, 1, 1, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 10, 0, 0, 0}; + @Test void receiveRequestWithUnknownGrandpaTypeShouldLogAndIgnore() { byte[] unknownTypeMessage = new byte[]{7, 1, 1, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 10, 0, 0, 0}; @@ -286,4 +295,10 @@ void writeNeighbourMessage() { verify(stream).writeAndFlush(encodedNeighbourMessage); } } + + @Test + void writeCommitMessage() { + grandpaEngine.writeCommitMessage(stream, encodedCommitMessage); + verify(stream).writeAndFlush(encodedCommitMessage); + } } \ No newline at end of file diff --git a/src/test/java/com/limechain/network/protocol/grandpa/GrandpaServiceTest.java b/src/test/java/com/limechain/network/protocol/grandpa/GrandpaServiceTest.java index be482b4c3..b727a4603 100644 --- a/src/test/java/com/limechain/network/protocol/grandpa/GrandpaServiceTest.java +++ b/src/test/java/com/limechain/network/protocol/grandpa/GrandpaServiceTest.java @@ -18,7 +18,6 @@ import java.lang.reflect.Field; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockConstruction; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -32,6 +31,8 @@ class GrandpaServiceTest { @Mock private ProtocolStreams protocolStreams; @Mock + private Stream stream; + @Mock private Host host; @Mock private PeerId peerId; @@ -39,6 +40,13 @@ class GrandpaServiceTest { private ConnectionManager connectionManager; @Mock private Grandpa protocol; + @Mock + private AddressBook addressBook; + @Mock + private GrandpaController grandpaController; + + private final byte[] encodedCommitMessage + = new byte[]{2, 1, 1, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 10, 0, 0, 0}; //setup method @BeforeEach @@ -48,8 +56,6 @@ public void setupEach() throws NoSuchFieldException, IllegalAccessException { @Test void sendNeighbourMessageWhenNotConnectionShouldSendHandshake() { - AddressBook addressBook = mock(AddressBook.class); - GrandpaController grandpaController = mock(GrandpaController.class); when(connectionManager.getPeerInfo(peerId)).thenReturn(peerInfo); when(peerInfo.getGrandpaStreams()).thenReturn(protocolStreams); when(protocolStreams.getInitiator()).thenReturn(null); @@ -63,9 +69,6 @@ void sendNeighbourMessageWhenNotConnectionShouldSendHandshake() { @Test void sendNeighbourMessageWhenExistingConnection() { - PeerInfo peerInfo = mock(PeerInfo.class); - ProtocolStreams protocolStreams = mock(ProtocolStreams.class); - Stream stream = mock(Stream.class); when(connectionManager.getPeerInfo(peerId)).thenReturn(peerInfo); when(peerInfo.getGrandpaStreams()).thenReturn(protocolStreams); when(protocolStreams.getInitiator()).thenReturn(stream); @@ -79,6 +82,34 @@ void sendNeighbourMessageWhenExistingConnection() { } } + @Test + void sendCommitMessageWhenNotConnectionShouldSendHandshake() { + when(connectionManager.getPeerInfo(peerId)).thenReturn(peerInfo); + when(peerInfo.getGrandpaStreams()).thenReturn(protocolStreams); + when(protocolStreams.getInitiator()).thenReturn(null); + when(host.getAddressBook()).thenReturn(addressBook); + when(protocol.dialPeer(host, peerId, addressBook)).thenReturn(grandpaController); + + grandpaService.sendCommitMessage(host, peerId, encodedCommitMessage); + + verify(grandpaController).sendHandshake(); + } + + @Test + void sendCommitMessageWhenExistingConnection() { + when(connectionManager.getPeerInfo(peerId)).thenReturn(peerInfo); + when(peerInfo.getGrandpaStreams()).thenReturn(protocolStreams); + when(protocolStreams.getInitiator()).thenReturn(stream); + + try (MockedConstruction mock = mockConstruction(GrandpaController.class)) { + grandpaService.sendCommitMessage(host, peerId, encodedCommitMessage); + + assertEquals(1, mock.constructed().size()); + GrandpaController controller = mock.constructed().get(0); + verify(controller).sendCommitMessage(encodedCommitMessage); + } + } + // Setting private fields. Not a good idea in general. // Necessary due to mockito's newer versions not being able to inject generic type fields in superclass private void setPrivateFieldOfSuperclass(Object object, String fieldName, Object value)