Skip to content

Commit

Permalink
Merge pull request #1017 from rsksmart/eth_subscribe_logs_parameters
Browse files Browse the repository at this point in the history
Add filter support to eth_subscribe log method
  • Loading branch information
diega authored Oct 17, 2019
2 parents f442bf2 + 4736840 commit 5ad85ca
Show file tree
Hide file tree
Showing 8 changed files with 314 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,37 +18,41 @@

package co.rsk.rpc.modules.eth.subscribe;

import co.rsk.core.RskAddress;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import io.netty.channel.Channel;
import org.ethereum.rpc.Topic;

import java.util.Arrays;

@JsonDeserialize
public class EthSubscribeLogsParams implements EthSubscribeParams {

private final String address;
private final String[] topics;
private final RskAddress[] addresses;
private final Topic[][] topics;

public EthSubscribeLogsParams() {
this(null, new String[0]);
this(new RskAddress[0], new Topic[0][]);
}

@JsonCreator
public EthSubscribeLogsParams(
@JsonProperty("address") String address,
@JsonProperty("topics") String[] topics
@JsonProperty("address") @JsonFormat(with = JsonFormat.Feature.ACCEPT_SINGLE_VALUE_AS_ARRAY) RskAddress[] addresses,
@JsonProperty("topics") @JsonFormat(with = JsonFormat.Feature.ACCEPT_SINGLE_VALUE_AS_ARRAY) Topic[][] topics
) {
this.address = address;
this.topics = topics == null? new String[0]: topics;
this.addresses = addresses == null? new RskAddress[0]: addresses;
this.topics = topics == null? new Topic[0][]: topics;
}

public String getAddress() {
return address;
public RskAddress[] getAddresses() {
return Arrays.copyOf(addresses, addresses.length);
}

public String[] getTopics() {
public Topic[][] getTopics() {
// TODO(mc) deep copy
return Arrays.copyOf(topics, topics.length);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package co.rsk.rpc.modules.eth.subscribe;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import org.ethereum.core.Block;
import org.ethereum.core.Transaction;
Expand All @@ -34,64 +35,97 @@
* The logs DTO for JSON serialization purposes.
*/
public class LogsNotification implements EthSubscriptionNotificationDTO {
private final String logIndex;
private final String blockNumber;
private final String blockHash;
private final String transactionHash;
private final String transactionIndex;
private final String address;
private final String data;
private final List<String> topics;

private final LogInfo logInfo;
private final Block block;
private final Transaction transaction;
private final int logInfoIndex;
private final int transactionIndex;
private final boolean removed;

private String lazyLogIndex;
private String lazyBlockNumber;
private String lazyBlockHash;
private String lazyTransactionHash;
private String lazyTransactionIndex;
private String lazyAddress;
private String lazyData;
private List<String> lazyTopics;

public LogsNotification(LogInfo logInfo, Block b, int txIndex, Transaction tx, int logIdx, boolean r) {
logIndex = toQuantityJsonHex(logIdx);
blockNumber = toQuantityJsonHex(b.getNumber());
blockHash = b.getHashJsonString();
removed = r;
transactionIndex = toQuantityJsonHex(txIndex);
transactionHash = tx.getHash().toJsonString();
address = toJsonHex(logInfo.getAddress());
data = toJsonHex(logInfo.getData());
topics = logInfo.getTopics().stream()
.map(t -> toJsonHex(t.getData()))
.collect(Collectors.toList());
this.logInfo = logInfo;
this.block = b;
this.transaction = tx;
this.logInfoIndex = logIdx;
this.removed = r;
this.transactionIndex = txIndex;
}

public String getLogIndex() {
return logIndex;
if (lazyLogIndex == null) {
lazyLogIndex = toQuantityJsonHex(logInfoIndex);
}
return lazyLogIndex;
}

public String getBlockNumber() {
return blockNumber;
if (lazyBlockNumber == null) {
lazyBlockNumber = toQuantityJsonHex(block.getNumber());
}
return lazyBlockNumber;
}

public String getBlockHash() {
return blockHash;
if (lazyBlockHash == null) {
lazyBlockHash = block.getHashJsonString();
}
return lazyBlockHash;
}

public String getTransactionHash() {
return transactionHash;
if (lazyTransactionHash == null) {
lazyTransactionHash = transaction.getHash().toJsonString();
}
return lazyTransactionHash;
}

public String getTransactionIndex() {
return transactionIndex;
if (lazyTransactionIndex == null) {
lazyTransactionIndex = toQuantityJsonHex(transactionIndex);
}
return lazyTransactionIndex;
}

public String getAddress() {
return address;
if (lazyAddress == null) {
lazyAddress = toJsonHex(logInfo.getAddress());
}
return lazyAddress;
}

public String getData() {
return data;
if (lazyData == null) {
lazyData = toJsonHex(logInfo.getData());
}
return lazyData;
}

public List<String> getTopics() {
return Collections.unmodifiableList(topics);
if (lazyTopics == null) {
lazyTopics = logInfo.getTopics().stream()
.map(t -> toJsonHex(t.getData()))
.collect(Collectors.toList());
}
return Collections.unmodifiableList(lazyTopics);
}

@JsonInclude(JsonInclude.Include.NON_EMPTY)
public boolean getRemoved() {
return removed;
}

@JsonIgnore
public LogInfo getLogInfo() {
return logInfo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.ethereum.db.TransactionInfo;
import org.ethereum.facade.Ethereum;
import org.ethereum.listener.EthereumListenerAdapter;
import org.ethereum.rpc.AddressesTopicsFilter;
import org.ethereum.vm.LogInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -47,7 +48,7 @@ public class LogsNotificationEmitter {
private final ReceiptStore receiptStore;
private final BlockchainBranchComparator branchComparator;

private final Map<SubscriptionId, Channel> subscriptions = new ConcurrentHashMap<>();
private final Map<SubscriptionId, Subscription> subscriptions = new ConcurrentHashMap<>();
private Block lastEmitted;

public LogsNotificationEmitter(
Expand All @@ -67,15 +68,15 @@ public void onBestBlock(Block block, List<TransactionReceipt> receipts) {
}

public void subscribe(SubscriptionId subscriptionId, Channel channel, EthSubscribeLogsParams params) {
subscriptions.put(subscriptionId, channel);
subscriptions.put(subscriptionId, new Subscription(channel, params));
}

public boolean unsubscribe(SubscriptionId subscriptionId) {
return subscriptions.remove(subscriptionId) != null;
}

public void unsubscribe(Channel channel) {
subscriptions.values().removeIf(channel::equals);
subscriptions.values().removeIf(s -> channel.equals(s.channel));
}

private void emitLogs(Block block) {
Expand All @@ -100,20 +101,23 @@ private void emitLogs(Block block) {
}

private void emitLogs(List<LogsNotification> notifications) {
for (Map.Entry<SubscriptionId, Channel> entry : subscriptions.entrySet()) {
for (Map.Entry<SubscriptionId, Subscription> entry : subscriptions.entrySet()) {
SubscriptionId id = entry.getKey();
Channel channel = entry.getValue();
Channel channel = entry.getValue().channel;
AddressesTopicsFilter filter = entry.getValue().filter;

for (LogsNotification notification : notifications) {
EthSubscriptionNotification request = new EthSubscriptionNotification(
new EthSubscriptionParams(id, notification)
);

try {
String msg = jsonRpcSerializer.serializeMessage(request);
channel.write(new TextWebSocketFrame(msg));
} catch (IOException e) {
logger.error("Couldn't serialize block header result for notification", e);
if (filter.matchesExactly(notification.getLogInfo())) {
EthSubscriptionNotification request = new EthSubscriptionNotification(
new EthSubscriptionParams(id, notification)
);

try {
String msg = jsonRpcSerializer.serializeMessage(request);
channel.write(new TextWebSocketFrame(msg));
} catch (IOException e) {
logger.error("Couldn't serialize block header result for notification", e);
}
}
}

Expand Down Expand Up @@ -144,4 +148,14 @@ private List<LogsNotification> getLogsNotifications(Block block, boolean removed

return notifications;
}

private static class Subscription {
private final Channel channel;
private final AddressesTopicsFilter filter;

private Subscription(Channel channel, EthSubscribeLogsParams params) {
this.channel = channel;
this.filter = new AddressesTopicsFilter(params.getAddresses(), params.getTopics());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

public class AddressesTopicsFilter {
private List<Topic[]> topics = new ArrayList<>(); // [[addr1, addr2], null, [A, B], [C]]
private RskAddress[] addresses = new RskAddress[0];
private RskAddress[] addresses;
private Bloom[][] filterBlooms;

public AddressesTopicsFilter(RskAddress[] addresses, Topic[][] topics) {
Expand All @@ -40,7 +40,7 @@ public AddressesTopicsFilter(RskAddress[] addresses, Topic[][] topics) {
}
}

this.addresses = addresses;
this.addresses = addresses == null? new RskAddress[0]: addresses;

initBlooms();
}
Expand Down
4 changes: 4 additions & 0 deletions rskj-core/src/main/java/org/ethereum/rpc/Topic.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,8 @@ public int hashCode() {
public String toString() {
return Hex.toHexString(bytes);
}

public String toJsonString() {
return TypeConverter.toUnformattedJsonHex(this.getBytes());
}
}
Loading

0 comments on commit 5ad85ca

Please sign in to comment.