From 27c384aca1b1e90110a8cb8c599e24cf8d5c7863 Mon Sep 17 00:00:00 2001 From: Daniel Kraft Date: Mon, 5 Aug 2019 14:57:40 +0200 Subject: [PATCH 1/8] Support multiplexed sockets in ZmqSubscriber. Allow the ZmqSubscriber to have more than one socket, and read from them in a multiplexed fashion using zmq::poll. For now, we still have at most one socket open, but that will change when we support also receiving notifications about pending moves. --- xayagame/zmqsubscriber.cpp | 72 +++++++++++++++++++++----------- xayagame/zmqsubscriber.hpp | 10 ++++- xayagame/zmqsubscriber_tests.cpp | 2 +- 3 files changed, 56 insertions(+), 28 deletions(-) diff --git a/xayagame/zmqsubscriber.cpp b/xayagame/zmqsubscriber.cpp index 077bc8be..405aac67 100644 --- a/xayagame/zmqsubscriber.cpp +++ b/xayagame/zmqsubscriber.cpp @@ -6,6 +6,7 @@ #include +#include #include namespace xaya @@ -17,7 +18,7 @@ ZmqSubscriber::~ZmqSubscriber () { if (IsRunning ()) Stop (); - CHECK (socket == nullptr); + CHECK (sockets.empty ()); } void @@ -38,29 +39,49 @@ bool ZmqSubscriber::ReceiveMultiparts (std::string& topic, std::string& payload, uint32_t& seq) { + CHECK (!sockets.empty ()); + + std::vector pollItems; + for (const auto& s : sockets) + { + pollItems.emplace_back (); + pollItems.back ().socket = static_cast (*s); + pollItems.back ().events = ZMQ_POLLIN; + } + + /* Wait until we can receive messages from any of our sockets. */ + int rcPoll; + do + { + constexpr auto TIMEOUT = std::chrono::milliseconds (100); + rcPoll = zmq::poll (pollItems, TIMEOUT); + + /* In case of an error, zmq::poll throws instead of returning + negative values. */ + CHECK_GE (rcPoll, 0); + + /* Stop the thread if requested to, no need to read the messages anymore + if there are ones available. */ + if (shouldStop) + return false; + } + while (rcPoll == 0); + + /* Find the socket that is available (or one of them). */ + zmq::socket_t* socket = nullptr; + for (size_t i = 0; i < pollItems.size (); ++i) + if (pollItems[i].revents & ZMQ_POLLIN) + { + socket = sockets[i].get (); + break; + } + CHECK (socket != nullptr); + + /* Read all message parts from the socket. */ for (unsigned parts = 1; ; ++parts) { zmq::message_t msg; - try - { - bool gotMessage = false; - while (!gotMessage) - { - gotMessage = socket->recv (&msg); - - /* Check if a shutdown is requested. */ - if (shouldStop) - return false; - } - } - catch (const zmq::error_t& exc) - { - /* See if the error is because the socket was closed. In that case, - we just want to shut down the listener thread. */ - if (exc.num () == ETERM) - return false; - throw; - } + CHECK (socket->recv (&msg)); switch (parts) { @@ -197,16 +218,17 @@ ZmqSubscriber::Start () LOG (INFO) << "Starting ZMQ subscriber at address: " << addr; CHECK (!IsRunning ()); - socket = std::make_unique (ctx, ZMQ_SUB); + CHECK (sockets.empty ()); + + auto socket = std::make_unique (ctx, ZMQ_SUB); for (const auto& entry : listeners) for (const std::string cmd : {"game-block-attach", "game-block-detach"}) { const std::string topic = cmd + " json " + entry.first; socket->setsockopt (ZMQ_SUBSCRIBE, topic.data (), topic.size ()); } - constexpr int TIMEOUT_MS = 100; - socket->setsockopt (ZMQ_RCVTIMEO, &TIMEOUT_MS, sizeof (TIMEOUT_MS)); socket->connect (addr.c_str ()); + sockets.push_back (std::move (socket)); /* Reset last-seen sequence numbers for a fresh start. */ lastSeq.clear (); @@ -225,7 +247,7 @@ ZmqSubscriber::Stop () worker->join (); worker.reset (); - socket.reset (); + sockets.clear (); } } // namespace internal diff --git a/xayagame/zmqsubscriber.hpp b/xayagame/zmqsubscriber.hpp index 46371a4d..d17ad80b 100644 --- a/xayagame/zmqsubscriber.hpp +++ b/xayagame/zmqsubscriber.hpp @@ -17,6 +17,7 @@ #include #include #include +#include namespace xaya { @@ -66,8 +67,13 @@ class ZmqSubscriber std::string addr; /** The ZMQ context that is used by the this instance. */ zmq::context_t ctx; - /** The ZMQ socket used to subscribe to the Xaya daemon, if connected. */ - std::unique_ptr socket; + /** + * The ZMQ sockets used to subscribe to the Xaya daemon, if connected. + * If we have multiple addresses we listen to (e.g. different ones for + * blocks and pending moves), then this contains multiple sockets that + * are read in a multiplexed fashion using zmq::poll. + */ + std::vector> sockets; /** Game IDs and associated listeners. */ std::unordered_multimap listeners; diff --git a/xayagame/zmqsubscriber_tests.cpp b/xayagame/zmqsubscriber_tests.cpp index 23fca5af..6a6d3738 100644 --- a/xayagame/zmqsubscriber_tests.cpp +++ b/xayagame/zmqsubscriber_tests.cpp @@ -123,7 +123,7 @@ class BasicZmqSubscriberTests : public testing::Test zmq.shouldStop = true; threadToWaitFor.join (); - zmq.socket.reset (); + zmq.sockets.clear (); } }; From bea479a6b27c727c5f4fa109e1aefc7d09ed25ff Mon Sep 17 00:00:00 2001 From: Daniel Kraft Date: Tue, 6 Aug 2019 09:00:53 +0200 Subject: [PATCH 2/8] Listen to pending moves in ZMQ subscriber. This extends the ZmqSubscriber class so that it can listen to pending move notifications in addition to block attach/detach ones. This works both if the notifications are on a separate address or on the same as block messages. --- xayagame/game.cpp | 16 +++ xayagame/game.hpp | 1 + xayagame/testutils.hpp | 2 +- xayagame/zmqsubscriber.cpp | 81 +++++++++--- xayagame/zmqsubscriber.hpp | 24 ++-- xayagame/zmqsubscriber_tests.cpp | 210 ++++++++++++++++++++++++++----- 6 files changed, 280 insertions(+), 54 deletions(-) diff --git a/xayagame/game.cpp b/xayagame/game.cpp index ff56801d..351f4d7c 100644 --- a/xayagame/game.cpp +++ b/xayagame/game.cpp @@ -325,6 +325,22 @@ Game::BlockDetach (const std::string& id, const Json::Value& data, ReinitialiseState (); } +void +Game::PendingMove (const std::string& id, const Json::Value& data) +{ + CHECK_EQ (id, gameId); + VLOG (2) << "Pending move:\n" << data; + + uint256 txid; + CHECK (txid.FromHex (data["txid"].asString ())); + VLOG (1) << "Processing pending move " << txid.ToHex (); + + std::lock_guard lock(mut); + + /* FIXME: Actual implementation, forwarding the data to a PendingMoveProcessor + (if we have it). */ +} + void Game::ConnectRpcClient (jsonrpc::IClientConnector& conn) { diff --git a/xayagame/game.hpp b/xayagame/game.hpp index cd4ad844..a01ceefa 100644 --- a/xayagame/game.hpp +++ b/xayagame/game.hpp @@ -159,6 +159,7 @@ class Game : private internal::ZmqListener bool seqMismatch) override; void BlockDetach (const std::string& id, const Json::Value& data, bool seqMismatch) override; + void PendingMove (const std::string& id, const Json::Value& data) override; /** * Adds this game's ID to the tracked games of the core daemon. diff --git a/xayagame/testutils.hpp b/xayagame/testutils.hpp index 6630d805..145bf3b5 100644 --- a/xayagame/testutils.hpp +++ b/xayagame/testutils.hpp @@ -197,7 +197,7 @@ class GameTestFixture : public testing::Test static std::string GetZmqEndpoint (const Game& g) { - return g.zmq.addr; + return g.zmq.addrBlocks; } static State diff --git a/xayagame/zmqsubscriber.cpp b/xayagame/zmqsubscriber.cpp index 405aac67..92b8dfc1 100644 --- a/xayagame/zmqsubscriber.cpp +++ b/xayagame/zmqsubscriber.cpp @@ -25,7 +25,14 @@ void ZmqSubscriber::SetEndpoint (const std::string& address) { CHECK (!IsRunning ()); - addr = address; + addrBlocks = address; +} + +void +ZmqSubscriber::SetEndpointForPending (const std::string& address) +{ + CHECK (!IsRunning ()); + addrPending = address; } void @@ -171,12 +178,22 @@ ZmqSubscriber::Listen (ZmqSubscriber* self) VLOG (1) << "Received " << topic << " with sequence number " << seq; VLOG (2) << "Payload:\n" << payload; + enum class TopicType + { + UNKNOWN, + ATTACH, + DETACH, + PENDING, + }; + std::string gameId; - bool isAttach; + TopicType type = TopicType::UNKNOWN; if (CheckTopicPrefix (topic, "game-block-attach json ", gameId)) - isAttach = true; + type = TopicType::ATTACH; else if (CheckTopicPrefix (topic, "game-block-detach json ", gameId)) - isAttach = false; + type = TopicType::DETACH; + else if (CheckTopicPrefix (topic, "game-pending-move json ", gameId)) + type = TopicType::PENDING; else LOG (FATAL) << "Unexpected topic of ZMQ notification: " << topic; @@ -204,31 +221,67 @@ ZmqSubscriber::Listen (ZmqSubscriber* self) << "Error parsing notification JSON: " << parseErrs; for (auto i = range.first; i != range.second; ++i) - if (isAttach) - i->second->BlockAttach (gameId, data, seqMismatch); - else - i->second->BlockDetach (gameId, data, seqMismatch); + switch (type) + { + case TopicType::ATTACH: + i->second->BlockAttach (gameId, data, seqMismatch); + break; + case TopicType::DETACH: + i->second->BlockDetach (gameId, data, seqMismatch); + break; + case TopicType::PENDING: + i->second->PendingMove (gameId, data); + break; + default: + LOG (FATAL) << "Invalid topic type: " << static_cast (type); + } } } void ZmqSubscriber::Start () { - CHECK (IsEndpointSet ()); - LOG (INFO) << "Starting ZMQ subscriber at address: " << addr; + CHECK (!addrBlocks.empty ()) << "ZMQ endpoint is not yet set"; CHECK (!IsRunning ()); CHECK (sockets.empty ()); + LOG (INFO) << "Starting ZMQ subscriber for blocks: " << addrBlocks; auto socket = std::make_unique (ctx, ZMQ_SUB); + socket->connect (addrBlocks.c_str ()); + zmq::socket_t* const socketBlocks = socket.get (); + sockets.push_back (std::move (socket)); for (const auto& entry : listeners) for (const std::string cmd : {"game-block-attach", "game-block-detach"}) { const std::string topic = cmd + " json " + entry.first; - socket->setsockopt (ZMQ_SUBSCRIBE, topic.data (), topic.size ()); + socketBlocks->setsockopt (ZMQ_SUBSCRIBE, topic.data (), topic.size ()); } - socket->connect (addr.c_str ()); - sockets.push_back (std::move (socket)); + + if (!addrPending.empty ()) + { + LOG (INFO) << "Receiving pending moves from: " << addrPending; + + zmq::socket_t* socketPending = nullptr; + if (addrPending == addrBlocks) + socketPending = socketBlocks; + else + { + socket = std::make_unique (ctx, ZMQ_SUB); + socket->connect (addrPending.c_str ()); + socketPending = socket.get (); + sockets.push_back (std::move (socket)); + } + + for (const auto& entry : listeners) + { + const std::string topic = "game-pending-move json " + entry.first; + socketPending->setsockopt (ZMQ_SUBSCRIBE, topic.data (), + topic.size ()); + } + } + else + LOG (INFO) << "Not subscribing to pending moves"; /* Reset last-seen sequence numbers for a fresh start. */ lastSeq.clear (); @@ -241,7 +294,7 @@ void ZmqSubscriber::Stop () { CHECK (IsRunning ()); - LOG (INFO) << "Stopping ZMQ subscriber at address " << addr; + LOG (INFO) << "Stopping ZMQ subscriber at address " << addrBlocks; shouldStop = true; diff --git a/xayagame/zmqsubscriber.hpp b/xayagame/zmqsubscriber.hpp index d17ad80b..0c2b4a99 100644 --- a/xayagame/zmqsubscriber.hpp +++ b/xayagame/zmqsubscriber.hpp @@ -52,6 +52,13 @@ class ZmqListener virtual void BlockDetach (const std::string& gameId, const Json::Value& data, bool seqMismatch) = 0; + /** + * Callback for pending moves added to the mempool. Since pending moves + * are best effort only, we do not care about sequence number mismatches. + */ + virtual void PendingMove (const std::string& gameId, + const Json::Value& data) = 0; + }; /** @@ -63,8 +70,11 @@ class ZmqSubscriber private: - /** The ZMQ endpoint to connect to. */ - std::string addr; + /** The ZMQ endpoint to connect to for block updates. */ + std::string addrBlocks; + /** The ZMQ endpoint to connect to for pending moves. */ + std::string addrPending; + /** The ZMQ context that is used by the this instance. */ zmq::context_t ctx; /** @@ -126,13 +136,11 @@ class ZmqSubscriber void SetEndpoint (const std::string& address); /** - * Returns whether the endpoint is set. + * Sets the ZMW endpoint that will be used to receive pending moves. + * Unlike SetEndpoint, this is optional. If not set, then the ZMQ + * thread will simply not listen to pending moves. */ - bool - IsEndpointSet () const - { - return !addr.empty (); - } + void SetEndpointForPending (const std::string& address); /** * Adds a new listener for the given game ID. Must not be called when diff --git a/xayagame/zmqsubscriber_tests.cpp b/xayagame/zmqsubscriber_tests.cpp index 6a6d3738..86a8b18c 100644 --- a/xayagame/zmqsubscriber_tests.cpp +++ b/xayagame/zmqsubscriber_tests.cpp @@ -26,6 +26,8 @@ using testing::_; using testing::InSequence; constexpr const char IPC_ENDPOINT[] = "ipc:///tmp/xayagame_zmqsubscriber_tests"; +constexpr const char IPC_ENDPOINT_PENDING[] + = "ipc:///tmp/xayagame_zmqsubscriber_tests_pending"; constexpr const char GAME_ID[] = "test-game"; constexpr const char OTHER_GAME_ID[] = "other-game"; @@ -41,12 +43,15 @@ class MockZmqListener : public ZmqListener should explicitly be specified in the individual tests. */ EXPECT_CALL (*this, BlockAttach (_, _, _)).Times (0); EXPECT_CALL (*this, BlockDetach (_, _, _)).Times (0); + EXPECT_CALL (*this, PendingMove (_, _)).Times (0); } MOCK_METHOD3 (BlockAttach, void (const std::string& gameId, const Json::Value& data, bool seqMismatch)); MOCK_METHOD3 (BlockDetach, void (const std::string& gameId, const Json::Value& data, bool seqMismatch)); + MOCK_METHOD2 (PendingMove, void (const std::string& gameId, + const Json::Value& data)); }; @@ -85,19 +90,43 @@ class BasicZmqSubscriberTests : public testing::Test } /** - * Sends a multipart message consisting of the given strings. + * Sends a multipart message consisting of the given strings on the given + * socket. */ - void - SendMultipart (const std::vector& parts) + static void + SendMultipart (zmq::socket_t& sock, const std::vector& parts) { for (size_t i = 0; i < parts.size (); ++i) { zmq::message_t msg(parts[i].begin (), parts[i].end ()); const bool hasMore = (i + 1 < parts.size ()); - ASSERT_TRUE (zmqSocket.send (msg, hasMore ? ZMQ_SNDMORE : 0)); + ASSERT_TRUE (sock.send (msg, hasMore ? ZMQ_SNDMORE : 0)); } } + void + SendMultipart (const std::vector& parts) + { + SendMultipart (zmqSocket, parts); + } + + /** + * Sends a message with the given topic, JSON payload and sequence number + * on the given socket. + */ + static void + SendMessage (zmq::socket_t& sock, const std::string& topic, + const Json::Value& payload, const uint32_t seq) + { + std::ostringstream payloadStr; + payloadStr << payload; + + const std::string seqData(reinterpret_cast (&seq), + sizeof (seq)); + + SendMultipart (sock, {topic, payloadStr.str (), seqData}); + } + static void DisableListening (ZmqSubscriber& zmq) { @@ -133,14 +162,6 @@ namespace /* ************************************************************************** */ -TEST_F (BasicZmqSubscriberTests, IsEndpointSet) -{ - ZmqSubscriber zmq; - EXPECT_FALSE (zmq.IsEndpointSet ()); - zmq.SetEndpoint (IPC_ENDPOINT); - EXPECT_TRUE (zmq.IsEndpointSet ()); -} - TEST_F (BasicZmqSubscriberTests, SetEndpointWhenRunning) { EXPECT_DEATH ( @@ -150,6 +171,14 @@ TEST_F (BasicZmqSubscriberTests, SetEndpointWhenRunning) zmq.Start (); zmq.SetEndpoint ("foo"); }, "!IsRunning"); + + EXPECT_DEATH ( + { + ZmqSubscriber zmq; + zmq.SetEndpoint (IPC_ENDPOINT); + zmq.Start (); + zmq.SetEndpointForPending ("foo"); + }, "!IsRunning"); } TEST_F (BasicZmqSubscriberTests, AddListenerWhenRunning) @@ -169,7 +198,7 @@ TEST_F (BasicZmqSubscriberTests, StartWithoutEndpoint) { ZmqSubscriber zmq; zmq.Start (); - }, "IsEndpointSet"); + }, "ZMQ endpoint is not yet set"); } TEST_F (BasicZmqSubscriberTests, StartedTwice) @@ -303,34 +332,18 @@ class ZmqSubscriberTests : public BasicZmqSubscriberTests SleepSome (); } - /** - * Sends a message with the given topic, JSON payload and sequence number. - */ - void - SendMessage (const std::string& topic, const Json::Value& payload, - const uint32_t seq) - { - std::ostringstream payloadStr; - payloadStr << payload; - - const std::string seqData(reinterpret_cast (&seq), - sizeof (seq)); - - SendMultipart ({topic, payloadStr.str (), seqData}); - } - void SendAttach (const std::string& gameId, const Json::Value& payload, const uint32_t seq) { - SendMessage ("game-block-attach json " + gameId, payload, seq); + SendMessage (zmqSocket, "game-block-attach json " + gameId, payload, seq); } void SendDetach (const std::string& gameId, const Json::Value& payload, const uint32_t seq) { - SendMessage ("game-block-detach json " + gameId, payload, seq); + SendMessage (zmqSocket, "game-block-detach json " + gameId, payload, seq); } }; @@ -440,6 +453,141 @@ TEST_F (ZmqSubscriberTests, InvalidJson) }, "Error parsing"); } +/* ************************************************************************** */ + +class ZmqSubscriberPendingTests : public BasicZmqSubscriberTests +{ + +protected: + + /** Second socket bound to an alternate endpoint. */ + zmq::socket_t zmqSocketPending; + + ZmqSubscriber zmq; + + ZmqSubscriberPendingTests () + : zmqSocketPending(zmqCtx, ZMQ_PUB) + { + zmqSocketPending.bind (IPC_ENDPOINT_PENDING); + + zmq.SetEndpoint (IPC_ENDPOINT); + zmq.AddListener (GAME_ID, &mockListener); + } + + ~ZmqSubscriberPendingTests () + { + /* Wait so that the worker thread can process all sent messages before + the mock object verifies expectations. */ + SleepSome (); + } + + /** + * Runs a test where just pending messages are sent and received on the + * given socket. + */ + void + TestJustPending (zmq::socket_t& sock) + { + Json::Value payload1; + payload1["test"] = 42; + Json::Value payload2; + payload2["test"] = 5; + + { + InSequence dummy; + EXPECT_CALL (mockListener, PendingMove (GAME_ID, payload1)); + EXPECT_CALL (mockListener, PendingMove (GAME_ID, payload2)); + } + + SendPending (sock, GAME_ID, payload1); + SendPending (sock, GAME_ID, payload2); + } + + /** + * Sends a pending move notification on the given socket. + */ + static void + SendPending (zmq::socket_t& sock, const std::string& gameId, + const Json::Value& payload) + { + SendMessage (sock, "game-pending-move json " + gameId, payload, 42); + } + +}; + +TEST_F (ZmqSubscriberPendingTests, BasicOneSocket) +{ + zmq.SetEndpointForPending (IPC_ENDPOINT); + zmq.Start (); + SleepSome (); + + TestJustPending (zmqSocket); +} + +TEST_F (ZmqSubscriberPendingTests, BasicTwoSockets) +{ + zmq.SetEndpointForPending (IPC_ENDPOINT_PENDING); + zmq.Start (); + SleepSome (); + + TestJustPending (zmqSocketPending); +} + +TEST_F (ZmqSubscriberPendingTests, MixedOneSocket) +{ + zmq.SetEndpointForPending (IPC_ENDPOINT); + zmq.Start (); + SleepSome (); + + const std::string gameId = GAME_ID; + + Json::Value payload; + payload["foo"] = "bar"; + + /* When we use just one socket for both types of notifications, then + they should be received in exactly the order in which they are sent + (as we just read from that one socket sequentially). */ + + { + InSequence dummy; + EXPECT_CALL (mockListener, BlockDetach (gameId, payload, _)); + EXPECT_CALL (mockListener, PendingMove (gameId, payload)); + EXPECT_CALL (mockListener, BlockAttach (gameId, payload, _)); + } + + SendMessage (zmqSocket, "game-block-detach json " + gameId, payload, 1); + SendPending (zmqSocket, gameId, payload); + SendMessage (zmqSocket, "game-block-attach json " + gameId, payload, 2); +} + +TEST_F (ZmqSubscriberPendingTests, MixedTwoSockets) +{ + zmq.SetEndpointForPending (IPC_ENDPOINT_PENDING); + zmq.Start (); + SleepSome (); + + const std::string gameId = GAME_ID; + + Json::Value payload; + payload["foo"] = "bar"; + + /* If we use two sockets and mix the messages from them, it is not defined + in what order we will receive block messages vs pending messages. */ + + { + InSequence dummy; + EXPECT_CALL (mockListener, BlockDetach (gameId, payload, _)); + EXPECT_CALL (mockListener, BlockAttach (gameId, payload, _)); + } + EXPECT_CALL (mockListener, PendingMove (gameId, payload)); + + SendMessage (zmqSocket, "game-block-detach json " + gameId, payload, 1); + SendPending (zmqSocketPending, gameId, payload); + SendMessage (zmqSocket, "game-block-attach json " + gameId, payload, 2); +} + +/* ************************************************************************** */ + } // anonymous namespace } // namespace internal } // namespace xaya From 2ba7ba8d08260d223ee6dada4f700ff500b15e02 Mon Sep 17 00:00:00 2001 From: Daniel Kraft Date: Tue, 6 Aug 2019 09:23:54 +0200 Subject: [PATCH 3/8] Add getrawmempool to the Xaya RPC stub. Expose the getrawmempool method of Xaya Core in the RPC stub. This method will be useful for the handling of pending moves (because we can use it to compare our list of pending moves to the real mempool and make sure we expunge spurious entries from time to time). --- xayagame/rpc-stubs/xaya.json | 5 +++++ xayagame/testutils.cpp | 1 + xayagame/testutils.hpp | 1 + 3 files changed, 7 insertions(+) diff --git a/xayagame/rpc-stubs/xaya.json b/xayagame/rpc-stubs/xaya.json index 16096a4c..b2138664 100644 --- a/xayagame/rpc-stubs/xaya.json +++ b/xayagame/rpc-stubs/xaya.json @@ -57,6 +57,11 @@ }, "returns": {} }, + { + "name": "getrawmempool", + "params": {}, + "returns": [] + }, { "name": "name_pending", "params": {}, diff --git a/xayagame/testutils.cpp b/xayagame/testutils.cpp index cf6e9874..8ffbd1ae 100644 --- a/xayagame/testutils.cpp +++ b/xayagame/testutils.cpp @@ -57,6 +57,7 @@ MockXayaRpcServer::MockXayaRpcServer (jsonrpc::AbstractServerConnector& conn) EXPECT_CALL (*this, getblockheader (_)).Times (0); EXPECT_CALL (*this, game_sendupdates (_, _)).Times (0); EXPECT_CALL (*this, verifymessage (_, _, _)).Times (0); + EXPECT_CALL (*this, getrawmempool ()).Times (0); EXPECT_CALL (*this, name_pending ()).Times (0); } diff --git a/xayagame/testutils.hpp b/xayagame/testutils.hpp index 145bf3b5..3efb7675 100644 --- a/xayagame/testutils.hpp +++ b/xayagame/testutils.hpp @@ -86,6 +86,7 @@ class MockXayaRpcServer : public XayaRpcServerStub const std::string& message, const std::string& signature)); + MOCK_METHOD0 (getrawmempool, Json::Value ()); MOCK_METHOD0 (name_pending, Json::Value ()); }; From f50eccf12bc4497c4646304e626c91dca7ebf084 Mon Sep 17 00:00:00 2001 From: Daniel Kraft Date: Mon, 5 Aug 2019 10:31:22 +0200 Subject: [PATCH 4/8] Move some context to superclass of GameLogic. This creates the new class GameProcessorWithContext, which holds the game ID, chain and Xaya Core RPC connection. That way, we can reuse that code for the upcoming processor of pending moves. --- xayagame/gamelogic.cpp | 88 +++++++++++++++++--------------- xayagame/gamelogic.hpp | 113 +++++++++++++++++++++++++---------------- 2 files changed, 116 insertions(+), 85 deletions(-) diff --git a/xayagame/gamelogic.cpp b/xayagame/gamelogic.cpp index 80f2f003..1a575a2a 100644 --- a/xayagame/gamelogic.cpp +++ b/xayagame/gamelogic.cpp @@ -42,15 +42,55 @@ Context::Context (const GameLogic& l, const uint256& rndSeed) Chain Context::GetChain () const { - CHECK (logic.chain != Chain::UNKNOWN); - return logic.chain; + return logic.GetChain (); } const std::string& Context::GetGameId () const { - CHECK (!logic.gameId.empty ()); - return logic.gameId; + return logic.GetGameId (); +} + +/* ************************************************************************** */ + +Chain +GameProcessorWithContext::GetChain () const +{ + CHECK (chain != Chain::UNKNOWN); + return chain; +} + +const std::string& +GameProcessorWithContext::GetGameId () const +{ + CHECK (chain != Chain::UNKNOWN); + return gameId; +} + +XayaRpcClient& +GameProcessorWithContext::GetXayaRpc () +{ + CHECK (rpcClient != nullptr); + return *rpcClient; +} + +void +GameProcessorWithContext::InitialiseGameContext (const Chain c, + const std::string& id, + XayaRpcClient* rpc) +{ + CHECK (c != Chain::UNKNOWN); + CHECK (!id.empty ()); + + CHECK (chain == Chain::UNKNOWN) << "Game context is already initialised"; + chain = c; + gameId = id; + rpcClient = rpc; + + if (rpcClient == nullptr) + LOG (WARNING) + << "Game context has been initialised without an RPC connection;" + " some features will be missing"; } /* ************************************************************************** */ @@ -96,31 +136,6 @@ class GameLogic::ContextSetter }; -void -GameLogic::InitialiseGameContext (const Chain c, const std::string& id, - XayaRpcClient* rpc) -{ - CHECK (c != Chain::UNKNOWN); - CHECK (!id.empty ()); - - CHECK (chain == Chain::UNKNOWN) << "GameLogic is already initialised"; - chain = c; - gameId = id; - rpcClient = rpc; - - if (rpcClient == nullptr) - LOG (WARNING) - << "GameLogic has been initialised without an RPC connection;" - " some features will be missing"; -} - -Chain -GameLogic::GetChain () const -{ - CHECK (chain != Chain::UNKNOWN); - return chain; -} - Context& GameLogic::GetContext () { @@ -135,20 +150,11 @@ GameLogic::GetContext () const return *ctx; } -XayaRpcClient& -GameLogic::GetXayaRpc () -{ - CHECK (rpcClient != nullptr); - return *rpcClient; -} - GameStateData GameLogic::GetInitialState (unsigned& height, std::string& hashHex) { - CHECK (!gameId.empty ()); - SHA256 rndSeed; - rndSeed << "initial state" << gameId; + rndSeed << "initial state" << GetGameId (); Context context(*this, rndSeed.Finalise ()); ContextSetter setter(*this, context); @@ -188,7 +194,7 @@ GameLogic::ProcessForward (const GameStateData& oldState, const Json::Value& blockData, UndoData& undoData) { - Context context(*this, BlockRngSeed (gameId, blockData)); + Context context(*this, BlockRngSeed (GetGameId (), blockData)); ContextSetter setter(*this, context); return ProcessForwardInternal (oldState, blockData, undoData); @@ -199,7 +205,7 @@ GameLogic::ProcessBackwards (const GameStateData& newState, const Json::Value& blockData, const UndoData& undoData) { - Context context(*this, BlockRngSeed (gameId, blockData)); + Context context(*this, BlockRngSeed (GetGameId (), blockData)); ContextSetter setter(*this, context); return ProcessBackwardsInternal (newState, blockData, undoData); diff --git a/xayagame/gamelogic.hpp b/xayagame/gamelogic.hpp index 1236e2d2..522aa700 100644 --- a/xayagame/gamelogic.hpp +++ b/xayagame/gamelogic.hpp @@ -39,6 +39,74 @@ enum class Chain */ std::string ChainToString (Chain c); +/** + * Generic class for a processor game state, which mainly holds some contextual + * information (like the chain and game ID). This is used as a common + * superclass for the block update logic (GameLogic) and the logic + * for processing pending moves (PendingMoveProcessor). + */ +class GameProcessorWithContext +{ + +private: + + /** + * The chain that the game is running on. This may influence the rules + * and is provided via the Context. + */ + Chain chain = Chain::UNKNOWN; + + /** + * The game id of the connected game. This is used to seed the random + * number generator. + */ + std::string gameId; + + /** + * Xaya Core RPC connection, if it has been initialised already from the + * Game instance. + */ + XayaRpcClient* rpcClient = nullptr; + +protected: + + GameProcessorWithContext () = default; + + /** + * Returns the chain the game is running on. + */ + Chain GetChain () const; + + /** + * Returns the current game ID. + */ + const std::string& GetGameId () const; + + /** + * Returns the configured RPC connection to Xaya Core. Must only be called + * after InitialiseGameContext was invoked with a non-null RPC client. + */ + XayaRpcClient& GetXayaRpc (); + +public: + + virtual ~GameProcessorWithContext () = default; + + /** + * Initialises the instance with some data that is obtained by a Game + * instance after the RPC connection to Xaya Core is up. + * + * The RPC client instance may be null, but then certain features + * (depending on GetXayaRpc) will be disabled. + * + * This must only be called once. It is typically done by the Game + * instance, but may also be used for testing. + */ + void InitialiseGameContext (Chain c, const std::string& id, + XayaRpcClient* rpc); + +}; + /** * Context for a call to the callbacks of the GameLogic class. This is * provided by GameLogic itself so that the implementing subclasses can @@ -116,31 +184,13 @@ class Context * and the libxayagame-stored data, and allows atomic transactions spanning * both of them. */ -class GameLogic +class GameLogic : public GameProcessorWithContext { private: class ContextSetter; - /** - * The chain that the game is running on. This may influence the rules - * and is provided via the Context. - */ - Chain chain = Chain::UNKNOWN; - - /** - * The game id of the connected game. This is used to seed the random - * number generator. - */ - std::string gameId; - - /** - * Xaya Core RPC connection, if it has been initialised already from the - * Game instance. - */ - XayaRpcClient* rpcClient = nullptr; - /** Current Context instance if any. */ Context* ctx = nullptr; @@ -148,11 +198,6 @@ class GameLogic protected: - /** - * Returns the chain the game is running on. - */ - Chain GetChain () const; - /** * Returns the current Context instance. This function must only be * called while one of the Internal callbacks is running in a subclass. @@ -165,12 +210,6 @@ class GameLogic */ const Context& GetContext () const; - /** - * Returns the configured RPC connection to Xaya Core. Must only be called - * after InitialiseGameContext was invoked with a non-null RPC client. - */ - XayaRpcClient& GetXayaRpc (); - /** * Returns the initial state (as well as the associated block height * and block hash in big-endian hex) for the game. @@ -206,20 +245,6 @@ class GameLogic public: GameLogic () = default; - virtual ~GameLogic () = default; - - /** - * Initialises the instance with some data that is obtained by a Game - * instance after the RPC connection to Xaya Core is up. - * - * The RPC client instance may be null, but then certain features - * (depending on GetXayaRpc) will be disabled. - * - * This must only be called once. It is typically done by the Game - * instance, but may also be used for testing. - */ - void InitialiseGameContext (Chain c, const std::string& id, - XayaRpcClient* rpc); /** * Returns the initial state for the game. This is the function that is From d2891f4f17ffa168503d9dd9fcb013ea1d066245 Mon Sep 17 00:00:00 2001 From: Daniel Kraft Date: Mon, 5 Aug 2019 13:24:50 +0200 Subject: [PATCH 5/8] Implement PendingMoveProcessor core logic. This adds the new class PendingMoveProcessor, which allows subclasses to receive updates about pending moves and build a "pending state". It has logic to track currently known pending moves, sync them with Xaya Core's mempool using getrawmempool, and to construct the "correct" pending state when blocks or new transactions come in. --- xayagame/Makefile.am | 3 + xayagame/pendingmoves.cpp | 157 ++++++++++++++++++ xayagame/pendingmoves.hpp | 133 ++++++++++++++++ xayagame/pendingmoves_tests.cpp | 274 ++++++++++++++++++++++++++++++++ 4 files changed, 567 insertions(+) create mode 100644 xayagame/pendingmoves.cpp create mode 100644 xayagame/pendingmoves.hpp create mode 100644 xayagame/pendingmoves_tests.cpp diff --git a/xayagame/Makefile.am b/xayagame/Makefile.am index 08ff42a3..24322e23 100644 --- a/xayagame/Makefile.am +++ b/xayagame/Makefile.am @@ -35,6 +35,7 @@ libxayagame_la_SOURCES = \ heightcache.cpp \ lmdbstorage.cpp \ mainloop.cpp \ + pendingmoves.cpp \ pruningqueue.cpp \ signatures.cpp \ sqlitegame.cpp \ @@ -50,6 +51,7 @@ xayagame_HEADERS = \ heightcache.hpp \ lmdbstorage.hpp \ mainloop.hpp \ + pendingmoves.hpp \ pruningqueue.hpp \ signatures.hpp \ sqlitegame.hpp \ @@ -92,6 +94,7 @@ tests_SOURCES = \ heightcache_tests.cpp \ lmdbstorage_tests.cpp \ mainloop_tests.cpp \ + pendingmoves_tests.cpp \ pruningqueue_tests.cpp \ signatures_tests.cpp \ sqlitegame_tests.cpp \ diff --git a/xayagame/pendingmoves.cpp b/xayagame/pendingmoves.cpp new file mode 100644 index 00000000..dcb54088 --- /dev/null +++ b/xayagame/pendingmoves.cpp @@ -0,0 +1,157 @@ +// Copyright (C) 2019 The Xaya developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include "pendingmoves.hpp" + +#include + +namespace xaya +{ + +/** + * Helper class to set/unset the current-state context in a PendingMoveProcessor + * using RAII. + */ +class PendingMoveProcessor::ContextSetter +{ + +private: + + /** The corresponding PendingMoveProcessor instance. */ + PendingMoveProcessor& proc; + +public: + + /** + * Sets the context based on the current state. + */ + explicit ContextSetter (PendingMoveProcessor& p, const GameStateData& s) + : proc(p) + { + CHECK (proc.ctx == nullptr); + proc.ctx = std::make_unique (s); + } + + /** + * Unsets the context. + */ + ~ContextSetter () + { + CHECK (proc.ctx != nullptr); + proc.ctx.reset (); + } + +}; + +const GameStateData& +PendingMoveProcessor::GetConfirmedState () const +{ + CHECK (ctx != nullptr) << "No callback is running at the moment"; + return ctx->state; +} + +void +PendingMoveProcessor::Reset () +{ + const auto mempool = GetXayaRpc ().getrawmempool (); + VLOG (1) + << "Rebuilding pending move state with " << mempool.size () + << " transactions in the (full) mempool..."; + + Clear (); + std::map newPending; + for (const auto& txidStr : mempool) + { + uint256 txid; + CHECK (txidStr.isString ()); + CHECK (txid.FromHex (txidStr.asString ())); + + const auto mit = pending.find (txid); + if (mit == pending.end ()) + continue; + + newPending.emplace (txid, mit->second); + AddPendingMove (mit->second); + } + + VLOG (1) + << "Sync with real mempool reduced size of pending moves from " + << pending.size () << " to " << newPending.size (); + pending = std::move (newPending); +} + +namespace +{ + +/** + * Extracts the txid of a move JSON object as uint256. + */ +uint256 +GetMoveTxid (const Json::Value& mv) +{ + const auto& txidVal = mv["txid"]; + CHECK (txidVal.isString ()); + + uint256 txid; + CHECK (txid.FromHex (txidVal.asString ())); + + return txid; +} + +} // anonymous namespace + +void +PendingMoveProcessor::ProcessAttachedBlock (const GameStateData& state) +{ + VLOG (1) << "Updating pending state for attached block..."; + + ContextSetter setter(*this, state); + Reset (); +} + +void +PendingMoveProcessor::ProcessDetachedBlock (const GameStateData& state, + const Json::Value& blockData) +{ + /* We want to insert moves from the detached block into our map of + known moves, so that we can process them in case they are later on still + in Xaya Core's mempool. */ + const auto& mvArray = blockData["moves"]; + CHECK (mvArray.isArray ()); + for (const auto& mv : mvArray) + { + const uint256 txid = GetMoveTxid (mv); + pending.emplace (txid, mv); + } + + VLOG (1) + << "Updating pending state for detached block " + << blockData["block"]["hash"].asString () << ": " + << mvArray.size () << " moves unconfirmed"; + VLOG (2) << "Block data: " << blockData; + + ContextSetter setter(*this, state); + Reset (); +} + +void +PendingMoveProcessor::ProcessMove (const GameStateData& state, + const Json::Value& mv) +{ + const uint256 txid = GetMoveTxid (mv); + VLOG (1) << "Processing pending move: " << txid.ToHex (); + VLOG (2) << "Full data: " << mv; + + const auto inserted = pending.emplace (txid, mv); + if (!inserted.second) + { + VLOG (1) << "The move is already known"; + return; + } + + ContextSetter setter(*this, state); + AddPendingMove (mv); +} + +} // namespace xaya diff --git a/xayagame/pendingmoves.hpp b/xayagame/pendingmoves.hpp new file mode 100644 index 00000000..f6ba442e --- /dev/null +++ b/xayagame/pendingmoves.hpp @@ -0,0 +1,133 @@ +// Copyright (C) 2019 The Xaya developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef XAYAGAME_PENDINGMOVES_HPP +#define XAYAGAME_PENDINGMOVES_HPP + +#include "gamelogic.hpp" +#include "storage.hpp" + +#include + +#include + +#include +#include + +namespace xaya +{ + +/** + * Processor for pending moves in the game. This can be subclassed with + * actual logic (and storage of data) as needed by games. They can then + * implement whatever processing they need to keep track of a "pending state" + * based on the current mempool. + */ +class PendingMoveProcessor : public GameProcessorWithContext +{ + +private: + + /** + * Data about the "current state" accessible to the callbacks while they + * are being executed. + */ + struct CurrentState + { + + /** The current confirmed game state. */ + const GameStateData& state; + + explicit CurrentState (const GameStateData& s) + : state(s) + {} + + }; + + /** + * All currently known pending moves, indexed by their txid. This is used + * to check whether a new move is already known, and also to retrieve the + * actual data when we sync with getrawmempool. + */ + std::map pending; + + /** While a callback is running, the state context. */ + std::unique_ptr ctx; + + /** + * Resets the internal state, by clearing and then rebuilding from the + * list of pending moves, and syncing them with getrawmempool. This assumes + * that the state context is already set up. + */ + void Reset (); + + class ContextSetter; + +protected: + + /** + * Returns the currently confirmed on-chain game state. This must only + * be called while a callback (Clear or AddPendingMove) is currently + * running. + */ + const GameStateData& GetConfirmedState () const; + + /** + * Clears the state, so it corresponds to an empty mempool. + */ + virtual void Clear () = 0; + + /** + * Adds a new pending move to the current pending state in this instance. + * mv contains the full move data as JSON. + * + * Between calls to Clear, this is called at most once for any particular + * transaction. If one move is built on another (i.e. spending the other's + * name), then it is usually passed to AddPendingMove later. + * + * During exceptional situations (e.g. reorgs), it may happen that + * conflicting, out-of-order or already confirmed moves are passed here. + * Implementations must be able to handle that gracefully, although the + * resulting pending state may be "wrong". + */ + virtual void AddPendingMove (const Json::Value& mv) = 0; + +public: + + PendingMoveProcessor () = default; + + /** + * Processes a newly attached block. This checks the current mempool + * of Xaya Core and then rebuilds the pending state based on known moves + * that are still in the mempool. + */ + void ProcessAttachedBlock (const GameStateData& state); + + /** + * Processes a detached block. This clears the pending state and rebuilds + * it from Xaya Core's mempool (including re-added transactions from + * the block that was just detached). + * + * state must be the confirmed game-state *after* the block has been + * detached already (i.e. the state before, not "at", the block). + */ + void ProcessDetachedBlock (const GameStateData& state, + const Json::Value& blockData); + + /** + * Processes a newly received pending move. + */ + void ProcessMove (const GameStateData& state, const Json::Value& mv); + + /** + * Returns a JSON representation of the current state. This is exposed + * by the GSP's RPC server for use by frontends (and the likes). + */ + virtual Json::Value ToJson () const = 0; + +}; + +} // namespace xaya + +#endif // XAYAGAME_PENDINGMOVES_HPP diff --git a/xayagame/pendingmoves_tests.cpp b/xayagame/pendingmoves_tests.cpp new file mode 100644 index 00000000..b6f87390 --- /dev/null +++ b/xayagame/pendingmoves_tests.cpp @@ -0,0 +1,274 @@ +// Copyright (C) 2019 The Xaya developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include "pendingmoves.hpp" + +#include "testutils.hpp" + +#include + +#include +#include + +#include +#include +#include + +namespace xaya +{ +namespace +{ + +using testing::Return; + +/** + * A simple implementation of a pending move processor. Moves are expected + * to be just strings, and we keep track of an ordered list of those strings + * for each name (as a JSON array). + */ +class MessageArrayPendingMoves : public PendingMoveProcessor +{ + +private: + + /** + * The current data as JSON object. It has two fields: The "names" + * field holds an object mapping names to their arrays, and the "confirmed" + * field stores the current game-state string. The latter is useful to + * test exposure of the confirmed state to the pending processor. + */ + Json::Value data; + +protected: + + void + Clear () override + { + data = Json::Value (Json::objectValue); + data["confirmed"] = GetConfirmedState (); + data["names"] = Json::Value (Json::objectValue); + } + + void + AddPendingMove (const Json::Value& mv) override + { + const std::string name = mv["name"].asString (); + const std::string msg = mv["move"].asString (); + + auto& names = data["names"]; + CHECK (names.isObject ()); + if (!names.isMember (name)) + names[name] = Json::Value (Json::arrayValue); + + names[name].append (msg); + + data["confirmed"] = GetConfirmedState (); + } + +public: + + MessageArrayPendingMoves () + : data(Json::objectValue) + { + data["names"] = Json::Value (Json::objectValue); + } + + Json::Value + ToJson () const override + { + return data; + } + +}; + +/** + * Constructs a move JSON for the given name and value. The txid is computed + * by hashing the value. + */ +Json::Value +MoveJson (const std::string& nm, const std::string& val) +{ + Json::Value res(Json::objectValue); + res["txid"] = SHA256::Hash (val).ToHex (); + res["name"] = nm; + res["move"] = val; + + return res; +} + +/** + * Constructs a block JSON for the given list of moves. + */ +Json::Value +BlockJson (const std::vector& moves) +{ + Json::Value mvJson(Json::arrayValue); + for (const auto& mv : moves) + mvJson.append (mv); + + Json::Value res(Json::objectValue); + res["moves"] = mvJson; + + return res; +} + +class PendingMovesTests : public testing::Test +{ + +private: + + jsonrpc::HttpServer httpServer; + jsonrpc::HttpClient httpClient; + + MockXayaRpcServer mockXayaServer; + XayaRpcClient rpcClient; + +protected: + + MessageArrayPendingMoves proc; + + PendingMovesTests () + : httpServer(MockXayaRpcServer::HTTP_PORT), + httpClient(MockXayaRpcServer::HTTP_URL), + mockXayaServer(httpServer), + rpcClient(httpClient) + { + proc.InitialiseGameContext (Chain::MAIN, "game id", &rpcClient); + + mockXayaServer.StartListening (); + } + + ~PendingMovesTests () + { + mockXayaServer.StopListening (); + } + + /** + * Sets the mempool to return from the mock server. The txid's are created + * by hashing the provided strings (corresponding to "value" in MoveJson). + */ + void + SetMempool (const std::vector& values) + { + Json::Value txids(Json::arrayValue); + for (const auto& v : values) + txids.append (SHA256::Hash (v).ToHex ()); + + EXPECT_CALL (mockXayaServer, getrawmempool ()) + .WillRepeatedly (Return (txids)); + } + +}; + +/* ************************************************************************** */ + +TEST_F (PendingMovesTests, AddingMoves) +{ + proc.ProcessMove ("state", MoveJson ("foo", "bar")); + proc.ProcessMove ("state", MoveJson ("foo", "baz")); + proc.ProcessMove ("state", MoveJson ("foo", "bar")); + proc.ProcessMove ("state", MoveJson ("abc", "def")); + proc.ProcessMove ("state", MoveJson ("abc", "def")); + + EXPECT_EQ (proc.ToJson (), ParseJson (R"({ + "confirmed": "state", + "names": + { + "abc": ["def"], + "foo": ["bar", "baz"] + } + })")); +} + +TEST_F (PendingMovesTests, AttachedBlock) +{ + proc.ProcessMove ("old", MoveJson ("foo", "c")); + proc.ProcessMove ("old", MoveJson ("foo", "b")); + proc.ProcessMove ("old", MoveJson ("foo", "a")); + proc.ProcessMove ("old", MoveJson ("bar", "x")); + proc.ProcessMove ("old", MoveJson ("baz", "y")); + + SetMempool ({"b", "c", "y", "z"}); + proc.ProcessAttachedBlock ("new"); + + EXPECT_EQ (proc.ToJson (), ParseJson (R"({ + "confirmed": "new", + "names": + { + "foo": ["b", "c"], + "baz": ["y"] + } + })")); +} + +TEST_F (PendingMovesTests, DetachedBlock) +{ + proc.ProcessMove ("new", MoveJson ("foo", "b")); + proc.ProcessMove ("new", MoveJson ("bar", "x")); + + SetMempool ({"a", "b", "x", "y", "z"}); + proc.ProcessDetachedBlock ("old", BlockJson ( + { + MoveJson ("foo", "a"), + MoveJson ("baz", "y"), + })); + + /* This should be ignored, as we have it already. */ + proc.ProcessMove ("old", MoveJson ("foo", "a")); + + EXPECT_EQ (proc.ToJson (), ParseJson (R"({ + "confirmed": "old", + "names": + { + "foo": ["a", "b"], + "bar": ["x"], + "baz": ["y"] + } + })")); +} + +TEST_F (PendingMovesTests, OneBlockReorg) +{ + /* This test verifies what happens in the "typical" situation of a + one-block-reorg (orphan block), assuming that notifications are sent + in order (as they would if one socket is used for ZMQ blocks and + pending moves). While this should be covered by the tests before, + it makes sense to verify this important situation also explicitly. */ + + proc.ProcessMove ("new 1", MoveJson ("foo", "b")); + proc.ProcessMove ("new 1", MoveJson ("bar", "x")); + + SetMempool ({"a", "b", "x", "y"}); + proc.ProcessDetachedBlock ("old", BlockJson ( + { + MoveJson ("foo", "a"), + MoveJson ("baz", "y"), + })); + + proc.ProcessMove ("old", MoveJson ("foo", "a")); + proc.ProcessMove ("baz", MoveJson ("baz", "y")); + + EXPECT_EQ (proc.ToJson (), ParseJson (R"({ + "confirmed": "old", + "names": + { + "foo": ["a", "b"], + "bar": ["x"], + "baz": ["y"] + } + })")); + + SetMempool ({}); + proc.ProcessAttachedBlock ("new 2"); + + EXPECT_EQ (proc.ToJson (), ParseJson (R"({ + "confirmed": "new 2", + "names": {} + })")); +} + +/* ************************************************************************** */ + +} // anonymous namespace +} // namespace xaya From 59ce71b864f904a106f8e7af89423321c40f3f4e Mon Sep 17 00:00:00 2001 From: Daniel Kraft Date: Tue, 6 Aug 2019 14:46:08 +0200 Subject: [PATCH 6/8] Support pending moves in Game. This adds support for tracking pending moves to the Game class. It will auto-detect the ZMQ endpoint for pending moves alongside of the one for block notifications. Also, one can (optionally) attach a PendingMoveProcessor instance, which will then be kept up-to-date with incoming moves and block attaches/detaches. --- xayagame/game.cpp | 73 +++++++++++++--- xayagame/game.hpp | 13 ++- xayagame/game_tests.cpp | 188 +++++++++++++++++++++++++++++++++++++++- xayagame/testutils.cpp | 6 ++ xayagame/testutils.hpp | 15 +++- 5 files changed, 278 insertions(+), 17 deletions(-) diff --git a/xayagame/game.cpp b/xayagame/game.cpp index 351f4d7c..2ba2c0ed 100644 --- a/xayagame/game.cpp +++ b/xayagame/game.cpp @@ -239,6 +239,9 @@ Game::BlockAttach (const std::string& id, const Json::Value& data, if (needReinit) ReinitialiseState (); + + if (state == State::UP_TO_DATE && pending != nullptr) + pending->ProcessAttachedBlock (storage->GetCurrentGameState ()); } void @@ -323,6 +326,9 @@ Game::BlockDetach (const std::string& id, const Json::Value& data, if (needReinit) ReinitialiseState (); + + if (state == State::UP_TO_DATE && pending != nullptr) + pending->ProcessDetachedBlock (storage->GetCurrentGameState (), data); } void @@ -336,9 +342,13 @@ Game::PendingMove (const std::string& id, const Json::Value& data) VLOG (1) << "Processing pending move " << txid.ToHex (); std::lock_guard lock(mut); - - /* FIXME: Actual implementation, forwarding the data to a PendingMoveProcessor - (if we have it). */ + if (state == State::UP_TO_DATE) + { + CHECK (pending != nullptr); + pending->ProcessMove (storage->GetCurrentGameState (), data); + } + else + VLOG (1) << "Ignoring pending move while not up-to-date: " << data; } void @@ -365,6 +375,8 @@ Game::ConnectRpcClient (jsonrpc::IClientConnector& conn) LOG (INFO) << "Connected to RPC daemon with chain " << ChainToString (chain); if (rules != nullptr) rules->InitialiseGameContext (chain, gameId, rpcClient.get ()); + if (pending != nullptr) + pending->InitialiseGameContext (chain, gameId, rpcClient.get ()); } unsigned @@ -442,6 +454,16 @@ Game::SetGameLogic (GameLogic& gl) rules->InitialiseGameContext (chain, gameId, rpcClient.get ()); } +void +Game::SetPendingMoveProcessor (PendingMoveProcessor& p) +{ + std::lock_guard lock(mut); + CHECK (!mainLoop.IsRunning ()); + pending = &p; + if (chain != Chain::UNKNOWN) + pending->InitialiseGameContext (chain, gameId, rpcClient.get ()); +} + void Game::EnablePruning (const unsigned nBlocks) { @@ -470,15 +492,36 @@ Game::DetectZmqEndpoint () } VLOG (1) << "Configured ZMQ notifications:\n" << notifications; + bool foundBlocks = false; for (const auto& val : notifications) - if (val.get ("type", "") == "pubgameblocks") - { - const std::string endpoint = val.get ("address", "").asString (); - CHECK (!endpoint.empty ()); - LOG (INFO) << "Detected ZMQ endpoint: " << endpoint; - zmq.SetEndpoint (endpoint); - return true; - } + { + const auto& typeVal = val["type"]; + if (!typeVal.isString ()) + continue; + + const auto& addrVal = val["address"]; + CHECK (addrVal.isString ()); + const std::string address = addrVal.asString (); + CHECK (!address.empty ()); + + const std::string type = typeVal.asString (); + if (type == "pubgameblocks") + { + LOG (INFO) << "Detected ZMQ blocks endpoint: " << address; + zmq.SetEndpoint (address); + foundBlocks = true; + continue; + } + if (type == "pubgamepending") + { + LOG (INFO) << "Detected ZMQ pending endpoint: " << address; + zmq.SetEndpointForPending (address); + continue; + } + } + + if (foundBlocks) + return true; LOG (WARNING) << "No -zmqpubgameblocks notifier seems to be set up"; return false; @@ -579,6 +622,14 @@ Game::UntrackGame () void Game::Start () { + if (pending == nullptr) + { + LOG (WARNING) + << "No PendingMoveProcessor has been set, disabling pending moves" + " in the ZMQ subscriber"; + zmq.SetEndpointForPending (""); + } + TrackGame (); zmq.Start (); diff --git a/xayagame/game.hpp b/xayagame/game.hpp index a01ceefa..7974f496 100644 --- a/xayagame/game.hpp +++ b/xayagame/game.hpp @@ -8,6 +8,7 @@ #include "gamelogic.hpp" #include "heightcache.hpp" #include "mainloop.hpp" +#include "pendingmoves.hpp" #include "pruningqueue.hpp" #include "storage.hpp" #include "transactionmanager.hpp" @@ -133,6 +134,9 @@ class Game : private internal::ZmqListener /** The game rules in use. */ GameLogic* rules = nullptr; + /** The processor for pending moves, if any. */ + PendingMoveProcessor* pending = nullptr; + /** * Desired size for batches of atomic transactions while the game is * catching up. <= 1 means no batching even in these situations. @@ -278,6 +282,13 @@ class Game : private internal::ZmqListener */ void SetGameLogic (GameLogic& gl); + /** + * Sets the processor for pending moves. Setting one is optional; if no + * processor is set of pending move notifications are not enabled in the + * connected Xaya Core, then no pending state will be available. + */ + void SetPendingMoveProcessor (PendingMoveProcessor& p); + /** * Enables (or changes) pruning with the given number of blocks to keep. * Must be called after the storage is set already. @@ -285,7 +296,7 @@ class Game : private internal::ZmqListener void EnablePruning (unsigned nBlocks); /** - * Detects the ZMQ endpoint by calling getzmqnotifications on the Xaya + * Detects the ZMQ endpoint(s) by calling getzmqnotifications on the Xaya * daemon. Returns false if pubgameblocks is not enabled. */ bool DetectZmqEndpoint (); diff --git a/xayagame/game_tests.cpp b/xayagame/game_tests.cpp index 43d76e53..6f504014 100644 --- a/xayagame/game_tests.cpp +++ b/xayagame/game_tests.cpp @@ -10,6 +10,7 @@ #include "rpc-stubs/xayarpcserverstub.h" +#include #include #include @@ -283,6 +284,56 @@ class TestGame : public GameLogic }; +/** + * Processor for pending moves of our test game. The JSON state returned + * for the pending moves is just a JSON object where names are mapped to + * the latest value in a move (i.e. what the value would be when all + * transactions are confirmed). + * + * In addition to the names, we also add the current state as string + * to the JSON object with key "state". + */ +class TestPendingMoves : public PendingMoveProcessor +{ + +private: + + /** The currently built up JSON object. */ + Json::Value data; + +protected: + + void + Clear () override + { + data = Json::Value (Json::objectValue); + data["state"] = GetConfirmedState (); + } + + void + AddPendingMove (const Json::Value& mv) override + { + data["state"] = GetConfirmedState (); + + const std::string nm = mv["name"].asString (); + const std::string val = mv["move"].asString (); + data[nm] = val; + } + +public: + + TestPendingMoves () + : data(Json::objectValue) + {} + + Json::Value + ToJson () const override + { + return data; + } + +}; + /* ************************************************************************** */ class GameTests : public GameTestWithBlockchain @@ -386,7 +437,7 @@ TEST_F (ChainDetectionTests, Reconnection) using DetectZmqEndpointTests = GameTests; -TEST_F (DetectZmqEndpointTests, Success) +TEST_F (DetectZmqEndpointTests, BlocksWithoutPending) { const Json::Value notifications = ParseJson (R"( [ @@ -404,6 +455,27 @@ TEST_F (DetectZmqEndpointTests, Success) g.ConnectRpcClient (httpClient); ASSERT_TRUE (g.DetectZmqEndpoint ()); EXPECT_EQ (GetZmqEndpoint (g), "address"); + EXPECT_EQ (GetZmqEndpointPending (g), ""); +} + +TEST_F (DetectZmqEndpointTests, BlocksAndPending) +{ + const Json::Value notifications = ParseJson (R"( + [ + {"type": "pubgameblocks", "address": "address blocks"}, + {"type": "pubgamepending", "address": "address pending"} + ] + )"); + + EXPECT_CALL (mockXayaServer, getzmqnotifications ()) + .WillOnce (Return (notifications)); + + Game g(GAME_ID); + mockXayaServer.SetBestBlock (0, BlockHash (0)); + g.ConnectRpcClient (httpClient); + ASSERT_TRUE (g.DetectZmqEndpoint ()); + EXPECT_EQ (GetZmqEndpoint (g), "address blocks"); + EXPECT_EQ (GetZmqEndpointPending (g), "address pending"); } TEST_F (DetectZmqEndpointTests, NotSet) @@ -496,7 +568,8 @@ class InitialStateTests : public GameTests /** * Converts a string in the game-state format to a series of moves as they - * would appear in the block notification. + * would appear in the block notification. The txid of the move is derived + * by hashing the name. */ static Json::Value Moves (const std::string& str) @@ -508,7 +581,9 @@ class InitialStateTests : public GameTests { Json::Value obj(Json::objectValue); obj["name"] = str.substr (i, 1); - obj["move"] = str.substr (i + 1, 1); + const std::string val = str.substr (i + 1, 1); + obj["move"] = val; + obj["txid"] = SHA256::Hash (val).ToHex (); moves.append (obj); } @@ -1113,6 +1188,113 @@ TEST_F (SyncingTests, MissedAttachWhileCatchingUp) /* ************************************************************************** */ +class PendingMoveUpdateTests : public SyncingTests +{ + +protected: + + TestPendingMoves proc; + + PendingMoveUpdateTests () + { + g.SetPendingMoveProcessor (proc); + } + + /** + * Sets up the mempool that should be returned by the mock server. + * The txid's are constructed by hashing the given strings. + */ + void + SetMempool (const std::vector& vals) + { + Json::Value mempool(Json::arrayValue); + for (const auto& v : vals) + mempool.append (SHA256::Hash (v).ToHex ()); + + EXPECT_CALL (mockXayaServer, getrawmempool ()) + .WillRepeatedly (Return (mempool)); + } + +}; + +TEST_F (PendingMoveUpdateTests, CatchingUp) +{ + EXPECT_CALL (mockXayaServer, game_sendupdates (GAME_GENESIS_HASH, GAME_ID)) + .WillOnce (Return (SendupdatesResponse (BlockHash (12), "reqtoken"))); + + mockXayaServer.SetBestBlock (12, BlockHash (12)); + ReinitialiseState (g); + EXPECT_EQ (GetState (g), State::CATCHING_UP); + ExpectGameState (TestGame::GenesisBlockHash (), ""); + + CallBlockAttach (g, "reqtoken", + TestGame::GenesisBlockHash (), BlockHash (11), 11, + Moves ("a0b1"), NO_SEQ_MISMATCH); + EXPECT_EQ (GetState (g), State::CATCHING_UP); + ExpectGameState (BlockHash (11), "a0b1"); + + CallBlockDetach (g, "reqtoken", + TestGame::GenesisBlockHash (), BlockHash (11), 11, + NO_SEQ_MISMATCH); + EXPECT_EQ (GetState (g), State::CATCHING_UP); + ExpectGameState (TestGame::GenesisBlockHash (), ""); + + CallPendingMove (g, Moves ("ax")[0]); + + /* No updates should have been processed at all. */ + EXPECT_EQ (proc.ToJson (), ParseJson ("{}")); +} + +TEST_F (PendingMoveUpdateTests, PendingMoves) +{ + const auto mv = Moves ("axbyaz"); + for (const auto& mv : Moves ("axbyaz")) + CallPendingMove (g, mv); + + EXPECT_EQ (proc.ToJson (), ParseJson (R"({ + "state": "", + "a": "z", + "b": "y" + })")); +} + +TEST_F (PendingMoveUpdateTests, BlockAttach) +{ + SetMempool ({"y"}); + + for (const auto& mv : Moves ("axby")) + CallPendingMove (g, mv); + + AttachBlock (g, BlockHash (11), Moves ("a0b1")); + EXPECT_EQ (GetState (g), State::UP_TO_DATE); + ExpectGameState (BlockHash (11), "a0b1"); + + EXPECT_EQ (proc.ToJson (), ParseJson (R"({ + "state": "a0b1", + "b": "y" + })")); +} + +TEST_F (PendingMoveUpdateTests, BlockDetach) +{ + SetMempool ({"x"}); + + AttachBlock (g, BlockHash (11), Moves ("ax")); + EXPECT_EQ (GetState (g), State::UP_TO_DATE); + ExpectGameState (BlockHash (11), "ax"); + + DetachBlock (g); + EXPECT_EQ (GetState (g), State::UP_TO_DATE); + ExpectGameState (TestGame::GenesisBlockHash (), ""); + + EXPECT_EQ (proc.ToJson (), ParseJson (R"({ + "state": "", + "a": "x" + })")); +} + +/* ************************************************************************** */ + class PruningTests : public SyncingTests { diff --git a/xayagame/testutils.cpp b/xayagame/testutils.cpp index 8ffbd1ae..561f751d 100644 --- a/xayagame/testutils.cpp +++ b/xayagame/testutils.cpp @@ -116,6 +116,12 @@ GameTestFixture::CallBlockDetach (Game& g, const std::string& reqToken, g.BlockDetach (gameId, data, seqMismatch); } +void +GameTestFixture::CallPendingMove (Game& g, const Json::Value& mv) const +{ + g.PendingMove (gameId, mv); +} + void GameTestWithBlockchain::SetStartingBlock (const uint256& hash) { diff --git a/xayagame/testutils.hpp b/xayagame/testutils.hpp index 3efb7675..054eba7a 100644 --- a/xayagame/testutils.hpp +++ b/xayagame/testutils.hpp @@ -201,6 +201,12 @@ class GameTestFixture : public testing::Test return g.zmq.addrBlocks; } + static std::string + GetZmqEndpointPending (const Game& g) + { + return g.zmq.addrPending; + } + static State GetState (const Game& g) { @@ -234,7 +240,7 @@ class GameTestFixture : public testing::Test } /** - * Calls BlockAttach on the given game instance. The function takes care + * Calls BlockAttach on the given Game instance. The function takes care * of setting up the blockData JSON object correctly based on the building * blocks given here. */ @@ -244,7 +250,7 @@ class GameTestFixture : public testing::Test const Json::Value& moves, const bool seqMismatch) const; /** - * Calls BlockDetach on the given game instance, setting up the blockData + * Calls BlockDetach on the given Game instance, setting up the blockData * object correctly. */ void CallBlockDetach (Game& g, const std::string& reqToken, @@ -252,6 +258,11 @@ class GameTestFixture : public testing::Test unsigned height, const Json::Value& moves, const bool seqMismatch) const; + /** + * Calls PendingMove on the given Game instance. + */ + void CallPendingMove (Game& g, const Json::Value& data) const; + }; /** From e1f073b15b970d10b64827c5ad79cbe1f0721b1f Mon Sep 17 00:00:00 2001 From: Daniel Kraft Date: Tue, 6 Aug 2019 16:11:37 +0200 Subject: [PATCH 7/8] Support PendingMoveProcessor in default mains. Allow the default main functions to install a PendingMoveProcessor. To do so, the instance must be created and managed outside of the default main, and passed to it in the GameDaemonConfiguration struct. --- xayagame/defaultmain.cpp | 6 ++++++ xayagame/defaultmain.hpp | 7 +++++++ 2 files changed, 13 insertions(+) diff --git a/xayagame/defaultmain.cpp b/xayagame/defaultmain.cpp index 1490b35d..82adadc1 100644 --- a/xayagame/defaultmain.cpp +++ b/xayagame/defaultmain.cpp @@ -165,6 +165,9 @@ DefaultMain (const GameDaemonConfiguration& config, const std::string& gameId, game->SetGameLogic (rules); + if (config.PendingMoves != nullptr) + game->SetPendingMoveProcessor (*config.PendingMoves); + if (config.EnablePruning >= 0) game->EnablePruning (config.EnablePruning); @@ -233,6 +236,9 @@ SQLiteMain (const GameDaemonConfiguration& config, const std::string& gameId, game->SetGameLogic (rules); + if (config.PendingMoves != nullptr) + game->SetPendingMoveProcessor (*config.PendingMoves); + if (config.EnablePruning >= 0) game->EnablePruning (config.EnablePruning); diff --git a/xayagame/defaultmain.hpp b/xayagame/defaultmain.hpp index c2bd0d1b..df6102da 100644 --- a/xayagame/defaultmain.hpp +++ b/xayagame/defaultmain.hpp @@ -7,6 +7,7 @@ #include "game.hpp" #include "gamelogic.hpp" +#include "pendingmoves.hpp" #include "sqlitegame.hpp" #include "storage.hpp" @@ -205,6 +206,12 @@ struct GameDaemonConfiguration */ std::string DataDirectory; + /** + * If set to non-null, then this PendingMoveProcessor instance is associated + * to the Game. + */ + PendingMoveProcessor* PendingMoves = nullptr; + /** * Factory class for customed instances of certain optional classes * like the RPC server. If not set, default classes are used instead. From dfb85051ad9bcf91224eb21ddb1c09eba286b768 Mon Sep 17 00:00:00 2001 From: Daniel Kraft Date: Tue, 6 Aug 2019 16:48:40 +0200 Subject: [PATCH 8/8] Expose pending state as JSON from Game. This adds the new method Game::GetPendingJsonState, which exposes the pending state (if it is tracked) as JSON from the Game instance. It is also exposed by the default GameRpcServer with the new method "getpendingstate". --- xayagame/game.cpp | 31 +++++++++++++ xayagame/game.hpp | 9 ++++ xayagame/game_tests.cpp | 85 ++++++++++++++++++++++++++++++++++++ xayagame/gamerpcserver.cpp | 9 +++- xayagame/gamerpcserver.hpp | 3 +- xayagame/rpc-stubs/game.json | 5 +++ xayagame/zmqsubscriber.hpp | 9 ++++ 7 files changed, 149 insertions(+), 2 deletions(-) diff --git a/xayagame/game.cpp b/xayagame/game.cpp index 2ba2c0ed..a9f87269 100644 --- a/xayagame/game.cpp +++ b/xayagame/game.cpp @@ -4,6 +4,9 @@ #include "game.hpp" +#include +#include + #include #include @@ -563,6 +566,34 @@ Game::GetCurrentJsonState () const }); } +Json::Value +Game::GetPendingJsonState () const +{ + std::unique_lock lock(mut); + + if (!zmq.IsPendingEnabled ()) + throw jsonrpc::JsonRpcException (jsonrpc::Errors::ERROR_RPC_INTERNAL_ERROR, + "pending moves are not tracked"); + CHECK (pending != nullptr); + + Json::Value res(Json::objectValue); + res["gameid"] = gameId; + res["chain"] = ChainToString (chain); + res["state"] = StateToString (state); + + uint256 hash; + unsigned height; + if (storage->GetCurrentBlockHashWithHeight (hash, height)) + { + res["blockhash"] = hash.ToHex (); + res["height"] = height; + } + + res["pending"] = pending->ToJson (); + + return res; +} + void Game::NotifyStateChange () const { diff --git a/xayagame/game.hpp b/xayagame/game.hpp index 7974f496..7cb4c8b8 100644 --- a/xayagame/game.hpp +++ b/xayagame/game.hpp @@ -339,6 +339,15 @@ class Game : private internal::ZmqListener */ Json::Value GetCurrentJsonState () const; + /** + * Returns a JSON object that contains data about the current state + * of pending moves as JSON. + * + * If no PendingMoveProcessor is attached or if pending moves are disabled + * in the Xaya Core notifications, then this raises a JSON-RPC error. + */ + Json::Value GetPendingJsonState () const; + /** * Blocks the calling thread until a change to the game state has * (potentially) been made. This can be used to implement long-polling diff --git a/xayagame/game_tests.cpp b/xayagame/game_tests.cpp index 6f504014..0bad0664 100644 --- a/xayagame/game_tests.cpp +++ b/xayagame/game_tests.cpp @@ -14,6 +14,7 @@ #include #include +#include #include #include @@ -745,6 +746,90 @@ TEST_F (GetCurrentJsonStateTests, HeightResolvedViaRpc) /* ************************************************************************** */ +class GetPendingJsonState : public InitialStateTests +{ + +protected: + + GetPendingJsonState () + { + EXPECT_CALL (mockXayaServer, trackedgames (_, _)).Times (AnyNumber ()); + } + + /** + * Sets up the ZMQ endpoints (by mocking getzmqnotifications in the RPC + * server and detecting them). This can either make the server expose + * a notification for pending moves or not. + */ + void + SetupZmqEndpoints (const bool withPending) + { + Json::Value notifications = ParseJson (R"( + [ + {"type": "pubgameblocks", "address": "tcp://127.0.0.1:32101"} + ] + )"); + + if (withPending) + notifications.append (ParseJson (R"( + {"type": "pubgamepending", "address": "tcp://127.0.0.1:32102"} + )")); + + EXPECT_CALL (mockXayaServer, getzmqnotifications ()) + .WillOnce (Return (notifications)); + CHECK (g.DetectZmqEndpoint ()); + } + +}; + +TEST_F (GetPendingJsonState, NoAttachedProcessor) +{ + SetupZmqEndpoints (true); + g.Start (); + + EXPECT_THROW (g.GetPendingJsonState (), jsonrpc::JsonRpcException); +} + +TEST_F (GetPendingJsonState, PendingNotificationDisabled) +{ + TestPendingMoves proc; + g.SetPendingMoveProcessor (proc); + + SetupZmqEndpoints (false); + g.Start (); + + EXPECT_THROW (g.GetPendingJsonState (), jsonrpc::JsonRpcException); +} + +TEST_F (GetPendingJsonState, PendingState) +{ + TestPendingMoves proc; + g.SetPendingMoveProcessor (proc); + + SetupZmqEndpoints (true); + g.Start (); + + mockXayaServer.SetBestBlock (GAME_GENESIS_HEIGHT, + TestGame::GenesisBlockHash ()); + ReinitialiseState (g); + + CallPendingMove (g, Moves ("ax")[0]); + + const auto state = g.GetPendingJsonState (); + EXPECT_EQ (state["gameid"], GAME_ID); + EXPECT_EQ (state["chain"], "main"); + EXPECT_EQ (state["state"], "up-to-date"); + EXPECT_EQ (state["blockhash"], GAME_GENESIS_HASH); + EXPECT_EQ (state["pending"], ParseJson (R"( + { + "state": "", + "a": "x" + } + )")); +} + +/* ************************************************************************** */ + class WaitForChangeTests : public InitialStateTests { diff --git a/xayagame/gamerpcserver.cpp b/xayagame/gamerpcserver.cpp index 9a01f7fc..a6eff8aa 100644 --- a/xayagame/gamerpcserver.cpp +++ b/xayagame/gamerpcserver.cpp @@ -1,4 +1,4 @@ -// Copyright (C) 2018 The Xaya developers +// Copyright (C) 2018-2019 The Xaya developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. @@ -23,6 +23,13 @@ GameRpcServer::getcurrentstate () return game.GetCurrentJsonState (); } +Json::Value +GameRpcServer::getpendingstate () +{ + LOG (INFO) << "RPC method called: getpendingstate"; + return game.GetPendingJsonState (); +} + std::string GameRpcServer::waitforchange (const std::string& knownBlock) { diff --git a/xayagame/gamerpcserver.hpp b/xayagame/gamerpcserver.hpp index c84adc13..b4838a9e 100644 --- a/xayagame/gamerpcserver.hpp +++ b/xayagame/gamerpcserver.hpp @@ -1,4 +1,4 @@ -// Copyright (C) 2018 The Xaya developers +// Copyright (C) 2018-2019 The Xaya developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. @@ -41,6 +41,7 @@ class GameRpcServer : public GameRpcServerStub virtual void stop () override; virtual Json::Value getcurrentstate () override; + virtual Json::Value getpendingstate () override; virtual std::string waitforchange (const std::string& knownBlock) override; /** diff --git a/xayagame/rpc-stubs/game.json b/xayagame/rpc-stubs/game.json index ad00cbeb..1557adfc 100644 --- a/xayagame/rpc-stubs/game.json +++ b/xayagame/rpc-stubs/game.json @@ -8,6 +8,11 @@ "params": {}, "returns": {} }, + { + "name": "getpendingstate", + "params": {}, + "returns": {} + }, { "name": "waitforchange", "params": [ "known block" ], diff --git a/xayagame/zmqsubscriber.hpp b/xayagame/zmqsubscriber.hpp index 0c2b4a99..6aaaae1a 100644 --- a/xayagame/zmqsubscriber.hpp +++ b/xayagame/zmqsubscriber.hpp @@ -157,6 +157,15 @@ class ZmqSubscriber return worker != nullptr; } + /** + * Returns true if notifications for pending moves are enabled. + */ + bool + IsPendingEnabled () const + { + return !addrPending.empty (); + } + /** * Starts the ZMQ subscriber in a new thread. Must only be called after * the ZMQ endpoint has been configured, and must not be called when