diff --git a/config/config.toml b/config/config.toml index 3a922d0dcf6..ec656eeb877 100644 --- a/config/config.toml +++ b/config/config.toml @@ -4,7 +4,7 @@ # "hobbits": use HobbitsP2PNetwork networkMode = "mock" # Gossip options: floodsub,gossipsub,plumtree,none -gossipProtocol = "plumtree" +gossipProtocol = "floodsub" identity = "0x00" timer="QuartzTimer" networkInterface = "0.0.0.0" diff --git a/networking/p2p/src/main/java/tech/pegasys/artemis/networking/p2p/hobbits/AbstractSocketHandler.java b/networking/p2p/src/main/java/tech/pegasys/artemis/networking/p2p/hobbits/AbstractSocketHandler.java index 1edbb4d8f33..b6262014438 100644 --- a/networking/p2p/src/main/java/tech/pegasys/artemis/networking/p2p/hobbits/AbstractSocketHandler.java +++ b/networking/p2p/src/main/java/tech/pegasys/artemis/networking/p2p/hobbits/AbstractSocketHandler.java @@ -33,16 +33,16 @@ import org.apache.logging.log4j.Level; import org.apache.tuweni.bytes.Bytes; import org.apache.tuweni.bytes.Bytes32; -import org.apache.tuweni.crypto.Hash; import org.apache.tuweni.hobbits.Message; import org.apache.tuweni.hobbits.Protocol; import org.apache.tuweni.plumtree.State; import tech.pegasys.artemis.datastructures.blocks.BeaconBlock; -import tech.pegasys.artemis.datastructures.blocks.BeaconBlockHeader; import tech.pegasys.artemis.datastructures.operations.Attestation; import tech.pegasys.artemis.networking.p2p.api.P2PNetwork; import tech.pegasys.artemis.networking.p2p.hobbits.gossip.GossipCodec; import tech.pegasys.artemis.networking.p2p.hobbits.gossip.GossipMessage; +import tech.pegasys.artemis.networking.p2p.hobbits.rpc.AttestationMessage; +import tech.pegasys.artemis.networking.p2p.hobbits.rpc.BlockBodiesMessage; import tech.pegasys.artemis.networking.p2p.hobbits.rpc.GetStatusMessage; import tech.pegasys.artemis.networking.p2p.hobbits.rpc.HelloMessage; import tech.pegasys.artemis.networking.p2p.hobbits.rpc.RPCCodec; @@ -61,7 +61,7 @@ public abstract class AbstractSocketHandler { protected final Peer peer; protected final ChainStorageClient store; protected final State p2pState; - protected final Set pendingResponses = new HashSet<>(); + protected final Set pendingResponses = new HashSet<>(); protected final AtomicBoolean status = new AtomicBoolean(true); protected final Consumer messageSender; protected final Runnable handlerTermination; @@ -136,33 +136,46 @@ public synchronized void handleMessage(Buffer message) { } protected void handleRPCMessage(RPCMessage rpcMessage) { - if (RPCMethod.GOODBYE.equals(rpcMessage.method())) { + if (RPCMethod.GOODBYE.code() == rpcMessage.method()) { closed(null); - } else if (RPCMethod.HELLO.equals(rpcMessage.method())) { + } else if (RPCMethod.HELLO.code() == rpcMessage.method()) { replyHello(rpcMessage.id()); - } else if (RPCMethod.GET_STATUS.equals(rpcMessage.method())) { + } else if (RPCMethod.GET_STATUS.code() == rpcMessage.method()) { replyStatus(rpcMessage.id()); - } else if (RPCMethod.GET_ATTESTATION.equals(rpcMessage.method())) { + } else if (RPCMethod.GET_ATTESTATION.code() == rpcMessage.method()) { replyAttestation(rpcMessage); - } else if (RPCMethod.GET_BLOCK_BODIES.equals(rpcMessage.method())) { + } else if (RPCMethod.GET_BLOCK_BODIES.code() == rpcMessage.method()) { replyBlockBodies(rpcMessage); - } else if (RPCMethod.ATTESTATION.equals(rpcMessage.method())) { - Attestation attestation = Attestation.fromBytes(rpcMessage.bodyAs(Bytes.class)); - this.eventBus.post(attestation); - } else if (RPCMethod.BLOCK_BODIES.equals(rpcMessage.method())) { - BeaconBlock beaconBlock = BeaconBlock.fromBytes(rpcMessage.bodyAsList().get(0)); - this.eventBus.post(beaconBlock); + } else if (RPCMethod.ATTESTATION.code() == rpcMessage.method()) { + AttestationMessage rb = rpcMessage.bodyAs(AttestationMessage.class); + Attestation attestation = rb.body(); + String key = attestation.toBytes().toHexString(); + if (!receivedMessages.containsKey(key)) { + this.eventBus.post(attestation); + receivedMessages.put(key, true); + } + } else if (RPCMethod.BLOCK_BODIES.code() == rpcMessage.method()) { + BlockBodiesMessage rb = rpcMessage.bodyAs(BlockBodiesMessage.class); + BeaconBlock beaconBlock = rb.bodies().get(0); + String key = beaconBlock.toBytes().toHexString(); + if (!receivedMessages.containsKey(key)) { + this.eventBus.post(beaconBlock); + receivedMessages.put(key, true); + } } } + public abstract void gossipMessage( + int method, String topic, long timestamp, Bytes messageHash, Bytes32 hash, Bytes body); + protected abstract void handleGossipMessage(GossipMessage gossipMessage); - protected void sendReply(RPCMethod method, Object payload, long id) { - sendBytes(RPCCodec.encode(method, payload, id).toBytes()); + protected void sendReply(RPCMethod method, Object payload, BigInteger id) { + sendBytes(RPCCodec.encode(method.code(), payload, id).toBytes()); } protected void sendMessage(RPCMethod method, Object payload) { - sendBytes(RPCCodec.encode(method, payload, pendingResponses).toBytes()); + sendBytes(RPCCodec.encode(method.code(), payload, pendingResponses).toBytes()); } protected void sendBytes(Bytes bytes) { @@ -176,21 +189,7 @@ public void disconnect() { } } - public void gossipMessage( - int method, String topic, long timestamp, Bytes messageHash, Bytes32 hash, Bytes body) { - Bytes bytes = - GossipCodec.encode( - method, - topic, - BigInteger.valueOf(timestamp), - messageHash.toArrayUnsafe(), - hash.toArrayUnsafe(), - body.toArrayUnsafe()) - .toBytes(); - sendBytes(bytes); - } - - public void replyHello(long requestId) { + public void replyHello(BigInteger requestId) { if (!peer.peerHello()) { HelloMessage msg = new HelloMessage( @@ -220,7 +219,7 @@ public void sendHello() { peer.setPeerHello(true); } - public void replyStatus(long requestId) { + public void replyStatus(BigInteger requestId) { sendReply( RPCMethod.GET_STATUS, new GetStatusMessage( @@ -239,10 +238,10 @@ public void sendStatus() { public void replyAttestation(RPCMessage rpcMessage) { RequestAttestationMessage rb = rpcMessage.bodyAs(RequestAttestationMessage.class); - Bytes32 signature = Hash.sha2_256(Bytes32.wrap(rb.hash())); - store - .getUnprocessedAttestation(signature) - .ifPresent(a -> sendReply(RPCMethod.ATTESTATION, a.toBytes(), rpcMessage.id())); + Optional attestation = store.getUnprocessedAttestation(Bytes32.wrap(rb.hash())); + if (attestation.isPresent()) { + sendReply(RPCMethod.ATTESTATION, new AttestationMessage(attestation.get()), rpcMessage.id()); + } } public void sendGetAttestation(Bytes32 attestationHash) { @@ -250,51 +249,20 @@ public void sendGetAttestation(Bytes32 attestationHash) { RPCMethod.GET_ATTESTATION, new RequestAttestationMessage(attestationHash.toArrayUnsafe())); } - public void replyBlockHeaders(RPCMessage rpcMessage) { - RequestBlocksMessage rb = rpcMessage.bodyAs(RequestBlocksMessage.class); - List> blocks = - store.getUnprocessedBlock( - Bytes32.wrap(rb.startRoot()), rb.max().longValue(), rb.skip().longValue()); - List blockHeaders = new ArrayList<>(); - blocks.forEach( - block -> { - if (block.isPresent()) { - blockHeaders.add( - new BeaconBlockHeader( - block.get().getSlot(), - block.get().getParent_root(), - block.get().getState_root(), - block.get().getBody().hash_tree_root(), - block.get().getSignature()) - .toBytes()); - } - }); - if (blockHeaders.size() > 0) { - sendReply(RPCMethod.BLOCK_HEADERS, blockHeaders, rpcMessage.id()); - } - } - - public void sendGetBlockHeaders(Bytes32 root) { - sendMessage( - RPCMethod.GET_BLOCK_HEADERS, - new RequestBlocksMessage( - root.toArrayUnsafe(), BigInteger.ONE, BigInteger.ONE, BigInteger.ZERO, (short) 0)); - } - public void replyBlockBodies(RPCMessage rpcMessage) { RequestBlocksMessage rb = rpcMessage.bodyAs(RequestBlocksMessage.class); List> blocks = store.getUnprocessedBlock( Bytes32.wrap(rb.startRoot()), rb.max().longValue(), rb.skip().longValue()); - List blockBodies = new ArrayList<>(); + List blockBodies = new ArrayList<>(); blocks.forEach( block -> { if (block.isPresent()) { - blockBodies.add(block.get().toBytes()); + blockBodies.add(block.get()); } }); if (blockBodies.size() > 0) { - sendReply(RPCMethod.BLOCK_BODIES, blockBodies, rpcMessage.id()); + sendReply(RPCMethod.BLOCK_BODIES, new BlockBodiesMessage(blockBodies), rpcMessage.id()); } } diff --git a/networking/p2p/src/main/java/tech/pegasys/artemis/networking/p2p/hobbits/FloodsubSocketHandler.java b/networking/p2p/src/main/java/tech/pegasys/artemis/networking/p2p/hobbits/FloodsubSocketHandler.java index f20a53f41e3..a349a2caec3 100644 --- a/networking/p2p/src/main/java/tech/pegasys/artemis/networking/p2p/hobbits/FloodsubSocketHandler.java +++ b/networking/p2p/src/main/java/tech/pegasys/artemis/networking/p2p/hobbits/FloodsubSocketHandler.java @@ -16,15 +16,16 @@ import com.google.common.eventbus.EventBus; import com.google.common.eventbus.Subscribe; import io.vertx.core.net.NetSocket; +import java.math.BigInteger; import java.util.Date; import java.util.concurrent.ConcurrentHashMap; import org.apache.logging.log4j.Level; import org.apache.tuweni.bytes.Bytes; import org.apache.tuweni.bytes.Bytes32; -import org.apache.tuweni.plumtree.MessageSender; import org.apache.tuweni.plumtree.State; import tech.pegasys.artemis.datastructures.blocks.BeaconBlock; import tech.pegasys.artemis.datastructures.operations.Attestation; +import tech.pegasys.artemis.networking.p2p.hobbits.gossip.GossipCodec; import tech.pegasys.artemis.networking.p2p.hobbits.gossip.GossipMessage; import tech.pegasys.artemis.storage.ChainStorageClient; import tech.pegasys.artemis.util.alogger.ALogger; @@ -44,20 +45,36 @@ public FloodsubSocketHandler( super(eventBus, netSocket, userAgent, peer, store, p2pState, receivedMessages); } + @Override + public void gossipMessage( + int method, String topic, long timestamp, Bytes messageHash, Bytes32 hash, Bytes body) { + // TODO: Hack to make compatible with HOBBITS FloodSub spec + method = 0; + hash = Bytes32.wrap(body); + Bytes bytes = + GossipCodec.encode( + method, + topic, + BigInteger.valueOf(timestamp), + messageHash.toArrayUnsafe(), + hash.toArrayUnsafe(), + Bytes.EMPTY.toArrayUnsafe()) + .toBytes(); + sendBytes(bytes); + } + @Override @SuppressWarnings("StringSplitter") protected void handleGossipMessage(GossipMessage gossipMessage) { - if (MessageSender.Verb.GOSSIP.ordinal() == gossipMessage.method()) { - Bytes body = Bytes.wrap(gossipMessage.body()); - String key = body.toHexString(); + if (gossipMessage.method() == 0) { + Bytes32 hash = Bytes32.wrap(gossipMessage.hash()); + String key = hash.toHexString(); if (!receivedMessages.containsKey(key)) { - peer.setPeerGossip(body); + peer.setPeerGossip(hash); if (gossipMessage.getTopic().equalsIgnoreCase("ATTESTATION")) { - Bytes32 attestationHash = Bytes32.wrap(gossipMessage.body()); - this.sendGetAttestation(attestationHash); + this.sendGetAttestation(hash); } else if (gossipMessage.getTopic().equalsIgnoreCase("BLOCK")) { - Bytes32 blockRoot = Bytes32.wrap(gossipMessage.body()); - this.sendGetBlockBodies(blockRoot); + this.sendGetBlockBodies(hash); } receivedMessages.put(key, true); } diff --git a/networking/p2p/src/main/java/tech/pegasys/artemis/networking/p2p/hobbits/PlumtreeSocketHandler.java b/networking/p2p/src/main/java/tech/pegasys/artemis/networking/p2p/hobbits/PlumtreeSocketHandler.java index 049fa907b64..9f25472c111 100644 --- a/networking/p2p/src/main/java/tech/pegasys/artemis/networking/p2p/hobbits/PlumtreeSocketHandler.java +++ b/networking/p2p/src/main/java/tech/pegasys/artemis/networking/p2p/hobbits/PlumtreeSocketHandler.java @@ -16,14 +16,17 @@ import com.google.common.eventbus.EventBus; import com.google.common.eventbus.Subscribe; import io.vertx.core.net.NetSocket; +import java.math.BigInteger; import java.util.Date; import java.util.concurrent.ConcurrentHashMap; import org.apache.logging.log4j.Level; import org.apache.tuweni.bytes.Bytes; +import org.apache.tuweni.bytes.Bytes32; import org.apache.tuweni.plumtree.MessageSender; import org.apache.tuweni.plumtree.State; import tech.pegasys.artemis.datastructures.blocks.BeaconBlock; import tech.pegasys.artemis.datastructures.operations.Attestation; +import tech.pegasys.artemis.networking.p2p.hobbits.gossip.GossipCodec; import tech.pegasys.artemis.networking.p2p.hobbits.gossip.GossipMessage; import tech.pegasys.artemis.storage.ChainStorageClient; import tech.pegasys.artemis.util.alogger.ALogger; @@ -43,6 +46,21 @@ public PlumtreeSocketHandler( super(eventBus, netSocket, userAgent, peer, store, p2pState, receivedMessages); } + @Override + public void gossipMessage( + int method, String topic, long timestamp, Bytes messageHash, Bytes32 hash, Bytes body) { + Bytes bytes = + GossipCodec.encode( + method, + topic, + BigInteger.valueOf(timestamp), + messageHash.toArrayUnsafe(), + hash.toArrayUnsafe(), + body.toArrayUnsafe()) + .toBytes(); + sendBytes(bytes); + } + @Override protected void handleGossipMessage(GossipMessage gossipMessage) { if (MessageSender.Verb.GOSSIP.ordinal() == gossipMessage.method()) { diff --git a/networking/p2p/src/main/java/tech/pegasys/artemis/networking/p2p/hobbits/rpc/AttestationMessage.java b/networking/p2p/src/main/java/tech/pegasys/artemis/networking/p2p/hobbits/rpc/AttestationMessage.java index 899cdfe903b..c3c8a9c341a 100644 --- a/networking/p2p/src/main/java/tech/pegasys/artemis/networking/p2p/hobbits/rpc/AttestationMessage.java +++ b/networking/p2p/src/main/java/tech/pegasys/artemis/networking/p2p/hobbits/rpc/AttestationMessage.java @@ -13,24 +13,42 @@ package tech.pegasys.artemis.networking.p2p.hobbits.rpc; +import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonValue; +import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.SerializerProvider; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; import com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import com.fasterxml.jackson.databind.ser.std.StdSerializer; import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; import org.apache.tuweni.bytes.Bytes; -import tech.pegasys.artemis.datastructures.blocks.BeaconBlock; +import tech.pegasys.artemis.datastructures.operations.Attestation; +@JsonSerialize(using = AttestationMessage.AttestationSerializer.class) @JsonDeserialize(using = AttestationMessage.AttestationDeserializer.class) public final class AttestationMessage { + static class AttestationSerializer extends StdSerializer { + + protected AttestationSerializer() { + super(AttestationMessage.class); + } + + @Override + public void serialize( + AttestationMessage attestationMessage, JsonGenerator jgen, SerializerProvider provider) + throws IOException { + jgen.writeStartObject(); + jgen.writeBinaryField("attestation", attestationMessage.body().toBytes().toArrayUnsafe()); + jgen.writeEndObject(); + } + } + static class AttestationDeserializer extends StdDeserializer { protected AttestationDeserializer() { @@ -41,38 +59,37 @@ protected AttestationDeserializer() { public AttestationMessage deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException, JsonProcessingException { JsonNode node = jp.getCodec().readTree(jp); - Iterator iterator = node.iterator(); - List elts = new ArrayList<>(); - while (iterator.hasNext()) { - JsonNode child = iterator.next(); - elts.add(BeaconBlock.fromBytes(Bytes.wrap(child.get("bytes").binaryValue()))); - } + AttestationBody elts = new AttestationBody(Bytes.wrap(node.get("attestation").binaryValue())); return new AttestationMessage(elts); } } - static class Attestation { + static class AttestationBody { private final Bytes bytes; - Attestation(Bytes bytes) { + AttestationBody(Bytes bytes) { this.bytes = bytes; } - @JsonProperty("bytes") public Bytes bytes() { return bytes; } } - private final List bodies; + private final Attestation body; + + AttestationMessage(AttestationBody body) { + this.body = Attestation.fromBytes(body.bytes()); + } - AttestationMessage(List bodies) { - this.bodies = bodies; + @JsonCreator + public AttestationMessage(@JsonProperty("attestation") Attestation attestation) { + this.body = attestation; } - @JsonValue - public List bodies() { - return bodies; + @JsonProperty("attestation") + public Attestation body() { + return body; } } diff --git a/networking/p2p/src/main/java/tech/pegasys/artemis/networking/p2p/hobbits/rpc/BlockBodiesMessage.java b/networking/p2p/src/main/java/tech/pegasys/artemis/networking/p2p/hobbits/rpc/BlockBodiesMessage.java index d380ce484f2..ab49d021889 100644 --- a/networking/p2p/src/main/java/tech/pegasys/artemis/networking/p2p/hobbits/rpc/BlockBodiesMessage.java +++ b/networking/p2p/src/main/java/tech/pegasys/artemis/networking/p2p/hobbits/rpc/BlockBodiesMessage.java @@ -13,13 +13,17 @@ package tech.pegasys.artemis.networking.p2p.hobbits.rpc; +import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonValue; +import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.SerializerProvider; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; import com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import com.fasterxml.jackson.databind.ser.std.StdSerializer; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; @@ -27,9 +31,41 @@ import org.apache.tuweni.bytes.Bytes; import tech.pegasys.artemis.datastructures.blocks.BeaconBlock; +@JsonSerialize(using = BlockBodiesMessage.BlockBodiesSerializer.class) @JsonDeserialize(using = BlockBodiesMessage.BlockBodiesDeserializer.class) public final class BlockBodiesMessage { + static class BlockBodiesSerializer extends StdSerializer { + + protected BlockBodiesSerializer() { + super(BlockBodiesMessage.class); + } + + @Override + public void serialize( + BlockBodiesMessage blockBodiesMessage, JsonGenerator jgen, SerializerProvider provider) + throws IllegalArgumentException { + try { + jgen.writeStartObject(); + jgen.writeArrayFieldStart("bodies"); + blockBodiesMessage + .bodies() + .forEach( + item -> { + try { + jgen.writeBinary(item.toBytes().toArrayUnsafe()); + } catch (java.io.IOException e) { + throw new IllegalArgumentException(e.getMessage()); + } + }); + jgen.writeEndArray(); + jgen.writeEndObject(); + } catch (java.io.IOException e) { + throw new IllegalArgumentException(e.getMessage()); + } + } + } + static class BlockBodiesDeserializer extends StdDeserializer { protected BlockBodiesDeserializer() { @@ -38,15 +74,18 @@ protected BlockBodiesDeserializer() { @Override public BlockBodiesMessage deserialize(JsonParser jp, DeserializationContext ctxt) - throws IOException { - JsonNode node = jp.getCodec().readTree(jp); - Iterator iterator = node.iterator(); - List elts = new ArrayList<>(); - while (iterator.hasNext()) { - JsonNode child = iterator.next(); - elts.add(BeaconBlock.fromBytes(Bytes.wrap(child.get("bytes").binaryValue()))); + throws IllegalArgumentException { + List elts = new ArrayList<>(); + try { + JsonNode node = jp.getCodec().readTree(jp); + Iterator iterator = node.withArray("bodies").iterator(); + while (iterator.hasNext()) { + elts.add(new BlockBody(Bytes.wrap(iterator.next().binaryValue()))); + } + } catch (IOException e) { + throw new IllegalArgumentException(e.getMessage()); } - return new BlockBodiesMessage(elts); + return new BlockBodiesMessage(elts, true); } } @@ -58,19 +97,23 @@ static class BlockBody { this.bytes = bytes; } - @JsonProperty("bytes") public Bytes bytes() { return bytes; } } - private final List bodies; + private List bodies = new ArrayList<>(); + + BlockBodiesMessage(List blockBodies, boolean flag) { + blockBodies.forEach(a -> this.bodies.add(BeaconBlock.fromBytes(a.bytes()))); + } - BlockBodiesMessage(List bodies) { - this.bodies = bodies; + @JsonCreator + public BlockBodiesMessage(@JsonProperty("bodies") List beaconBlocks) { + this.bodies = beaconBlocks; } - @JsonValue + @JsonProperty("bodies") public List bodies() { return bodies; } diff --git a/networking/p2p/src/main/java/tech/pegasys/artemis/networking/p2p/hobbits/rpc/BlockHeadersMessage.java b/networking/p2p/src/main/java/tech/pegasys/artemis/networking/p2p/hobbits/rpc/BlockHeadersMessage.java deleted file mode 100644 index b0426e01ea7..00000000000 --- a/networking/p2p/src/main/java/tech/pegasys/artemis/networking/p2p/hobbits/rpc/BlockHeadersMessage.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright 2019 ConsenSys AG. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package tech.pegasys.artemis.networking.p2p.hobbits.rpc; - -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonValue; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.DeserializationContext; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.annotation.JsonDeserialize; -import com.fasterxml.jackson.databind.deser.std.StdDeserializer; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import org.apache.tuweni.bytes.Bytes; -import tech.pegasys.artemis.datastructures.blocks.BeaconBlockHeader; - -@JsonDeserialize(using = BlockHeadersMessage.BlockHeadersDeserializer.class) -public final class BlockHeadersMessage { - - static class BlockHeadersDeserializer extends StdDeserializer { - - protected BlockHeadersDeserializer() { - super(BlockHeadersMessage.class); - } - - @Override - public BlockHeadersMessage deserialize(JsonParser jp, DeserializationContext ctxt) - throws IOException, JsonProcessingException { - JsonNode node = jp.getCodec().readTree(jp); - Iterator iterator = node.iterator(); - List elts = new ArrayList<>(); - while (iterator.hasNext()) { - JsonNode child = iterator.next(); - elts.add(BeaconBlockHeader.fromBytes(Bytes.wrap(child.get("bytes").binaryValue()))); - } - return new BlockHeadersMessage(elts); - } - } - - static class BlockHeader { - - private final Bytes bytes; - - BlockHeader(Bytes bytes) { - this.bytes = bytes; - } - - @JsonProperty("bytes") - public Bytes bytes() { - return bytes; - } - } - - private final List headers; - - BlockHeadersMessage(List headers) { - this.headers = headers; - } - - @JsonValue - public List headers() { - return headers; - } -} diff --git a/networking/p2p/src/main/java/tech/pegasys/artemis/networking/p2p/hobbits/rpc/RPCCodec.java b/networking/p2p/src/main/java/tech/pegasys/artemis/networking/p2p/hobbits/rpc/RPCCodec.java index 73af56d973c..76269a5d0ec 100644 --- a/networking/p2p/src/main/java/tech/pegasys/artemis/networking/p2p/hobbits/rpc/RPCCodec.java +++ b/networking/p2p/src/main/java/tech/pegasys/artemis/networking/p2p/hobbits/rpc/RPCCodec.java @@ -18,10 +18,10 @@ import de.undercouch.bson4jackson.BsonFactory; import java.io.IOException; import java.io.UncheckedIOException; +import java.math.BigInteger; import java.util.Collections; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; -import javax.annotation.Nullable; import org.apache.tuweni.bytes.Bytes; import org.apache.tuweni.hobbits.Message; import org.apache.tuweni.hobbits.Protocol; @@ -34,13 +34,13 @@ public final class RPCCodec { private static final AtomicLong counter = new AtomicLong(1); - private static long nextRequestNumber() { + private static BigInteger nextRequestNumber() { long requestNumber = counter.getAndIncrement(); if (requestNumber < 1) { counter.set(1); - return 1; + return BigInteger.ONE; } - return requestNumber; + return BigInteger.valueOf(requestNumber); } private RPCCodec() {} @@ -60,7 +60,7 @@ public static ObjectMapper getMapper() { * @return the encoded bytes of a goodbye message. */ public static Message createGoodbye() { - return encode(RPCMethod.GOODBYE, Collections.emptyMap(), null); + return encode(RPCMethod.GOODBYE.code(), Collections.emptyMap(), Collections.emptySet()); } /** @@ -71,10 +71,9 @@ public static Message createGoodbye() { * @param pendingResponses the set of pending responses code to update * @return the encoded RPC message */ - public static Message encode( - RPCMethod methodId, Object request, @Nullable Set pendingResponses) { - long requestNumber = nextRequestNumber(); - if (pendingResponses != null) { + public static Message encode(int methodId, Object request, Set pendingResponses) { + BigInteger requestNumber = nextRequestNumber(); + if (!pendingResponses.isEmpty()) { pendingResponses.add(requestNumber); } return encode(methodId, request, requestNumber); @@ -88,10 +87,10 @@ public static Message encode( * @param requestNumber a request number * @return the encoded RPC message */ - public static Message encode(RPCMethod methodId, Object request, long requestNumber) { + public static Message encode(int methodId, Object request, BigInteger requestNumber) { ObjectNode headerNode = mapper.createObjectNode(); - headerNode.put("method_id", methodId.code()); + headerNode.put("method_id", methodId); headerNode.put("id", requestNumber); ObjectNode bodyNode = mapper.createObjectNode(); bodyNode.putPOJO("body", request); @@ -102,7 +101,7 @@ public static Message encode(RPCMethod methodId, Object request, long requestNum Message message = new Message(3, Protocol.RPC, header, body); return message; } catch (IOException e) { - throw new IllegalArgumentException(e); + throw new IllegalArgumentException(e.getMessage()); } } @@ -117,10 +116,10 @@ public static RPCMessage decode(Message message) { byte[] header = message.getHeaders().toArrayUnsafe(); byte[] body = message.getBody().toArrayUnsafe(); ObjectNode headerNode = (ObjectNode) mapper.readTree(header); - long id = headerNode.get("id").longValue(); int methodId = headerNode.get("method_id").intValue(); + BigInteger id = headerNode.get("id").bigIntegerValue(); ObjectNode bodyNode = (ObjectNode) mapper.readTree(body); - return new RPCMessage(id, RPCMethod.valueOf(methodId), bodyNode.get("body"), message.size()); + return new RPCMessage(methodId, id, bodyNode.get("body"), message.size()); } catch (IOException e) { throw new UncheckedIOException(e); } diff --git a/networking/p2p/src/main/java/tech/pegasys/artemis/networking/p2p/hobbits/rpc/RPCMessage.java b/networking/p2p/src/main/java/tech/pegasys/artemis/networking/p2p/hobbits/rpc/RPCMessage.java index d4339133845..190ea2b9240 100644 --- a/networking/p2p/src/main/java/tech/pegasys/artemis/networking/p2p/hobbits/rpc/RPCMessage.java +++ b/networking/p2p/src/main/java/tech/pegasys/artemis/networking/p2p/hobbits/rpc/RPCMessage.java @@ -16,35 +16,37 @@ import com.fasterxml.jackson.databind.JsonNode; import java.io.IOException; import java.io.UncheckedIOException; +import java.math.BigInteger; import java.util.ArrayList; import java.util.List; import org.apache.tuweni.bytes.Bytes; +import tech.pegasys.artemis.util.alogger.ALogger; /** Representation of a RPC message that was received from a remote peer. */ public final class RPCMessage { - - private final long id; - private final RPCMethod method; + private static final ALogger STDOUT = new ALogger("stdout"); + private final int method; + private final BigInteger id; private final JsonNode body; private final int length; - public RPCMessage(long id, RPCMethod method, JsonNode body, int length) { - this.id = id; + public RPCMessage(int method, BigInteger id, JsonNode body, int length) { this.method = method; + this.id = id; this.body = body; this.length = length; } - /** @return the request identifier */ - public long id() { - return id; - } - /** @return the method used by the RPC call. */ - public RPCMethod method() { + public int method() { return method; } + /** @return the request identifier */ + public BigInteger id() { + return id; + } + /** * Reads the body of the message into a * @@ -61,12 +63,53 @@ public T bodyAs(Class T) { } } - public List bodyAsList() { + /** + * Reads the body of the message into a List + * + * @param T the type of the body to unmarshall + * @param the type of the body to unmarshall + * @return the body, unmarshalled. + * @throws UncheckedIOException if the body cannot be successfully unmarshalled + */ + @SuppressWarnings("unchecked") + public List bodyAsList(Class T) { + try { + return (List) RPCCodec.mapper.treeToValue(body, T); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + /** + * Reads the body of the message into Bytes + * + * @return the body, unmarshalled. + * @throws UncheckedIOException if the body cannot be successfully unmarshalled + */ + public Bytes bodyAsBytes() { + try { + return Bytes.wrap(body.binaryValue()); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + /** + * Reads the body of the message into List + * + * @return the body, unmarshalled. + * @throws UncheckedIOException if the body cannot be successfully unmarshalled + */ + public List bodyAsBytesList() { List newList = new ArrayList<>(); - if (body.isArray()) { - for (final JsonNode objNode : body) { - newList.add(Bytes.fromHexString(objNode.textValue())); + try { + if (body.isArray()) { + for (final JsonNode objNode : body) { + newList.add(Bytes.wrap(objNode.binaryValue())); + } } + } catch (IOException e) { + throw new UncheckedIOException(e); } return newList; } diff --git a/networking/p2p/src/main/java/tech/pegasys/artemis/networking/p2p/hobbits/rpc/RPCMethod.java b/networking/p2p/src/main/java/tech/pegasys/artemis/networking/p2p/hobbits/rpc/RPCMethod.java index ed1501863ad..c5b8e5579d8 100644 --- a/networking/p2p/src/main/java/tech/pegasys/artemis/networking/p2p/hobbits/rpc/RPCMethod.java +++ b/networking/p2p/src/main/java/tech/pegasys/artemis/networking/p2p/hobbits/rpc/RPCMethod.java @@ -32,7 +32,7 @@ public enum RPCMethod { } /** @return the encoded code of the RPC method */ - int code() { + public int code() { return code; } diff --git a/networking/p2p/src/test/java/tech/pegasys/artemis/networking/p2p/hobbits/RPCCodecTest.java b/networking/p2p/src/test/java/tech/pegasys/artemis/networking/p2p/hobbits/RPCCodecTest.java index f6f0000b68a..499affbc471 100644 --- a/networking/p2p/src/test/java/tech/pegasys/artemis/networking/p2p/hobbits/RPCCodecTest.java +++ b/networking/p2p/src/test/java/tech/pegasys/artemis/networking/p2p/hobbits/RPCCodecTest.java @@ -34,7 +34,7 @@ final class RPCCodecTest { void testGoodbye() { Message goodbye = RPCCodec.createGoodbye(); RPCMessage message = RPCCodec.decode(goodbye); - assertEquals(RPCMethod.GOODBYE, message.method()); + assertEquals(RPCMethod.GOODBYE.code(), message.method()); Map map = message.bodyAs(Map.class); assertTrue(map.isEmpty()); } @@ -49,9 +49,9 @@ void testHello() { BigInteger.ZERO, Bytes32.random().toArrayUnsafe(), BigInteger.ZERO); - Message encoded = RPCCodec.encode(RPCMethod.HELLO, hello, 23); + Message encoded = RPCCodec.encode(RPCMethod.HELLO.code(), hello, BigInteger.TEN); RPCMessage message = RPCCodec.decode(encoded); - assertEquals(RPCMethod.HELLO, message.method()); + assertEquals(RPCMethod.HELLO.code(), message.method()); HelloMessage read = message.bodyAs(HelloMessage.class); assertTrue(Arrays.equals(hello.bestRoot(), read.bestRoot())); }