diff --git a/rskj-core/src/main/java/co/rsk/rpc/modules/eth/subscribe/EthSubscribeLogsParams.java b/rskj-core/src/main/java/co/rsk/rpc/modules/eth/subscribe/EthSubscribeLogsParams.java index 295c7b20f04..6d313a4d396 100644 --- a/rskj-core/src/main/java/co/rsk/rpc/modules/eth/subscribe/EthSubscribeLogsParams.java +++ b/rskj-core/src/main/java/co/rsk/rpc/modules/eth/subscribe/EthSubscribeLogsParams.java @@ -18,37 +18,41 @@ package co.rsk.rpc.modules.eth.subscribe; +import co.rsk.core.RskAddress; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonFormat; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import io.netty.channel.Channel; +import org.ethereum.rpc.Topic; import java.util.Arrays; @JsonDeserialize public class EthSubscribeLogsParams implements EthSubscribeParams { - private final String address; - private final String[] topics; + private final RskAddress[] addresses; + private final Topic[][] topics; public EthSubscribeLogsParams() { - this(null, new String[0]); + this(new RskAddress[0], new Topic[0][]); } @JsonCreator public EthSubscribeLogsParams( - @JsonProperty("address") String address, - @JsonProperty("topics") String[] topics + @JsonProperty("address") @JsonFormat(with = JsonFormat.Feature.ACCEPT_SINGLE_VALUE_AS_ARRAY) RskAddress[] addresses, + @JsonProperty("topics") @JsonFormat(with = JsonFormat.Feature.ACCEPT_SINGLE_VALUE_AS_ARRAY) Topic[][] topics ) { - this.address = address; - this.topics = topics == null? new String[0]: topics; + this.addresses = addresses == null? new RskAddress[0]: addresses; + this.topics = topics == null? new Topic[0][]: topics; } - public String getAddress() { - return address; + public RskAddress[] getAddresses() { + return Arrays.copyOf(addresses, addresses.length); } - public String[] getTopics() { + public Topic[][] getTopics() { + // TODO(mc) deep copy return Arrays.copyOf(topics, topics.length); } diff --git a/rskj-core/src/main/java/co/rsk/rpc/modules/eth/subscribe/LogsNotification.java b/rskj-core/src/main/java/co/rsk/rpc/modules/eth/subscribe/LogsNotification.java index 87381f521e1..66fbaec8f65 100644 --- a/rskj-core/src/main/java/co/rsk/rpc/modules/eth/subscribe/LogsNotification.java +++ b/rskj-core/src/main/java/co/rsk/rpc/modules/eth/subscribe/LogsNotification.java @@ -18,6 +18,7 @@ package co.rsk.rpc.modules.eth.subscribe; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonInclude; import org.ethereum.core.Block; import org.ethereum.core.Transaction; @@ -34,64 +35,97 @@ * The logs DTO for JSON serialization purposes. */ public class LogsNotification implements EthSubscriptionNotificationDTO { - private final String logIndex; - private final String blockNumber; - private final String blockHash; - private final String transactionHash; - private final String transactionIndex; - private final String address; - private final String data; - private final List topics; + + private final LogInfo logInfo; + private final Block block; + private final Transaction transaction; + private final int logInfoIndex; + private final int transactionIndex; private final boolean removed; + private String lazyLogIndex; + private String lazyBlockNumber; + private String lazyBlockHash; + private String lazyTransactionHash; + private String lazyTransactionIndex; + private String lazyAddress; + private String lazyData; + private List lazyTopics; + public LogsNotification(LogInfo logInfo, Block b, int txIndex, Transaction tx, int logIdx, boolean r) { - logIndex = toQuantityJsonHex(logIdx); - blockNumber = toQuantityJsonHex(b.getNumber()); - blockHash = b.getHashJsonString(); - removed = r; - transactionIndex = toQuantityJsonHex(txIndex); - transactionHash = tx.getHash().toJsonString(); - address = toJsonHex(logInfo.getAddress()); - data = toJsonHex(logInfo.getData()); - topics = logInfo.getTopics().stream() - .map(t -> toJsonHex(t.getData())) - .collect(Collectors.toList()); + this.logInfo = logInfo; + this.block = b; + this.transaction = tx; + this.logInfoIndex = logIdx; + this.removed = r; + this.transactionIndex = txIndex; } public String getLogIndex() { - return logIndex; + if (lazyLogIndex == null) { + lazyLogIndex = toQuantityJsonHex(logInfoIndex); + } + return lazyLogIndex; } public String getBlockNumber() { - return blockNumber; + if (lazyBlockNumber == null) { + lazyBlockNumber = toQuantityJsonHex(block.getNumber()); + } + return lazyBlockNumber; } public String getBlockHash() { - return blockHash; + if (lazyBlockHash == null) { + lazyBlockHash = block.getHashJsonString(); + } + return lazyBlockHash; } public String getTransactionHash() { - return transactionHash; + if (lazyTransactionHash == null) { + lazyTransactionHash = transaction.getHash().toJsonString(); + } + return lazyTransactionHash; } public String getTransactionIndex() { - return transactionIndex; + if (lazyTransactionIndex == null) { + lazyTransactionIndex = toQuantityJsonHex(transactionIndex); + } + return lazyTransactionIndex; } public String getAddress() { - return address; + if (lazyAddress == null) { + lazyAddress = toJsonHex(logInfo.getAddress()); + } + return lazyAddress; } public String getData() { - return data; + if (lazyData == null) { + lazyData = toJsonHex(logInfo.getData()); + } + return lazyData; } public List getTopics() { - return Collections.unmodifiableList(topics); + if (lazyTopics == null) { + lazyTopics = logInfo.getTopics().stream() + .map(t -> toJsonHex(t.getData())) + .collect(Collectors.toList()); + } + return Collections.unmodifiableList(lazyTopics); } @JsonInclude(JsonInclude.Include.NON_EMPTY) public boolean getRemoved() { return removed; } + + @JsonIgnore + public LogInfo getLogInfo() { + return logInfo; + } } diff --git a/rskj-core/src/main/java/co/rsk/rpc/modules/eth/subscribe/LogsNotificationEmitter.java b/rskj-core/src/main/java/co/rsk/rpc/modules/eth/subscribe/LogsNotificationEmitter.java index 3a323926299..8d20d49190e 100644 --- a/rskj-core/src/main/java/co/rsk/rpc/modules/eth/subscribe/LogsNotificationEmitter.java +++ b/rskj-core/src/main/java/co/rsk/rpc/modules/eth/subscribe/LogsNotificationEmitter.java @@ -29,6 +29,7 @@ import org.ethereum.db.TransactionInfo; import org.ethereum.facade.Ethereum; import org.ethereum.listener.EthereumListenerAdapter; +import org.ethereum.rpc.AddressesTopicsFilter; import org.ethereum.vm.LogInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,7 +48,7 @@ public class LogsNotificationEmitter { private final ReceiptStore receiptStore; private final BlockchainBranchComparator branchComparator; - private final Map subscriptions = new ConcurrentHashMap<>(); + private final Map subscriptions = new ConcurrentHashMap<>(); private Block lastEmitted; public LogsNotificationEmitter( @@ -67,7 +68,7 @@ public void onBestBlock(Block block, List receipts) { } public void subscribe(SubscriptionId subscriptionId, Channel channel, EthSubscribeLogsParams params) { - subscriptions.put(subscriptionId, channel); + subscriptions.put(subscriptionId, new Subscription(channel, params)); } public boolean unsubscribe(SubscriptionId subscriptionId) { @@ -75,7 +76,7 @@ public boolean unsubscribe(SubscriptionId subscriptionId) { } public void unsubscribe(Channel channel) { - subscriptions.values().removeIf(channel::equals); + subscriptions.values().removeIf(s -> channel.equals(s.channel)); } private void emitLogs(Block block) { @@ -100,20 +101,23 @@ private void emitLogs(Block block) { } private void emitLogs(List notifications) { - for (Map.Entry entry : subscriptions.entrySet()) { + for (Map.Entry entry : subscriptions.entrySet()) { SubscriptionId id = entry.getKey(); - Channel channel = entry.getValue(); + Channel channel = entry.getValue().channel; + AddressesTopicsFilter filter = entry.getValue().filter; for (LogsNotification notification : notifications) { - EthSubscriptionNotification request = new EthSubscriptionNotification( - new EthSubscriptionParams(id, notification) - ); - - try { - String msg = jsonRpcSerializer.serializeMessage(request); - channel.write(new TextWebSocketFrame(msg)); - } catch (IOException e) { - logger.error("Couldn't serialize block header result for notification", e); + if (filter.matchesExactly(notification.getLogInfo())) { + EthSubscriptionNotification request = new EthSubscriptionNotification( + new EthSubscriptionParams(id, notification) + ); + + try { + String msg = jsonRpcSerializer.serializeMessage(request); + channel.write(new TextWebSocketFrame(msg)); + } catch (IOException e) { + logger.error("Couldn't serialize block header result for notification", e); + } } } @@ -144,4 +148,14 @@ private List getLogsNotifications(Block block, boolean removed return notifications; } + + private static class Subscription { + private final Channel channel; + private final AddressesTopicsFilter filter; + + private Subscription(Channel channel, EthSubscribeLogsParams params) { + this.channel = channel; + this.filter = new AddressesTopicsFilter(params.getAddresses(), params.getTopics()); + } + } } diff --git a/rskj-core/src/main/java/org/ethereum/rpc/AddressesTopicsFilter.java b/rskj-core/src/main/java/org/ethereum/rpc/AddressesTopicsFilter.java index 050d16e1938..e91d0cce836 100644 --- a/rskj-core/src/main/java/org/ethereum/rpc/AddressesTopicsFilter.java +++ b/rskj-core/src/main/java/org/ethereum/rpc/AddressesTopicsFilter.java @@ -30,7 +30,7 @@ public class AddressesTopicsFilter { private List topics = new ArrayList<>(); // [[addr1, addr2], null, [A, B], [C]] - private RskAddress[] addresses = new RskAddress[0]; + private RskAddress[] addresses; private Bloom[][] filterBlooms; public AddressesTopicsFilter(RskAddress[] addresses, Topic[][] topics) { @@ -40,7 +40,7 @@ public AddressesTopicsFilter(RskAddress[] addresses, Topic[][] topics) { } } - this.addresses = addresses; + this.addresses = addresses == null? new RskAddress[0]: addresses; initBlooms(); } diff --git a/rskj-core/src/main/java/org/ethereum/rpc/Topic.java b/rskj-core/src/main/java/org/ethereum/rpc/Topic.java index 4d9334e7801..a4a6bb28e59 100644 --- a/rskj-core/src/main/java/org/ethereum/rpc/Topic.java +++ b/rskj-core/src/main/java/org/ethereum/rpc/Topic.java @@ -79,4 +79,8 @@ public int hashCode() { public String toString() { return Hex.toHexString(bytes); } + + public String toJsonString() { + return TypeConverter.toUnformattedJsonHex(this.getBytes()); + } } diff --git a/rskj-core/src/test/java/co/rsk/rpc/modules/eth/subscribe/EthSubscribeRequestTest.java b/rskj-core/src/test/java/co/rsk/rpc/modules/eth/subscribe/EthSubscribeRequestTest.java index c6bd432e97f..bf3bb08aff0 100644 --- a/rskj-core/src/test/java/co/rsk/rpc/modules/eth/subscribe/EthSubscribeRequestTest.java +++ b/rskj-core/src/test/java/co/rsk/rpc/modules/eth/subscribe/EthSubscribeRequestTest.java @@ -17,10 +17,12 @@ */ package co.rsk.rpc.modules.eth.subscribe; +import co.rsk.core.RskAddress; import co.rsk.rpc.JacksonBasedRpcSerializer; import co.rsk.rpc.JsonRpcSerializer; import co.rsk.rpc.modules.RskJsonRpcRequest; import com.fasterxml.jackson.databind.JsonMappingException; +import org.ethereum.rpc.Topic; import org.junit.Test; import java.io.ByteArrayInputStream; @@ -28,7 +30,7 @@ import java.nio.charset.StandardCharsets; import static org.hamcrest.Matchers.*; -import static org.junit.Assert.assertThat; +import static org.hamcrest.MatcherAssert.assertThat; public class EthSubscribeRequestTest { private JsonRpcSerializer serializer = new JacksonBasedRpcSerializer(); @@ -52,7 +54,7 @@ public void deserializeLogsWithEmptyConfig() throws IOException { EthSubscribeLogsParams logsParams = validateParams(request, EthSubscribeLogsParams.class); - assertThat(logsParams.getAddress(), is(nullValue())); + assertThat(logsParams.getAddresses(), is(arrayWithSize(0))); assertThat(logsParams.getTopics(), is(arrayWithSize(0))); } @@ -65,24 +67,65 @@ public void deserializeLogsWithoutConfig() throws IOException { EthSubscribeLogsParams logsParams = validateParams(request, EthSubscribeLogsParams.class); - assertThat(logsParams.getAddress(), is(nullValue())); + assertThat(logsParams.getAddresses(), is(arrayWithSize(0))); assertThat(logsParams.getTopics(), is(arrayWithSize(0))); } @Test - public void deserializeLogs() throws IOException { - String logAddress = "0x3e1127bf1a673d378a8570f7a79cea4f10e20489"; - String logTopic = "0x2809c7e17bf978fbc7194c0a694b638c4215e9140cacc6c38ca36010b45697df"; - String message = "{\"jsonrpc\":\"2.0\",\"id\":333,\"method\":\"eth_subscribe\",\"params\":[\"logs\", {\"address\":\"" + logAddress + "\",\"topics\":[\"" + logTopic + "\"]}]}"; + public void deserializeLogsSingleParameters() throws IOException { + RskAddress logAddress = new RskAddress("0x3e1127bf1a673d378a8570f7a79cea4f10e20489"); + Topic logTopic = new Topic("0x2809c7e17bf978fbc7194c0a694b638c4215e9140cacc6c38ca36010b45697df"); + String message = "{\"jsonrpc\":\"2.0\",\"id\":333,\"method\":\"eth_subscribe\",\"params\":[\"logs\", {\"address\":\"" + logAddress.toJsonString() + "\",\"topics\":\"" + logTopic.toJsonString() + "\"}]}"; RskJsonRpcRequest request = serializer.deserializeRequest( new ByteArrayInputStream(message.getBytes(StandardCharsets.UTF_8)) ); EthSubscribeLogsParams logsParams = validateParams(request, EthSubscribeLogsParams.class); - assertThat(logsParams.getAddress(), is(logAddress)); + assertThat(logsParams.getAddresses(), is(arrayWithSize(1))); + assertThat(logsParams.getAddresses(), hasItemInArray(logAddress)); assertThat(logsParams.getTopics(), is(arrayWithSize(1))); - assertThat(logsParams.getTopics(), hasItemInArray(logTopic)); + assertThat(logsParams.getTopics()[0], is(arrayWithSize(1))); + assertThat(logsParams.getTopics()[0], hasItemInArray(logTopic)); + } + + @Test + public void deserializeLogsParametersAsArrays() throws IOException { + RskAddress logAddress = new RskAddress("0x3e1127bf1a673d378a8570f7a79cea4f10e20489"); + Topic logTopic = new Topic("0x2809c7e17bf978fbc7194c0a694b638c4215e9140cacc6c38ca36010b45697df"); + String message = "{\"jsonrpc\":\"2.0\",\"id\":333,\"method\":\"eth_subscribe\",\"params\":[\"logs\", {\"address\":[\"" + logAddress.toJsonString() + "\"],\"topics\":[\"" + logTopic.toJsonString() + "\"]}]}"; + RskJsonRpcRequest request = serializer.deserializeRequest( + new ByteArrayInputStream(message.getBytes(StandardCharsets.UTF_8)) + ); + + EthSubscribeLogsParams logsParams = validateParams(request, EthSubscribeLogsParams.class); + + assertThat(logsParams.getAddresses(), is(arrayWithSize(1))); + assertThat(logsParams.getAddresses(), hasItemInArray(logAddress)); + assertThat(logsParams.getTopics(), is(arrayWithSize(1))); + assertThat(logsParams.getTopics()[0], is(arrayWithSize(1))); + assertThat(logsParams.getTopics()[0], hasItemInArray(logTopic)); + } + + @Test + public void deserializeLogsNestedTopicArrays() throws IOException { + RskAddress logAddress = new RskAddress("0x3e1127bf1a673d378a8570f7a79cea4f10e20489"); + Topic logTopic1 = new Topic("0x2809c7e17bf978fbc7194c0a694b638c4215e9140cacc6c38ca36010b45697df"); + Topic logTopic2 = new Topic("0x4c0a694b638c4215e9140b6f08ecb38c4215e9140b6f08ecbdc8ab6b8ef9b245"); + String message = "{\"jsonrpc\":\"2.0\",\"id\":333,\"method\":\"eth_subscribe\",\"params\":[\"logs\", {\"address\":[\"" + logAddress.toJsonString() + "\"],\"topics\":[[\"" + logTopic1.toJsonString() + "\"], [\"" + logTopic2.toJsonString() + "\"]]}]}"; + RskJsonRpcRequest request = serializer.deserializeRequest( + new ByteArrayInputStream(message.getBytes(StandardCharsets.UTF_8)) + ); + + EthSubscribeLogsParams logsParams = validateParams(request, EthSubscribeLogsParams.class); + + assertThat(logsParams.getAddresses(), is(arrayWithSize(1))); + assertThat(logsParams.getAddresses(), hasItemInArray(logAddress)); + assertThat(logsParams.getTopics(), is(arrayWithSize(2))); + assertThat(logsParams.getTopics()[0], is(arrayWithSize(1))); + assertThat(logsParams.getTopics()[0], hasItemInArray(logTopic1)); + assertThat(logsParams.getTopics()[1], is(arrayWithSize(1))); + assertThat(logsParams.getTopics()[1], hasItemInArray(logTopic2)); } @Test(expected = JsonMappingException.class) diff --git a/rskj-core/src/test/java/co/rsk/rpc/modules/eth/subscribe/LogsNotificationEmitterTest.java b/rskj-core/src/test/java/co/rsk/rpc/modules/eth/subscribe/LogsNotificationEmitterTest.java index 5db32f60f8c..5366b1eb89b 100644 --- a/rskj-core/src/test/java/co/rsk/rpc/modules/eth/subscribe/LogsNotificationEmitterTest.java +++ b/rskj-core/src/test/java/co/rsk/rpc/modules/eth/subscribe/LogsNotificationEmitterTest.java @@ -17,11 +17,13 @@ */ package co.rsk.rpc.modules.eth.subscribe; +import co.rsk.core.RskAddress; import co.rsk.core.bc.BlockFork; import co.rsk.core.bc.BlockchainBranchComparator; import co.rsk.jsonrpc.JsonRpcMessage; import co.rsk.rpc.JsonRpcSerializer; import com.fasterxml.jackson.core.JsonProcessingException; +import io.netty.buffer.ByteBufHolder; import io.netty.channel.Channel; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import org.ethereum.TestUtils; @@ -108,6 +110,32 @@ public void onBestBlockEventTriggersOneMessageToChannelPerLogInfoAndSubscription verify(channel2).flush(); } + @Test + public void filterEmittedLog() throws JsonProcessingException { + SubscriptionId subscriptionId = mock(SubscriptionId.class); + Channel channel = mock(Channel.class); + EthSubscribeLogsParams params = mock(EthSubscribeLogsParams.class); + RskAddress logSender = TestUtils.randomAddress(); + when(params.getAddresses()).thenReturn(new RskAddress[] { logSender }); + emitter.subscribe(subscriptionId, channel, params); + + byte[] log1Data = {0x1}; + byte[] log2Data = {0x2}; + Block block1 = testBlock(logInfo(logSender, log1Data)); + Block block2 = testBlock(logInfo(log2Data)); + + listener.onBestBlock(block1, null); + verifyLogsData(log1Data); + + BlockFork blockFork = mock(BlockFork.class); + when(blockFork.getNewBlocks()).thenReturn(Collections.singletonList(block2)); + when(comparator.calculateFork(block1, block2)).thenReturn(blockFork); + + clearInvocations(channel); + listener.onBestBlock(block2, null); + verify(channel, never()).write(any(ByteBufHolder.class)); + } + @Test public void emitsNewAndRemovedLogs() throws JsonProcessingException { SubscriptionId subscriptionId = mock(SubscriptionId.class); @@ -202,7 +230,12 @@ private Transaction transaction() { } private LogInfo logInfo(byte... data) { + return logInfo(TestUtils.randomAddress(), data); + } + + private LogInfo logInfo(final RskAddress logSource, byte... data) { LogInfo logInfo = mock(LogInfo.class); + when(logInfo.getAddress()).thenReturn(logSource.getBytes()); when(logInfo.getData()).thenReturn(data); return logInfo; } diff --git a/rskj-core/src/test/java/co/rsk/rpc/modules/eth/subscribe/LogsNotificationTest.java b/rskj-core/src/test/java/co/rsk/rpc/modules/eth/subscribe/LogsNotificationTest.java new file mode 100644 index 00000000000..61827678416 --- /dev/null +++ b/rskj-core/src/test/java/co/rsk/rpc/modules/eth/subscribe/LogsNotificationTest.java @@ -0,0 +1,120 @@ +/* + * This file is part of RskJ + * Copyright (C) 2019 RSK Labs Ltd. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program. If not, see . + */ + +package co.rsk.rpc.modules.eth.subscribe; + +import co.rsk.crypto.Keccak256; +import org.ethereum.TestUtils; +import org.ethereum.core.Block; +import org.ethereum.core.Transaction; +import org.ethereum.vm.DataWord; +import org.ethereum.vm.LogInfo; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; +import java.util.Random; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.ethereum.rpc.TypeConverter.*; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +public class LogsNotificationTest { + + private static final String QUANTITY_JSON_HEX = toQuantityJsonHex(42); + private static Random random = new Random(); + + private LogsNotification logsNotification; + private Block block; + private Transaction transaction; + private LogInfo logInfo; + + @Before + public void createLogNotification() { + block = mock(Block.class); + transaction = mock(Transaction.class); + logInfo = mock(LogInfo.class); + this.logsNotification = new LogsNotification( + logInfo, + block, + 42, + transaction, + 42, + true + ); + } + + @Test + public void getLogIndex() { + assertThat(logsNotification.getLogIndex(), is(QUANTITY_JSON_HEX)); + } + + @Test + public void getBlockNumber() { + doReturn(42L).when(block).getNumber(); + assertThat(logsNotification.getBlockNumber(), is(QUANTITY_JSON_HEX)); + } + + @Test + public void getBlockHash() { + Keccak256 blockHash = TestUtils.randomHash(); + doReturn(blockHash).when(block).getHash(); + doCallRealMethod().when(block).getHashJsonString(); + assertThat(logsNotification.getBlockHash(), is(toUnformattedJsonHex(blockHash.getBytes()))); + } + + @Test + public void getTransactionHash() { + Keccak256 transactionHash = TestUtils.randomHash(); + doReturn(transactionHash).when(transaction).getHash(); + assertThat(logsNotification.getTransactionHash(), is(toUnformattedJsonHex(transactionHash.getBytes()))); + + } + + @Test + public void getTransactionIndex() { + assertThat(logsNotification.getTransactionIndex(), is(QUANTITY_JSON_HEX)); + } + + @Test + public void getAddress() { + byte[] logSender = TestUtils.randomAddress().getBytes(); + doReturn(logSender).when(logInfo).getAddress(); + assertThat(logsNotification.getAddress(), is(toJsonHex(logSender))); + } + + @Test + public void getData() { + byte[] logData = TestUtils.randomBytes(random.nextInt(1024)); + doReturn(logData).when(logInfo).getData(); + assertThat(logsNotification.getData(), is(toJsonHex(logData))); + } + + @Test + public void getTopics() { + List logTopics = IntStream.range(0, random.nextInt(1024)).mapToObj(i -> TestUtils.randomDataWord()).collect(Collectors.toList()); + doReturn(logTopics).when(logInfo).getTopics(); + for (int i = 0; i < logTopics.size(); i++) { + assertThat(logsNotification.getTopics().get(i), is(toJsonHex(logTopics.get(i).getData()))); + } + } +} \ No newline at end of file