From b9e826b823a089c2164aea70d7d18f3c3418f612 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9Cur=D0=B0d=20H=D0=B0mz=D0=B0?= Date: Mon, 26 Aug 2024 15:43:58 +0300 Subject: [PATCH] feat: working warp sync --- .../chain/lightsyncstate/AuthoritySet.java | 2 + .../chain/lightsyncstate/EpochChanges.java | 2 + .../chain/lightsyncstate/LightSyncState.java | 20 +- .../java/com/limechain/network/Network.java | 2 - .../network/protocol/warp/WarpSync.java | 18 +- .../protocol/warp/WarpSyncProtocol.java | 65 +---- .../protocol/warp/WarpSyncService.java | 2 +- .../protocol/warp/dto/BlockHeader.java | 2 + .../protocol/warp/dto/Justification.java | 2 + .../protocol/warp/dto/WarpSyncFragment.java | 2 + .../protocol/warp/dto/WarpSyncResponse.java | 2 + .../warp/scale/reader/BlockHeaderReader.java | 6 - .../limechain/storage/block/SyncState.java | 2 +- .../limechain/sync/JustificationVerifier.java | 248 +++++++++++------- .../sync/warpsync/WarpSyncMachine.java | 18 +- .../sync/warpsync/WarpSyncState.java | 99 +++---- .../action/RequestFragmentsAction.java | 37 +-- .../action/VerifyJustificationAction.java | 87 +++--- .../java/com/limechain/teavm/HttpRequest.java | 7 +- .../com/limechain/teavm/TeaVMCallback.java | 10 + 20 files changed, 324 insertions(+), 309 deletions(-) create mode 100644 src/main/java/com/limechain/teavm/TeaVMCallback.java diff --git a/src/main/java/com/limechain/chain/lightsyncstate/AuthoritySet.java b/src/main/java/com/limechain/chain/lightsyncstate/AuthoritySet.java index dd9e9a929..be1a2807c 100644 --- a/src/main/java/com/limechain/chain/lightsyncstate/AuthoritySet.java +++ b/src/main/java/com/limechain/chain/lightsyncstate/AuthoritySet.java @@ -4,11 +4,13 @@ import lombok.Getter; import lombok.Setter; import com.limechain.tuple.Pair; +import lombok.ToString; import java.math.BigInteger; @Getter @Setter +@ToString public class AuthoritySet { private Authority[] currentAuthorities; private BigInteger setId; diff --git a/src/main/java/com/limechain/chain/lightsyncstate/EpochChanges.java b/src/main/java/com/limechain/chain/lightsyncstate/EpochChanges.java index 86ed948e4..94733c24f 100644 --- a/src/main/java/com/limechain/chain/lightsyncstate/EpochChanges.java +++ b/src/main/java/com/limechain/chain/lightsyncstate/EpochChanges.java @@ -4,12 +4,14 @@ import lombok.Getter; import lombok.Setter; import com.limechain.tuple.Pair; +import lombok.ToString; import java.math.BigInteger; import java.util.Map; @Getter @Setter +@ToString public class EpochChanges { private ForkTree inner; diff --git a/src/main/java/com/limechain/chain/lightsyncstate/LightSyncState.java b/src/main/java/com/limechain/chain/lightsyncstate/LightSyncState.java index 4d39b6df4..fa20d171a 100644 --- a/src/main/java/com/limechain/chain/lightsyncstate/LightSyncState.java +++ b/src/main/java/com/limechain/chain/lightsyncstate/LightSyncState.java @@ -13,15 +13,16 @@ import java.util.Map; @Getter +@ToString public class LightSyncState { private BlockHeader finalizedBlockHeader; private EpochChanges epochChanges; private AuthoritySet grandpaAuthoritySet; - public static LightSyncState decode(Map lightSyncState) { - String header = lightSyncState.get("finalizedBlockHeader"); - String epochChanges = lightSyncState.get("babeEpochChanges"); - String grandpaAuthoritySet = lightSyncState.get("grandpaAuthoritySet"); + public static LightSyncState decode(Map lightSyncStateMap) { + String header = lightSyncStateMap.get("finalizedBlockHeader"); + String epochChanges = lightSyncStateMap.get("babeEpochChanges"); + String grandpaAuthoritySet = lightSyncStateMap.get("grandpaAuthoritySet"); if (header == null) { throw new IllegalStateException("finalizedBlockHeader is null"); @@ -34,18 +35,19 @@ public static LightSyncState decode(Map lightSyncState) { } - var state = new LightSyncState(); + LightSyncState lightSyncState = new LightSyncState(); byte[] bytes = StringUtils.hexToBytes(header); - state.finalizedBlockHeader = new BlockHeaderReader() + lightSyncState.finalizedBlockHeader = new BlockHeaderReader() .read(new ScaleCodecReader(bytes)); byte[] bytes1 = StringUtils.hexToBytes(epochChanges); - state.epochChanges = new EpochChangesReader() + lightSyncState.epochChanges = new EpochChangesReader() .read(new ScaleCodecReader(bytes1)); - state.grandpaAuthoritySet = new AuthoritySetReader() + lightSyncState.grandpaAuthoritySet = new AuthoritySetReader() .read(new ScaleCodecReader(StringUtils.hexToBytes(grandpaAuthoritySet))); - return state; + System.out.println(lightSyncState); + return lightSyncState; } } diff --git a/src/main/java/com/limechain/network/Network.java b/src/main/java/com/limechain/network/Network.java index 2c78cf58d..416cf184f 100644 --- a/src/main/java/com/limechain/network/Network.java +++ b/src/main/java/com/limechain/network/Network.java @@ -209,8 +209,6 @@ public void pingPeers() { // } // public WarpSyncResponse makeWarpSyncRequest(String blockHash) { -// if (isPeerInvalid()) return null; - return this.warpSyncService.getProtocol().warpSyncRequest( blockHash); } diff --git a/src/main/java/com/limechain/network/protocol/warp/WarpSync.java b/src/main/java/com/limechain/network/protocol/warp/WarpSync.java index 5e6075a56..71940361a 100644 --- a/src/main/java/com/limechain/network/protocol/warp/WarpSync.java +++ b/src/main/java/com/limechain/network/protocol/warp/WarpSync.java @@ -20,25 +20,15 @@ public class WarpSync extends StrictProtocolBinding { private String protocolId; - public WarpSync(String protocolId, WarpSyncProtocol protocol) { - super(protocolId/*, protocol*/); + public WarpSync(String protocolId) { + super(protocolId); this.protocolId = protocolId; } - public WarpSyncResponse warpSyncRequest(/*PeerId peer,*/ String blockHash) { -// try { -// Stream stream = dialPeer(/*peer*/); + public WarpSyncResponse warpSyncRequest(String blockHash) { WarpSyncProtocol.Sender sender = new WarpSyncProtocol.Sender(); - System.out.println("Block hash: " + blockHash); WarpSyncResponse resp = sender.warpSyncRequest(blockHash, protocolId); - log.log(Level.INFO, "Received warp sync response with " + resp/*.getFragments().length*/ + " fragments"); + log.log(Level.INFO, "Received warp sync response with " + resp.getFragments().length + " fragments"); return resp; -// } catch (ExecutionException | TimeoutException | IllegalStateException e) { -// log.log(Level.SEVERE, "Error while sending remote call request: ", e); -// throw new ExecutionFailedException(e); -// } catch (InterruptedException e) { -// Thread.currentThread().interrupt(); -// throw new ThreadInterruptedException(e); -// } } } diff --git a/src/main/java/com/limechain/network/protocol/warp/WarpSyncProtocol.java b/src/main/java/com/limechain/network/protocol/warp/WarpSyncProtocol.java index 5c85ef564..4d202b741 100644 --- a/src/main/java/com/limechain/network/protocol/warp/WarpSyncProtocol.java +++ b/src/main/java/com/limechain/network/protocol/warp/WarpSyncProtocol.java @@ -3,73 +3,38 @@ import com.limechain.network.protocol.warp.dto.WarpSyncRequest; import com.limechain.network.protocol.warp.dto.WarpSyncResponse; import com.limechain.network.protocol.warp.scale.reader.WarpSyncResponseScaleReader; -import com.limechain.network.wrapper.Stream; import com.limechain.polkaj.reader.ScaleCodecReader; import com.limechain.utils.StringUtils; import org.teavm.jso.JSBody; -import org.teavm.jso.core.JSArray; import org.teavm.jso.core.JSPromise; +import org.teavm.jso.core.JSString; -import java.util.Arrays; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.atomic.AtomicReferenceArray; -public class WarpSyncProtocol /*extends ProtocolHandler*/ { - // Sizes taken from smoldot - public static final int MAX_REQUEST_SIZE = 32; - public static final int MAX_RESPONSE_SIZE = 16 * 1024 * 1024; +public class WarpSyncProtocol { public WarpSyncProtocol() { -// super(MAX_REQUEST_SIZE, MAX_RESPONSE_SIZE); } - /*@Override - protected CompletableFuture onStartInitiator(Stream stream) { - stream.pushHandler(new Leb128LengthFrameDecoder()); - stream.pushHandler(new WarpSyncResponseDecoder()); - - stream.pushHandler(new Leb128LengthFrameEncoder()); - stream.pushHandler(new ByteArrayEncoder()); - WarpSyncProtocol.Sender handler = new WarpSyncProtocol.Sender(stream); - stream.pushHandler(handler); - return CompletableFuture.completedFuture(handler); - }*/ - static class Sender implements WarpSyncController { - public static final int MAX_QUEUE_SIZE = 50; -// private final LinkedBlockingDeque> queue = -// new LinkedBlockingDeque<>(MAX_QUEUE_SIZE); public Sender() { } - // @Override - public void onMessage(Stream stream, WarpSyncResponse msg) { -// Objects.requireNonNull(queue.poll()).complete(msg); -// stream.closeWrite(); - } - @Override public WarpSyncResponse send(WarpSyncRequest req, String protocolId) { - System.out.println("Request: " + req.getBlockHash()); final var lock = new Object(); - JSPromise objectJSPromise = + AtomicReference response = new AtomicReference<>(); + JSPromise objectJSPromise = sendRequest(StringUtils.toHex(req.getBlockHash().getBytes()), protocolId); objectJSPromise.then((ttt) -> { - System.out.println("Received response: " + ttt); - System.out.println("Received response len: " + ttt.length()); - byte[] bytes = StringUtils.fromHex(ttt); - System.out.println("Received response: " + bytes); - System.out.println("Received response len: " + bytes.length); - synchronized (lock) { -// System.out.println("Received response: " + bytes.length + " " + bytes); - ScaleCodecReader scaleCodecReader = new ScaleCodecReader(bytes); - WarpSyncResponse responseaa = new WarpSyncResponseScaleReader().read(scaleCodecReader); - System.out.println(responseaa); -// response.set(result); + String str = ttt.stringValue(); + byte[] bytes = StringUtils.fromHex(str); + + response.set(bytes); lock.notify(); } return null; @@ -78,16 +43,14 @@ public WarpSyncResponse send(WarpSyncRequest req, String protocolId) { synchronized (lock) { try { lock.wait(); -// byte[] bytes = response.get(); -// System.out.println("Received response: " + /*bytes.length +*/ " " + bytes); -// ScaleCodecReader scaleCodecReader = new ScaleCodecReader(bytes); -// WarpSyncResponse responseaa = new WarpSyncResponseScaleReader().read(scaleCodecReader); -// System.out.println(responseaa); + byte[] bytes = response.get(); + ScaleCodecReader scaleCodecReader = new ScaleCodecReader(bytes); + + return new WarpSyncResponseScaleReader().read(scaleCodecReader); } catch (InterruptedException e) { throw new RuntimeException(e); } } - return null; } @JSBody(params = {"blockHash", "protocolId"}, script = "return (async () => {" + @@ -95,9 +58,9 @@ public WarpSyncResponse send(WarpSyncRequest req, String protocolId) { " let stream = await ItPbStream.pbStream(await libp.dialProtocol(peer, protocolId));" + " stream.writeLP(new Uint8Array([...blockHash.matchAll(/../g)].map(m => parseInt(m[0], 16))));" + " let bytes = (await stream.readLP()).subarray();" + - " return [...bytes].map(n => n.toString(16)).join('');" + + " return [...bytes].map(n => n.toString(16).padStart(2, '0')).join('');" + "})()") - private static native JSPromise sendRequest(String blockHash, String protocolId); + private static native JSPromise sendRequest(String blockHash, String protocolId); } } diff --git a/src/main/java/com/limechain/network/protocol/warp/WarpSyncService.java b/src/main/java/com/limechain/network/protocol/warp/WarpSyncService.java index ce14641b1..0c1c602de 100644 --- a/src/main/java/com/limechain/network/protocol/warp/WarpSyncService.java +++ b/src/main/java/com/limechain/network/protocol/warp/WarpSyncService.java @@ -5,7 +5,7 @@ public class WarpSyncService extends NetworkService { public WarpSyncService(String protocolId) { - this.protocol = new WarpSync(protocolId, new WarpSyncProtocol()); + this.protocol = new WarpSync(protocolId); } } diff --git a/src/main/java/com/limechain/network/protocol/warp/dto/BlockHeader.java b/src/main/java/com/limechain/network/protocol/warp/dto/BlockHeader.java index 46633387d..7e62c857b 100644 --- a/src/main/java/com/limechain/network/protocol/warp/dto/BlockHeader.java +++ b/src/main/java/com/limechain/network/protocol/warp/dto/BlockHeader.java @@ -5,12 +5,14 @@ import com.limechain.utils.scale.ScaleUtils; import lombok.Getter; import lombok.Setter; +import lombok.ToString; import java.math.BigInteger; import java.util.Arrays; @Setter @Getter +@ToString public class BlockHeader { // TODO: Make this const configurable public static final int BLOCK_NUMBER_SIZE = 4; diff --git a/src/main/java/com/limechain/network/protocol/warp/dto/Justification.java b/src/main/java/com/limechain/network/protocol/warp/dto/Justification.java index 2fdeeda65..c8f26c2af 100644 --- a/src/main/java/com/limechain/network/protocol/warp/dto/Justification.java +++ b/src/main/java/com/limechain/network/protocol/warp/dto/Justification.java @@ -3,6 +3,7 @@ import com.limechain.polkaj.Hash256; import lombok.Getter; import lombok.Setter; +import lombok.ToString; import lombok.extern.java.Log; import java.math.BigInteger; @@ -11,6 +12,7 @@ @Setter @Getter @Log +@ToString public class Justification { private BigInteger round; private Hash256 targetHash; diff --git a/src/main/java/com/limechain/network/protocol/warp/dto/WarpSyncFragment.java b/src/main/java/com/limechain/network/protocol/warp/dto/WarpSyncFragment.java index 829ce51ca..f6276ab2b 100644 --- a/src/main/java/com/limechain/network/protocol/warp/dto/WarpSyncFragment.java +++ b/src/main/java/com/limechain/network/protocol/warp/dto/WarpSyncFragment.java @@ -2,6 +2,7 @@ import lombok.Getter; import lombok.Setter; +import lombok.ToString; /** * Each fragment represents a change in the list of Grandpa authorities, and a list of signatures of @@ -9,6 +10,7 @@ */ @Getter @Setter +@ToString public class WarpSyncFragment { private BlockHeader header; private Justification justification; diff --git a/src/main/java/com/limechain/network/protocol/warp/dto/WarpSyncResponse.java b/src/main/java/com/limechain/network/protocol/warp/dto/WarpSyncResponse.java index fad80e347..51033db12 100644 --- a/src/main/java/com/limechain/network/protocol/warp/dto/WarpSyncResponse.java +++ b/src/main/java/com/limechain/network/protocol/warp/dto/WarpSyncResponse.java @@ -2,11 +2,13 @@ import lombok.Getter; import lombok.Setter; +import lombok.ToString; import java.util.Arrays; @Getter @Setter +@ToString public class WarpSyncResponse { private WarpSyncFragment[] fragments; private boolean isFinished; diff --git a/src/main/java/com/limechain/network/protocol/warp/scale/reader/BlockHeaderReader.java b/src/main/java/com/limechain/network/protocol/warp/scale/reader/BlockHeaderReader.java index 0c9974ea8..b40df4c4a 100644 --- a/src/main/java/com/limechain/network/protocol/warp/scale/reader/BlockHeaderReader.java +++ b/src/main/java/com/limechain/network/protocol/warp/scale/reader/BlockHeaderReader.java @@ -12,17 +12,12 @@ public class BlockHeaderReader implements ScaleReader { @Override public BlockHeader read(ScaleCodecReader reader) { BlockHeader blockHeader = new BlockHeader(); - System.out.println("BlockHeaderReader.read"); blockHeader.setParentHash(new Hash256(reader.readUint256())); - System.out.println("BlockHeaderReader.setParentHash"); // NOTE: Usage of BlockNumberReader is intentionally omitted here, // since we want this to be a compact int, not a var size int blockHeader.setBlockNumber(BigInteger.valueOf(reader.readCompactInt())); - System.out.println("BlockHeaderReader.setBlockNumber"); blockHeader.setStateRoot(new Hash256(reader.readUint256())); - System.out.println("BlockHeaderReader.setStateRoot"); blockHeader.setExtrinsicsRoot(new Hash256(reader.readUint256())); - System.out.println("BlockHeaderReader.setExtrinsicsRoot"); var digestCount = reader.readCompactInt(); HeaderDigest[] digests = new HeaderDigest[digestCount]; @@ -31,7 +26,6 @@ public BlockHeader read(ScaleCodecReader reader) { } blockHeader.setDigest(digests); - System.out.println("BlockHeaderReader.setDigest"); return blockHeader; } diff --git a/src/main/java/com/limechain/storage/block/SyncState.java b/src/main/java/com/limechain/storage/block/SyncState.java index 7022d3536..a8367d786 100644 --- a/src/main/java/com/limechain/storage/block/SyncState.java +++ b/src/main/java/com/limechain/storage/block/SyncState.java @@ -80,7 +80,7 @@ public void resetRound() { public void setLightSyncState(LightSyncState initState) { this.setId = initState.getGrandpaAuthoritySet().getSetId(); setAuthoritySet(initState.getGrandpaAuthoritySet().getCurrentAuthorities()); -// finalizeHeader(initState.getFinalizedBlockHeader()); + finalizeHeader(initState.getFinalizedBlockHeader()); } public String getStateRoot() { diff --git a/src/main/java/com/limechain/sync/JustificationVerifier.java b/src/main/java/com/limechain/sync/JustificationVerifier.java index f29f3a5eb..e790dac44 100644 --- a/src/main/java/com/limechain/sync/JustificationVerifier.java +++ b/src/main/java/com/limechain/sync/JustificationVerifier.java @@ -1,110 +1,158 @@ package com.limechain.sync; +import com.limechain.chain.lightsyncstate.Authority; +import com.limechain.network.protocol.warp.dto.Precommit; +import com.limechain.polkaj.Hash256; +import com.limechain.polkaj.Hash512; +import com.limechain.rpc.server.AppBean; +import com.limechain.storage.block.SyncState; +import com.limechain.utils.LittleEndianUtils; +import com.limechain.utils.StringUtils; import lombok.AccessLevel; import lombok.NoArgsConstructor; import lombok.extern.java.Log; +import org.teavm.jso.JSBody; +import org.teavm.jso.core.JSBoolean; +import org.teavm.jso.core.JSPromise; + +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.stream.Collectors; @Log @NoArgsConstructor(access = AccessLevel.PRIVATE) public class JustificationVerifier { -// public static boolean verify(Precommit[] precommits, BigInteger round) { -// SyncState syncState = AppBean.getBean(SyncState.class); -// Authority[] authorities = syncState.getAuthoritySet(); -// BigInteger authoritiesSetId = syncState.getSetId(); -// -// // Implementation from: https://github.com/smol-dot/smoldot -// // lib/src/finality/justification/verify.rs -// if (authorities == null || precommits.length < (authorities.length * 2 / 3) + 1) { -// log.log(Level.WARNING, "Not enough signatures"); -// return false; -// } -// -// Set seenPublicKeys = new HashSet<>(); -// Set authorityKeys = Arrays.stream(authorities) -// .map(Authority::getPublicKey) -// .map(Hash256::new) -// .collect(Collectors.toSet()); -// -// for (Precommit precommit : precommits) { -// if (!authorityKeys.contains(precommit.getAuthorityPublicKey())) { -// log.log(Level.WARNING, "Invalid Authority for precommit"); -// return false; -// } -// -// if (seenPublicKeys.contains(precommit.getAuthorityPublicKey())) { -// log.log(Level.WARNING, "Duplicated signature"); -// return false; -// } -// seenPublicKeys.add(precommit.getAuthorityPublicKey()); -// -// // TODO (from smoldot): must check signed block ancestry using `votes_ancestries` -// -// byte[] data = getDataToVerify(precommit, authoritiesSetId, round); -// -// boolean isValid = verifySignature(precommit.getAuthorityPublicKey().toString(), -// precommit.getSignature().toString(), data); -// if (!isValid) { -// log.log(Level.WARNING, "Failed to verify signature"); -// return false; -// } -// } -// log.log(Level.INFO, "All signatures were verified successfully"); -// -// // From Smoldot implementation: -// // TODO: must check that votes_ancestries doesn't contain any unused entry -// // TODO: there's also a "ghost" thing? -// -// return true; -// } -// -// private static byte[] getDataToVerify(Precommit precommit, BigInteger authoritiesSetId, BigInteger round){ -// // 1 reserved byte for data type -// // 32 reserved for target hash -// // 4 reserved for block number -// // 8 reserved for justification round -// // 8 reserved for set id -// int messageCapacity = 1 + 32 + 4 + 8 + 8; -// var messageBuffer = ByteBuffer.allocate(messageCapacity); -// messageBuffer.order(ByteOrder.LITTLE_ENDIAN); -// -// // Write message type -// messageBuffer.put((byte) 1); -// // Write target hash -// messageBuffer.put(LittleEndianUtils -// .convertBytes(StringUtils.hexToBytes(precommit.getTargetHash().toString()))); -// //Write Justification round bytes as u64 -// messageBuffer.put(LittleEndianUtils -// .bytesToFixedLength(precommit.getTargetNumber().toByteArray(), 4)); -// //Write Justification round bytes as u64 -// messageBuffer.put(LittleEndianUtils.bytesToFixedLength(round.toByteArray(), 8)); -// //Write Set Id bytes as u64 -// messageBuffer.put(LittleEndianUtils.bytesToFixedLength(authoritiesSetId.toByteArray(), 8)); -// -// //Verify message -// //Might have problems because we use the stand ED25519 instead of ED25519_zebra -// messageBuffer.rewind(); -// byte[] data = new byte[messageBuffer.remaining()]; -// messageBuffer.get(data); -// return data; -// } -// -// public static boolean verifySignature(String publicKeyHex, String signatureHex, byte[] data) { -// byte[] publicKeyBytes = Hex.decode(publicKeyHex.substring(2)); -// byte[] signatureBytes = Hex.decode(signatureHex.substring(2)); -// Ed25519PublicKeyParameters publicKeyParams = new Ed25519PublicKeyParameters(publicKeyBytes, 0); -// Ed25519Signer verifier = new Ed25519Signer(); -// verifier.init(false, publicKeyParams); -// verifier.update(data, 0, data.length); -// -// Ed25519PublicKey publicKey = -// new Ed25519PublicKey(publicKeyParams); -// Extrinsic.ED25519Signature signature = new Extrinsic.ED25519Signature(Hash512.from(signatureHex)); -// -// boolean isValid = verifier.verifySignature(signatureBytes); -// boolean result = publicKey.verify(data, signature.getValue().getBytes()); -// if (!result) { -// log.log(Level.WARNING, "Invalid signature"); -// } -// return isValid; -// } + public static boolean verify(Precommit[] precommits, BigInteger round) { + SyncState syncState = AppBean.getBean(SyncState.class); + Authority[] authorities = syncState.getAuthoritySet(); + BigInteger authoritiesSetId = syncState.getSetId(); + + // Implementation from: https://github.com/smol-dot/smoldot + // lib/src/finality/justification/verify.rs + if (authorities == null || precommits.length < (authorities.length * 2 / 3) + 1) { + log.log(Level.WARNING, "Not enough signatures"); + return false; + } + + Set seenPublicKeys = new HashSet<>(); + Set authorityKeys = + Arrays.stream(authorities).map(Authority::getPublicKey).map(Hash256::new).collect(Collectors.toSet()); + + for (Precommit precommit : precommits) { + if (!authorityKeys.contains(precommit.getAuthorityPublicKey())) { + log.log(Level.WARNING, "Invalid Authority for precommit"); + return false; + } + + if (seenPublicKeys.contains(precommit.getAuthorityPublicKey())) { + log.log(Level.WARNING, "Duplicated signature"); + return false; + } + seenPublicKeys.add(precommit.getAuthorityPublicKey()); + + // TODO (from smoldot): must check signed block ancestry using `votes_ancestries` + + byte[] data = getDataToVerify(precommit, authoritiesSetId, round); + + boolean isValid = + verifySignature(precommit.getAuthorityPublicKey().toString(), precommit.getSignature().toString(), + data); + if (!isValid) { + log.log(Level.WARNING, "Failed to verify signature"); + return false; + } + } + log.log(Level.INFO, "All signatures were verified successfully"); + + // From Smoldot implementation: + // TODO: must check that votes_ancestries doesn't contain any unused entry + // TODO: there's also a "ghost" thing? + + return true; + } + + private static byte[] getDataToVerify(Precommit precommit, BigInteger authoritiesSetId, BigInteger round) { + // 1 reserved byte for data type + // 32 reserved for target hash + // 4 reserved for block number + // 8 reserved for justification round + // 8 reserved for set id + int messageCapacity = 1 + 32 + 4 + 8 + 8; + var messageBuffer = ByteBuffer.allocate(messageCapacity); + messageBuffer.order(ByteOrder.LITTLE_ENDIAN); + + // Write message type + messageBuffer.put((byte) 1); + // Write target hash + messageBuffer.put(LittleEndianUtils.convertBytes(StringUtils.hexToBytes(precommit.getTargetHash().toString()))); + //Write Justification round bytes as u64 + messageBuffer.put(LittleEndianUtils.bytesToFixedLength(precommit.getTargetNumber().toByteArray(), 4)); + //Write Justification round bytes as u64 + messageBuffer.put(LittleEndianUtils.bytesToFixedLength(round.toByteArray(), 8)); + //Write Set Id bytes as u64 + messageBuffer.put(LittleEndianUtils.bytesToFixedLength(authoritiesSetId.toByteArray(), 8)); + + //Verify message + //Might have problems because we use the stand ED25519 instead of ED25519_zebra + messageBuffer.rewind(); + byte[] data = new byte[messageBuffer.remaining()]; + messageBuffer.get(data); + return data; + } + + public static boolean verifySignature(String publicKeyHex, String signatureHex, byte[] data) { + String message = StringUtils.toHex(data); + AtomicBoolean verifier = new AtomicBoolean(false); + Object lock = new Object(); + + verifySignature(publicKeyHex, signatureHex, message).then(isValid -> { + synchronized (lock) { + verifier.set(isValid.booleanValue()); + lock.notify(); + } + return null; + }); + + synchronized (lock) { + try { + lock.wait(); + + boolean result = verifier.get(); + if (!result) { + log.log(Level.WARNING, "Invalid signature"); + } + return result; + } catch (InterruptedException e) { + log.log(Level.WARNING, "Interrupted while waiting for signature verification"); + return false; + } + } + } + + @JSBody(params = {"publicKeyHex", "signatureHex", "messageHex"}, + script = "return (async () => {" + + " const publicKeyBytes = new Uint8Array([...publicKeyHex.matchAll(/../g)].map(m => parseInt(m[0], 16)));" + + " const signatureBytes = new Uint8Array([...signatureHex.matchAll(/../g)].map(m => parseInt(m[0], 16)));" + + " const publicKey = await crypto.subtle.importKey(" + + " 'raw'," + " publicKeyBytes," + + " {" + " name: 'NODE-ED25519'," + + " namedCurve: 'ed25519'" + " }," + + " true," + " ['verify']" + " );" + + " const messageBytes = new Uint8Array([...messageHex.matchAll(/../g)].map(m => parseInt(m[0], 16)));;" + + " const isValid = await crypto.subtle.verify(" + + " {" + " name: 'NODE-ED25519'" + + " }," + " publicKey," + + " signatureBytes," + + " messageBytes" + " );" + + " return isValid;" + + + "})()") + public static native JSPromise verifySignature(String publicKeyHex, String signatureHex, + String messageHex); } diff --git a/src/main/java/com/limechain/sync/warpsync/WarpSyncMachine.java b/src/main/java/com/limechain/sync/warpsync/WarpSyncMachine.java index f7b0a6ed2..bfaee3f46 100644 --- a/src/main/java/com/limechain/sync/warpsync/WarpSyncMachine.java +++ b/src/main/java/com/limechain/sync/warpsync/WarpSyncMachine.java @@ -38,7 +38,8 @@ public class WarpSyncMachine { private final SyncState syncState; private final List onFinishCallbacks; - public WarpSyncMachine(Network network, ChainService chainService, SyncState syncState, WarpSyncState warpSyncState) { + public WarpSyncMachine(Network network, ChainService chainService, SyncState syncState, + WarpSyncState warpSyncState) { this.networkService = network; this.chainService = chainService; this.syncState = syncState; @@ -54,7 +55,6 @@ public void nextState() { } public void handleState() { - System.out.println("Warp sync action" + warpSyncAction.getClass().getSimpleName()); warpSyncAction.handle(this); } @@ -78,12 +78,12 @@ public void start() { this.warpSyncAction = new RequestFragmentsAction(initStateHash); // new Thread(() -> { -// while (this.warpSyncAction.getClass() != FinishedAction.class) { - this.handleState(); - this.nextState(); -// } -// -// finishWarpSync(); + while (this.warpSyncAction.getClass() != FinishedAction.class) { + this.handleState(); + this.nextState(); + } + + finishWarpSync(); // }).start(); } @@ -96,7 +96,7 @@ public void stop() { private void finishWarpSync() { this.warpState.setWarpSyncFinished(true); // this.networkService.handshakeBootNodes(); -// this.syncState.persistState(); + this.syncState.persistState(); log.info("Warp sync finished."); this.onFinishCallbacks.forEach(Runnable::run); } diff --git a/src/main/java/com/limechain/sync/warpsync/WarpSyncState.java b/src/main/java/com/limechain/sync/warpsync/WarpSyncState.java index e1edf1bb6..f6bd896fb 100644 --- a/src/main/java/com/limechain/sync/warpsync/WarpSyncState.java +++ b/src/main/java/com/limechain/sync/warpsync/WarpSyncState.java @@ -2,7 +2,14 @@ import com.limechain.chain.lightsyncstate.Authority; import com.limechain.network.Network; +import com.limechain.network.protocol.warp.dto.ConsensusEngine; +import com.limechain.network.protocol.warp.dto.HeaderDigest; +import com.limechain.polkaj.reader.ScaleCodecReader; import com.limechain.storage.block.SyncState; +import com.limechain.sync.warpsync.dto.AuthoritySetChange; +import com.limechain.sync.warpsync.dto.GrandpaDigestMessageType; +import com.limechain.sync.warpsync.scale.ForcedChangeReader; +import com.limechain.sync.warpsync.scale.ScheduledChangeReader; import com.limechain.tuple.Pair; import lombok.Getter; import lombok.Setter; @@ -174,51 +181,51 @@ public void handleScheduledEvents() { * @param headerDigests digest of the block header * @param blockNumber block that contains the digest */ -// public void handleAuthorityChanges(HeaderDigest[] headerDigests, BigInteger blockNumber) { -// // Update authority set and set id -// AuthoritySetChange authorityChanges; -// for (HeaderDigest digest : headerDigests) { -// if (digest.getId() == ConsensusEngine.GRANDPA) { -// ScaleCodecReader reader = new ScaleCodecReader(digest.getMessage()); -// GrandpaDigestMessageType type = GrandpaDigestMessageType.fromId(reader.readByte()); -// -// if (type == null) { -// log.log(Level.SEVERE, "Could not get grandpa message type"); -// throw new IllegalStateException("Unknown grandpa message type"); -// } -// -// switch (type) { -// case SCHEDULED_CHANGE -> { -// ScheduledChangeReader authorityChangesReader = new ScheduledChangeReader(); -// authorityChanges = authorityChangesReader.read(reader); -// scheduledAuthorityChanges -// .add(new Pair<>(blockNumber.add(authorityChanges.getDelay()), -// authorityChanges.getAuthorities())); -// return; -// } -// case FORCED_CHANGE -> { -// ForcedChangeReader authorityForcedChangesReader = new ForcedChangeReader(); -// authorityChanges = authorityForcedChangesReader.read(reader); -// scheduledAuthorityChanges -// .add(new Pair<>(blockNumber.add(authorityChanges.getDelay()), -// authorityChanges.getAuthorities())); -// return; -// } -// case ON_DISABLED -> { -// log.log(Level.SEVERE, "'ON DISABLED' grandpa message not implemented"); -// return; -// } -// case PAUSE -> { -// log.log(Level.SEVERE, "'PAUSE' grandpa message not implemented"); -// return; -// } -// case RESUME -> { -// log.log(Level.SEVERE, "'RESUME' grandpa message not implemented"); -// return; -// } -// } -// } -// } -// } + public void handleAuthorityChanges(HeaderDigest[] headerDigests, BigInteger blockNumber) { + // Update authority set and set id + AuthoritySetChange authorityChanges; + for (HeaderDigest digest : headerDigests) { + if (digest.getId() == ConsensusEngine.GRANDPA) { + ScaleCodecReader reader = new ScaleCodecReader(digest.getMessage()); + GrandpaDigestMessageType type = GrandpaDigestMessageType.fromId(reader.readByte()); + + if (type == null) { + log.log(Level.SEVERE, "Could not get grandpa message type"); + throw new IllegalStateException("Unknown grandpa message type"); + } + + switch (type) { + case SCHEDULED_CHANGE -> { + ScheduledChangeReader authorityChangesReader = new ScheduledChangeReader(); + authorityChanges = authorityChangesReader.read(reader); + scheduledAuthorityChanges + .add(new Pair<>(blockNumber.add(authorityChanges.getDelay()), + authorityChanges.getAuthorities())); + return; + } + case FORCED_CHANGE -> { + ForcedChangeReader authorityForcedChangesReader = new ForcedChangeReader(); + authorityChanges = authorityForcedChangesReader.read(reader); + scheduledAuthorityChanges + .add(new Pair<>(blockNumber.add(authorityChanges.getDelay()), + authorityChanges.getAuthorities())); + return; + } + case ON_DISABLED -> { + log.log(Level.SEVERE, "'ON DISABLED' grandpa message not implemented"); + return; + } + case PAUSE -> { + log.log(Level.SEVERE, "'PAUSE' grandpa message not implemented"); + return; + } + case RESUME -> { + log.log(Level.SEVERE, "'RESUME' grandpa message not implemented"); + return; + } + } + } + } + } } \ No newline at end of file diff --git a/src/main/java/com/limechain/sync/warpsync/action/RequestFragmentsAction.java b/src/main/java/com/limechain/sync/warpsync/action/RequestFragmentsAction.java index 671f035dc..f5df404e3 100644 --- a/src/main/java/com/limechain/sync/warpsync/action/RequestFragmentsAction.java +++ b/src/main/java/com/limechain/sync/warpsync/action/RequestFragmentsAction.java @@ -8,6 +8,9 @@ import com.limechain.sync.warpsync.WarpSyncState; import lombok.extern.java.Log; +import java.util.ArrayDeque; +import java.util.Arrays; +import java.util.concurrent.LinkedBlockingQueue; import java.util.logging.Level; @Log @@ -15,7 +18,7 @@ public class RequestFragmentsAction implements WarpSyncAction { private final WarpSyncState warpSyncState; private final Hash256 blockHash; - // private WarpSyncResponse result; + private WarpSyncResponse result; private Exception error; public RequestFragmentsAction(Hash256 blockHash) { @@ -38,44 +41,32 @@ public void next(WarpSyncMachine sync) { + e.getMessage(), e.getStackTrace()); } } -// if (this.result != null) { - sync.setWarpSyncAction(new VerifyJustificationAction()); -// return; -// } + if (this.result != null) { + sync.setWarpSyncAction(new VerifyJustificationAction()); + return; + } log.log(Level.WARNING, "RequestFragmentsState.next() called without result or error set."); } @Override public void handle(WarpSyncMachine sync) { - WarpSyncResponse resp = null; -// for (int i = 0; i < sync.getNetworkService().getKademliaService().getBootNodePeerIds().size(); i++) { -// try { - resp = sync.getNetworkService().makeWarpSyncRequest(blockHash.toString()); -// break; -// } catch (Exception e) { -// if (!sync.getNetworkService().updateCurrentSelectedPeerWithBootnode(i)) { -// this.error = e; -// return; -// } -// } -// } + WarpSyncResponse resp = sync.getNetworkService().makeWarpSyncRequest(blockHash.toString()); try { if (resp == null) { throw new MissingObjectException("No response received."); } - log.log(Level.INFO, "Successfully received fragments from peer " - /* + sync.getNetworkService().getCurrentSelectedPeer()*/); + log.log(Level.INFO, "Successfully received fragments from peer"); if (resp.getFragments().length == 0) { log.log(Level.WARNING, "No fragments received."); return; } warpSyncState.setWarpSyncFragmentsFinished(resp.isFinished()); -// sync.setFragmentsQueue(new LinkedBlockingQueue<>( -// Arrays.stream(resp.getFragments()).toList()) -// ); + sync.setFragmentsQueue(new ArrayDeque<>( + Arrays.stream(resp.getFragments()).toList()) + ); -// this.result = resp; + this.result = resp; } catch (Exception e) { // TODO: Set error state, next() will use to transition to correct next state. // This error state could be either recoverable or irrecoverable. diff --git a/src/main/java/com/limechain/sync/warpsync/action/VerifyJustificationAction.java b/src/main/java/com/limechain/sync/warpsync/action/VerifyJustificationAction.java index 354b2af3a..7c07f4e4f 100644 --- a/src/main/java/com/limechain/sync/warpsync/action/VerifyJustificationAction.java +++ b/src/main/java/com/limechain/sync/warpsync/action/VerifyJustificationAction.java @@ -1,11 +1,16 @@ package com.limechain.sync.warpsync.action; +import com.limechain.exception.sync.JustificationVerificationException; +import com.limechain.network.protocol.warp.dto.WarpSyncFragment; import com.limechain.rpc.server.AppBean; import com.limechain.storage.block.SyncState; +import com.limechain.sync.JustificationVerifier; import com.limechain.sync.warpsync.WarpSyncMachine; import com.limechain.sync.warpsync.WarpSyncState; import lombok.extern.java.Log; +import java.util.logging.Level; + // VerifyJustificationState is going to be instantiated a lot of times // Maybe we can make it a singleton in order to reduce performance overhead? @Log @@ -27,51 +32,51 @@ public void next(WarpSyncMachine sync) { return; } -// if (!sync.getFragmentsQueue().isEmpty()) { -// sync.setWarpSyncAction(new VerifyJustificationAction()); -// } else if (warpSyncState.isWarpSyncFragmentsFinished()) { -// sync.setWarpSyncAction(new RuntimeDownloadAction()); -// } else { -// sync.setWarpSyncAction(new RequestFragmentsAction(syncState.getLastFinalizedBlockHash())); -// } + if (!sync.getFragmentsQueue().isEmpty()) { + sync.setWarpSyncAction(new VerifyJustificationAction()); + } else if (warpSyncState.isWarpSyncFragmentsFinished()) { + sync.setWarpSyncAction(new FinishedAction()); + } else { + sync.setWarpSyncAction(new RequestFragmentsAction(syncState.getLastFinalizedBlockHash())); + } } @Override public void handle(WarpSyncMachine sync) { -// try { -// warpSyncState.handleScheduledEvents(); -// -// WarpSyncFragment fragment = sync.getFragmentsQueue().poll(); -// log.log(Level.INFO, "Verifying justification..."); -// if (fragment == null) { -// throw new JustificationVerificationException("No such fragment"); -// } -// boolean verified = JustificationVerifier.verify( -// fragment.getJustification().getPrecommits(), -// fragment.getJustification().getRound()); -// if (!verified) { -// throw new JustificationVerificationException("Justification could not be verified."); -// } -// -// syncState.finalizeHeader(fragment.getHeader()); -// handleAuthorityChanges(fragment); -// } catch (Exception e) { -// log.log(Level.WARNING, "Error while verifying justification: " + e.getMessage()); -// this.error = e; -// } + try { + warpSyncState.handleScheduledEvents(); + + WarpSyncFragment fragment = sync.getFragmentsQueue().poll(); + log.log(Level.INFO, "Verifying justification..."); + if (fragment == null) { + throw new JustificationVerificationException("No such fragment"); + } + boolean verified = JustificationVerifier.verify( + fragment.getJustification().getPrecommits(), + fragment.getJustification().getRound()); + if (!verified) { + throw new JustificationVerificationException("Justification could not be verified."); + } + + syncState.finalizeHeader(fragment.getHeader()); + handleAuthorityChanges(fragment); + } catch (Exception e) { + log.log(Level.WARNING, "Error while verifying justification: " + e.getMessage()); + this.error = e; + } } -// private void handleAuthorityChanges(WarpSyncFragment fragment) { -// try { -// warpSyncState.handleAuthorityChanges( -// fragment.getHeader().getDigest(), -// fragment.getJustification().getTargetBlock()); -// log.log(Level.INFO, "Verified justification. Block hash is now at #" -// + syncState.getLastFinalizedBlockNumber() + ": " -// + syncState.getLastFinalizedBlockHash().toString() -// + " with state root " + syncState.getStateRoot()); -// } catch (Exception e) { -// this.error = e; -// } -// } + private void handleAuthorityChanges(WarpSyncFragment fragment) { + try { + warpSyncState.handleAuthorityChanges( + fragment.getHeader().getDigest(), + fragment.getJustification().getTargetBlock()); + log.log(Level.INFO, "Verified justification. Block hash is now at #" + + syncState.getLastFinalizedBlockNumber() + ": " + + syncState.getLastFinalizedBlockHash().toString() + + " with state root " + syncState.getStateRoot()); + } catch (Exception e) { + this.error = e; + } + } } diff --git a/src/main/java/com/limechain/teavm/HttpRequest.java b/src/main/java/com/limechain/teavm/HttpRequest.java index a8c51b2b0..4c7b984df 100644 --- a/src/main/java/com/limechain/teavm/HttpRequest.java +++ b/src/main/java/com/limechain/teavm/HttpRequest.java @@ -27,10 +27,5 @@ private static void asyncHttpRequest(String method, String url, String body, Asy } @JSBody(params = {"method", "url", "body", "callback"}, script = "return asyncHttpRequest(method, url, body, callback);") - public static native void createAsyncHttpRequest(String method, String url, String body, HttpRequestCallback callback); - - @JSFunctor - private interface HttpRequestCallback extends JSObject { - void apply(JSError error, String response); - } + public static native void createAsyncHttpRequest(String method, String url, String body, TeaVMCallback callback); } diff --git a/src/main/java/com/limechain/teavm/TeaVMCallback.java b/src/main/java/com/limechain/teavm/TeaVMCallback.java new file mode 100644 index 000000000..31d018a0f --- /dev/null +++ b/src/main/java/com/limechain/teavm/TeaVMCallback.java @@ -0,0 +1,10 @@ +package com.limechain.teavm; + +import org.teavm.jso.JSFunctor; +import org.teavm.jso.JSObject; +import org.teavm.jso.core.JSError; + +@JSFunctor +public interface TeaVMCallback extends JSObject { + void apply(JSError error, String response); +}