Skip to content

Commit

Permalink
feat: working warp sync
Browse files Browse the repository at this point in the history
  • Loading branch information
ablax committed Aug 26, 2024
1 parent 6cdded8 commit b9e826b
Show file tree
Hide file tree
Showing 20 changed files with 324 additions and 309 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
import lombok.Getter;
import lombok.Setter;
import com.limechain.tuple.Pair;
import lombok.ToString;

import java.math.BigInteger;

@Getter
@Setter
@ToString
public class AuthoritySet {
private Authority[] currentAuthorities;
private BigInteger setId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@
import lombok.Getter;
import lombok.Setter;
import com.limechain.tuple.Pair;
import lombok.ToString;

import java.math.BigInteger;
import java.util.Map;

@Getter
@Setter
@ToString
public class EpochChanges {
private ForkTree<PersistedEpochHeader> inner;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,16 @@
import java.util.Map;

@Getter
@ToString
public class LightSyncState {
private BlockHeader finalizedBlockHeader;
private EpochChanges epochChanges;
private AuthoritySet grandpaAuthoritySet;

public static LightSyncState decode(Map<String, String> lightSyncState) {
String header = lightSyncState.get("finalizedBlockHeader");
String epochChanges = lightSyncState.get("babeEpochChanges");
String grandpaAuthoritySet = lightSyncState.get("grandpaAuthoritySet");
public static LightSyncState decode(Map<String, String> lightSyncStateMap) {
String header = lightSyncStateMap.get("finalizedBlockHeader");
String epochChanges = lightSyncStateMap.get("babeEpochChanges");
String grandpaAuthoritySet = lightSyncStateMap.get("grandpaAuthoritySet");

if (header == null) {
throw new IllegalStateException("finalizedBlockHeader is null");
Expand All @@ -34,18 +35,19 @@ public static LightSyncState decode(Map<String, String> lightSyncState) {
}


var state = new LightSyncState();
LightSyncState lightSyncState = new LightSyncState();
byte[] bytes = StringUtils.hexToBytes(header);
state.finalizedBlockHeader = new BlockHeaderReader()
lightSyncState.finalizedBlockHeader = new BlockHeaderReader()
.read(new ScaleCodecReader(bytes));

byte[] bytes1 = StringUtils.hexToBytes(epochChanges);
state.epochChanges = new EpochChangesReader()
lightSyncState.epochChanges = new EpochChangesReader()
.read(new ScaleCodecReader(bytes1));

state.grandpaAuthoritySet = new AuthoritySetReader()
lightSyncState.grandpaAuthoritySet = new AuthoritySetReader()
.read(new ScaleCodecReader(StringUtils.hexToBytes(grandpaAuthoritySet)));

return state;
System.out.println(lightSyncState);
return lightSyncState;
}
}
2 changes: 0 additions & 2 deletions src/main/java/com/limechain/network/Network.java
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,6 @@ public void pingPeers() {
// }
//
public WarpSyncResponse makeWarpSyncRequest(String blockHash) {
// if (isPeerInvalid()) return null;

return this.warpSyncService.getProtocol().warpSyncRequest(
blockHash);
}
Expand Down
18 changes: 4 additions & 14 deletions src/main/java/com/limechain/network/protocol/warp/WarpSync.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,15 @@ public class WarpSync extends StrictProtocolBinding {

private String protocolId;

public WarpSync(String protocolId, WarpSyncProtocol protocol) {
super(protocolId/*, protocol*/);
public WarpSync(String protocolId) {
super(protocolId);
this.protocolId = protocolId;
}

public WarpSyncResponse warpSyncRequest(/*PeerId peer,*/ String blockHash) {
// try {
// Stream stream = dialPeer(/*peer*/);
public WarpSyncResponse warpSyncRequest(String blockHash) {
WarpSyncProtocol.Sender sender = new WarpSyncProtocol.Sender();
System.out.println("Block hash: " + blockHash);
WarpSyncResponse resp = sender.warpSyncRequest(blockHash, protocolId);
log.log(Level.INFO, "Received warp sync response with " + resp/*.getFragments().length*/ + " fragments");
log.log(Level.INFO, "Received warp sync response with " + resp.getFragments().length + " fragments");
return resp;
// } catch (ExecutionException | TimeoutException | IllegalStateException e) {
// log.log(Level.SEVERE, "Error while sending remote call request: ", e);
// throw new ExecutionFailedException(e);
// } catch (InterruptedException e) {
// Thread.currentThread().interrupt();
// throw new ThreadInterruptedException(e);
// }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,73 +3,38 @@
import com.limechain.network.protocol.warp.dto.WarpSyncRequest;
import com.limechain.network.protocol.warp.dto.WarpSyncResponse;
import com.limechain.network.protocol.warp.scale.reader.WarpSyncResponseScaleReader;
import com.limechain.network.wrapper.Stream;
import com.limechain.polkaj.reader.ScaleCodecReader;
import com.limechain.utils.StringUtils;
import org.teavm.jso.JSBody;
import org.teavm.jso.core.JSArray;
import org.teavm.jso.core.JSPromise;
import org.teavm.jso.core.JSString;

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;

public class WarpSyncProtocol /*extends ProtocolHandler<WarpSyncController>*/ {
// Sizes taken from smoldot
public static final int MAX_REQUEST_SIZE = 32;
public static final int MAX_RESPONSE_SIZE = 16 * 1024 * 1024;
public class WarpSyncProtocol {

public WarpSyncProtocol() {
// super(MAX_REQUEST_SIZE, MAX_RESPONSE_SIZE);
}

/*@Override
protected CompletableFuture<WarpSyncController> onStartInitiator(Stream stream) {
stream.pushHandler(new Leb128LengthFrameDecoder());
stream.pushHandler(new WarpSyncResponseDecoder());
stream.pushHandler(new Leb128LengthFrameEncoder());
stream.pushHandler(new ByteArrayEncoder());
WarpSyncProtocol.Sender handler = new WarpSyncProtocol.Sender(stream);
stream.pushHandler(handler);
return CompletableFuture.completedFuture(handler);
}*/

static class Sender implements WarpSyncController {
public static final int MAX_QUEUE_SIZE = 50;
// private final LinkedBlockingDeque<CompletableFuture<WarpSyncResponse>> queue =
// new LinkedBlockingDeque<>(MAX_QUEUE_SIZE);

public Sender() {
}

// @Override
public void onMessage(Stream stream, WarpSyncResponse msg) {
// Objects.requireNonNull(queue.poll()).complete(msg);
// stream.closeWrite();
}

@Override
public WarpSyncResponse send(WarpSyncRequest req, String protocolId) {
System.out.println("Request: " + req.getBlockHash());
final var lock = new Object();

JSPromise<String> objectJSPromise =
AtomicReference<byte[]> response = new AtomicReference<>();
JSPromise<JSString> objectJSPromise =
sendRequest(StringUtils.toHex(req.getBlockHash().getBytes()), protocolId);

objectJSPromise.then((ttt) -> {
System.out.println("Received response: " + ttt);
System.out.println("Received response len: " + ttt.length());
byte[] bytes = StringUtils.fromHex(ttt);
System.out.println("Received response: " + bytes);
System.out.println("Received response len: " + bytes.length);

synchronized (lock) {
// System.out.println("Received response: " + bytes.length + " " + bytes);
ScaleCodecReader scaleCodecReader = new ScaleCodecReader(bytes);
WarpSyncResponse responseaa = new WarpSyncResponseScaleReader().read(scaleCodecReader);
System.out.println(responseaa);
// response.set(result);
String str = ttt.stringValue();
byte[] bytes = StringUtils.fromHex(str);

response.set(bytes);
lock.notify();
}
return null;
Expand All @@ -78,26 +43,24 @@ public WarpSyncResponse send(WarpSyncRequest req, String protocolId) {
synchronized (lock) {
try {
lock.wait();
// byte[] bytes = response.get();
// System.out.println("Received response: " + /*bytes.length +*/ " " + bytes);
// ScaleCodecReader scaleCodecReader = new ScaleCodecReader(bytes);
// WarpSyncResponse responseaa = new WarpSyncResponseScaleReader().read(scaleCodecReader);
// System.out.println(responseaa);
byte[] bytes = response.get();
ScaleCodecReader scaleCodecReader = new ScaleCodecReader(bytes);

return new WarpSyncResponseScaleReader().read(scaleCodecReader);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
return null;
}

@JSBody(params = {"blockHash", "protocolId"}, script = "return (async () => {" +
" let peer = libp.getConnections()[0].remotePeer;" +
" let stream = await ItPbStream.pbStream(await libp.dialProtocol(peer, protocolId));" +
" stream.writeLP(new Uint8Array([...blockHash.matchAll(/../g)].map(m => parseInt(m[0], 16))));" +
" let bytes = (await stream.readLP()).subarray();" +
" return [...bytes].map(n => n.toString(16)).join('');" +
" return [...bytes].map(n => n.toString(16).padStart(2, '0')).join('');" +
"})()")
private static native JSPromise<String> sendRequest(String blockHash, String protocolId);
private static native JSPromise<JSString> sendRequest(String blockHash, String protocolId);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
public class WarpSyncService extends NetworkService<WarpSync> {

public WarpSyncService(String protocolId) {
this.protocol = new WarpSync(protocolId, new WarpSyncProtocol());
this.protocol = new WarpSync(protocolId);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
import com.limechain.utils.scale.ScaleUtils;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

import java.math.BigInteger;
import java.util.Arrays;

@Setter
@Getter
@ToString
public class BlockHeader {
// TODO: Make this const configurable
public static final int BLOCK_NUMBER_SIZE = 4;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.limechain.polkaj.Hash256;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.java.Log;

import java.math.BigInteger;
Expand All @@ -11,6 +12,7 @@
@Setter
@Getter
@Log
@ToString
public class Justification {
private BigInteger round;
private Hash256 targetHash;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

/**
* Each fragment represents a change in the list of Grandpa authorities, and a list of signatures of
* the previous authorities that certify that this change is correct
*/
@Getter
@Setter
@ToString
public class WarpSyncFragment {
private BlockHeader header;
private Justification justification;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

import java.util.Arrays;

@Getter
@Setter
@ToString
public class WarpSyncResponse {
private WarpSyncFragment[] fragments;
private boolean isFinished;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,12 @@ public class BlockHeaderReader implements ScaleReader<BlockHeader> {
@Override
public BlockHeader read(ScaleCodecReader reader) {
BlockHeader blockHeader = new BlockHeader();
System.out.println("BlockHeaderReader.read");
blockHeader.setParentHash(new Hash256(reader.readUint256()));
System.out.println("BlockHeaderReader.setParentHash");
// NOTE: Usage of BlockNumberReader is intentionally omitted here,
// since we want this to be a compact int, not a var size int
blockHeader.setBlockNumber(BigInteger.valueOf(reader.readCompactInt()));
System.out.println("BlockHeaderReader.setBlockNumber");
blockHeader.setStateRoot(new Hash256(reader.readUint256()));
System.out.println("BlockHeaderReader.setStateRoot");
blockHeader.setExtrinsicsRoot(new Hash256(reader.readUint256()));
System.out.println("BlockHeaderReader.setExtrinsicsRoot");

var digestCount = reader.readCompactInt();
HeaderDigest[] digests = new HeaderDigest[digestCount];
Expand All @@ -31,7 +26,6 @@ public BlockHeader read(ScaleCodecReader reader) {
}

blockHeader.setDigest(digests);
System.out.println("BlockHeaderReader.setDigest");

return blockHeader;
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/limechain/storage/block/SyncState.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void resetRound() {
public void setLightSyncState(LightSyncState initState) {
this.setId = initState.getGrandpaAuthoritySet().getSetId();
setAuthoritySet(initState.getGrandpaAuthoritySet().getCurrentAuthorities());
// finalizeHeader(initState.getFinalizedBlockHeader());
finalizeHeader(initState.getFinalizedBlockHeader());
}

public String getStateRoot() {
Expand Down
Loading

0 comments on commit b9e826b

Please sign in to comment.