From 174a315035f4a97c708be65166666847918de769 Mon Sep 17 00:00:00 2001 From: Daniel Kraft Date: Wed, 7 Aug 2019 17:31:16 +0200 Subject: [PATCH 1/2] Implement WaitForPendingChange in Game. This adds another "waitforchange" blocking call, waiting for changes in the pending state. It is exposed via Game::WaitForPendingChange and the waitforpendingchange RPC method in the default RPC server. --- xayagame/game.cpp | 59 +++++++++- xayagame/game.hpp | 42 +++++++ xayagame/game_tests.cpp | 216 ++++++++++++++++++++++++++++++++++- xayagame/gamerpcserver.cpp | 9 +- xayagame/gamerpcserver.hpp | 1 + xayagame/rpc-stubs/game.json | 7 +- 6 files changed, 325 insertions(+), 9 deletions(-) diff --git a/xayagame/game.cpp b/xayagame/game.cpp index a9f87269..663cb863 100644 --- a/xayagame/game.cpp +++ b/xayagame/game.cpp @@ -244,7 +244,10 @@ Game::BlockAttach (const std::string& id, const Json::Value& data, ReinitialiseState (); if (state == State::UP_TO_DATE && pending != nullptr) - pending->ProcessAttachedBlock (storage->GetCurrentGameState ()); + { + pending->ProcessAttachedBlock (storage->GetCurrentGameState ()); + NotifyPendingStateChange (); + } } void @@ -331,7 +334,10 @@ Game::BlockDetach (const std::string& id, const Json::Value& data, ReinitialiseState (); if (state == State::UP_TO_DATE && pending != nullptr) - pending->ProcessDetachedBlock (storage->GetCurrentGameState (), data); + { + pending->ProcessDetachedBlock (storage->GetCurrentGameState (), data); + NotifyPendingStateChange (); + } } void @@ -349,6 +355,7 @@ Game::PendingMove (const std::string& id, const Json::Value& data) { CHECK (pending != nullptr); pending->ProcessMove (storage->GetCurrentGameState (), data); + NotifyPendingStateChange (); } else VLOG (1) << "Ignoring pending move while not up-to-date: " << data; @@ -570,13 +577,19 @@ Json::Value Game::GetPendingJsonState () const { std::unique_lock lock(mut); + return UnlockedPendingJsonState (); +} +Json::Value +Game::UnlockedPendingJsonState () const +{ if (!zmq.IsPendingEnabled ()) throw jsonrpc::JsonRpcException (jsonrpc::Errors::ERROR_RPC_INTERNAL_ERROR, "pending moves are not tracked"); CHECK (pending != nullptr); Json::Value res(Json::objectValue); + res["version"] = pendingStateVersion; res["gameid"] = gameId; res["chain"] = ChainToString (chain); res["state"] = StateToString (state); @@ -603,6 +616,19 @@ Game::NotifyStateChange () const cvStateChanged.notify_all (); } +void +Game::NotifyPendingStateChange () +{ + /* Callers are expected to already hold the mut lock here (as that is the + typical case when they make changes to the state anyway). */ + CHECK_GT (pendingStateVersion, WAITFORCHANGE_ALWAYS_BLOCK); + ++pendingStateVersion; + VLOG (1) + << "Notifying waiting threads about change of pending state," + << " new version: " << pendingStateVersion; + cvPendingStateChanged.notify_all (); +} + void Game::WaitForChange (const uint256& oldBlock, uint256& newBlock) const { @@ -632,6 +658,34 @@ Game::WaitForChange (const uint256& oldBlock, uint256& newBlock) const newBlock.SetNull (); } +Json::Value +Game::WaitForPendingChange (const int oldVersion) const +{ + std::unique_lock lock(mut); + + if (oldVersion != WAITFORCHANGE_ALWAYS_BLOCK + && oldVersion != pendingStateVersion) + { + VLOG (1) + << "Known version differs from current one," + " returning immediately from WaitForPendingState"; + return UnlockedPendingJsonState (); + } + + if (zmq.IsRunning () && zmq.IsPendingEnabled ()) + { + VLOG (1) << "Waiting for pending state change on condition variable..."; + cvPendingStateChanged.wait_for (lock, WAITFORCHANGE_TIMEOUT); + VLOG (1) << "Potential state change detected in WaitForPendingChange"; + } + else + LOG (WARNING) + << "WaitForPendingChange called with no ZMQ listener on pending moves," + " returning immediately"; + + return UnlockedPendingJsonState (); +} + void Game::TrackGame () { @@ -677,6 +731,7 @@ Game::Stop () /* Make sure to wake up all listeners waiting for a state update (as there won't be one anymore). */ NotifyStateChange (); + NotifyPendingStateChange (); } void diff --git a/xayagame/game.hpp b/xayagame/game.hpp index 7cb4c8b8..f881053d 100644 --- a/xayagame/game.hpp +++ b/xayagame/game.hpp @@ -100,6 +100,12 @@ class Game : private internal::ZmqListener */ mutable std::condition_variable cvStateChanged; + /** + * Condition variable that is signalled whenever the pending state + * is changed. + */ + mutable std::condition_variable cvPendingStateChanged; + /** The chain type to which the game is connected. */ Chain chain = Chain::UNKNOWN; @@ -137,6 +143,13 @@ class Game : private internal::ZmqListener /** The processor for pending moves, if any. */ PendingMoveProcessor* pending = nullptr; + /** + * Version number of the "current" pending state. This number is incremented + * whenever the pending state may have changed, and is used to identify + * known states with WaitForPendingChange. + */ + int pendingStateVersion = 1; + /** * Desired size for batches of atomic transactions while the game is * catching up. <= 1 means no batching even in these situations. @@ -227,6 +240,17 @@ class Game : private internal::ZmqListener */ void NotifyStateChange () const; + /** + * Notifies potentially-waiting threads that the pending state has changed. + */ + void NotifyPendingStateChange (); + + /** + * Returns the current pending state as JSON, but assuming that the caller + * already holds the mut lock. + */ + Json::Value UnlockedPendingJsonState () const; + /** * Converts a state enum value to a string for use in log messages and the * JSON-RPC interface. @@ -237,6 +261,12 @@ class Game : private internal::ZmqListener public: + /** + * Special value for the old version in WaitForPendingChange that tells the + * function to always block. + */ + static constexpr int WAITFORCHANGE_ALWAYS_BLOCK = 0; + explicit Game (const std::string& id); Game () = delete; @@ -374,6 +404,18 @@ class Game : private internal::ZmqListener */ void WaitForChange (const uint256& oldBlock, uint256& newBlock) const; + /** + * Blocks the calling thread until a change to the pending state has + * been made. Note that this function may return spuriously. + * Returns the new pending state as per GetPendingJsonState. + * + * If oldVersion is passed and not WAITFORCHANGE_ALWAYS_BLOCK, then the + * method returns immediately if the version of the current pending state + * (as returned in the JSON field "version") does not match the one + * passed in. + */ + Json::Value WaitForPendingChange (int oldState) const; + /** * Starts the ZMQ subscriber and other logic. Must not be called before * the ZMQ endpoint has been configured, and must not be called when diff --git a/xayagame/game_tests.cpp b/xayagame/game_tests.cpp index 0bad0664..4ea557c8 100644 --- a/xayagame/game_tests.cpp +++ b/xayagame/game_tests.cpp @@ -23,6 +23,7 @@ #include +#include #include #include #include @@ -746,12 +747,12 @@ TEST_F (GetCurrentJsonStateTests, HeightResolvedViaRpc) /* ************************************************************************** */ -class GetPendingJsonState : public InitialStateTests +class GetPendingJsonStateTests : public InitialStateTests { protected: - GetPendingJsonState () + GetPendingJsonStateTests () { EXPECT_CALL (mockXayaServer, trackedgames (_, _)).Times (AnyNumber ()); } @@ -782,7 +783,7 @@ class GetPendingJsonState : public InitialStateTests }; -TEST_F (GetPendingJsonState, NoAttachedProcessor) +TEST_F (GetPendingJsonStateTests, NoAttachedProcessor) { SetupZmqEndpoints (true); g.Start (); @@ -790,7 +791,7 @@ TEST_F (GetPendingJsonState, NoAttachedProcessor) EXPECT_THROW (g.GetPendingJsonState (), jsonrpc::JsonRpcException); } -TEST_F (GetPendingJsonState, PendingNotificationDisabled) +TEST_F (GetPendingJsonStateTests, PendingNotificationDisabled) { TestPendingMoves proc; g.SetPendingMoveProcessor (proc); @@ -801,7 +802,7 @@ TEST_F (GetPendingJsonState, PendingNotificationDisabled) EXPECT_THROW (g.GetPendingJsonState (), jsonrpc::JsonRpcException); } -TEST_F (GetPendingJsonState, PendingState) +TEST_F (GetPendingJsonStateTests, PendingState) { TestPendingMoves proc; g.SetPendingMoveProcessor (proc); @@ -1014,6 +1015,211 @@ TEST_F (WaitForChangeTests, OutdatedOldBlock) /* ************************************************************************** */ +class WaitForPendingChangeTests : public GetPendingJsonStateTests +{ + +private: + + /** The thread that is used to call WaitForPendingChange. */ + std::unique_ptr waiter; + + TestPendingMoves proc; + +protected: + + /** Flag that tells us when the waiter thread is done. */ + std::atomic waiterDone; + + WaitForPendingChangeTests () + { + EXPECT_CALL (mockXayaServer, getrawmempool ()) + .WillRepeatedly (Return (Json::Value (Json::arrayValue))); + + g.SetPendingMoveProcessor (proc); + SetStartingBlock (TestGame::GenesisBlockHash ()); + + mockXayaServer.SetBestBlock (10, TestGame::GenesisBlockHash ()); + ReinitialiseState (g); + EXPECT_EQ (GetState (g), State::UP_TO_DATE); + } + + /** + * Calls WaitForPendingChange on a newly started thread, storing the output + * to the given variable. If the method throws a JsonRpcException, then + * the output is set to null. + */ + void + CallWaitForPendingChange (const int oldVersion, Json::Value& output) + { + ASSERT_EQ (waiter, nullptr); + waiterDone = false; + waiter = std::make_unique ([this, oldVersion, &output] () + { + try + { + output = g.WaitForPendingChange (oldVersion); + } + catch (const jsonrpc::JsonRpcException& exc) + { + LOG (INFO) + << "Caught exception from WaitForPendingChange: " + << exc.what (); + output = Json::Value (); + } + waiterDone = true; + }); + } + + /** + * Verifies that a waiter has been started and received the notification + * of a new state already, or does so soon. + */ + void + JoinWaiter () + { + ASSERT_NE (waiter, nullptr); + if (!waiterDone) + SleepSome (); + EXPECT_TRUE (waiterDone); + waiter->join (); + waiter.reset (); + } + +}; + +TEST_F (WaitForPendingChangeTests, ZmqNotRunning) +{ + SetupZmqEndpoints (true); + + Json::Value out; + CallWaitForPendingChange (Game::WAITFORCHANGE_ALWAYS_BLOCK, out); + JoinWaiter (); + EXPECT_TRUE (out.isObject ()); +} + +TEST_F (WaitForPendingChangeTests, NotTrackingPendingMoves) +{ + SetupZmqEndpoints (false); + g.Start (); + + Json::Value out; + CallWaitForPendingChange (Game::WAITFORCHANGE_ALWAYS_BLOCK, out); + JoinWaiter (); + + /* When pending moves are not tracked, then WaitForPendingChange should + throw a JSON-RPC error just like GetPendingJsonState. */ + EXPECT_TRUE (out.isNull ()); +} + +TEST_F (WaitForPendingChangeTests, StopWakesUpWaiters) +{ + SetupZmqEndpoints (true); + g.Start (); + + Json::Value out; + CallWaitForPendingChange (Game::WAITFORCHANGE_ALWAYS_BLOCK, out); + SleepSome (); + EXPECT_FALSE (waiterDone); + g.Stop (); + JoinWaiter (); + EXPECT_TRUE (out.isObject ()); +} + +TEST_F (WaitForPendingChangeTests, OldVersionImmediateReturn) +{ + SetupZmqEndpoints (true); + g.Start (); + + const auto state = g.GetPendingJsonState (); + + Json::Value out; + CallWaitForPendingChange (42, out); + JoinWaiter (); + EXPECT_EQ (out, state); +} + +TEST_F (WaitForPendingChangeTests, OldVersionWaiting) +{ + SetupZmqEndpoints (true); + g.Start (); + + const int oldVersion = g.GetPendingJsonState ()["version"].asInt (); + + Json::Value out; + CallWaitForPendingChange (oldVersion, out); + + SleepSome (); + EXPECT_FALSE (waiterDone); + + CallPendingMove (g, Moves ("ax")[0]); + JoinWaiter (); +} + +TEST_F (WaitForPendingChangeTests, PendingMove) +{ + SetupZmqEndpoints (true); + g.Start (); + + Json::Value out; + CallWaitForPendingChange (Game::WAITFORCHANGE_ALWAYS_BLOCK, out); + + SleepSome (); + EXPECT_FALSE (waiterDone); + + CallPendingMove (g, Moves ("ax")[0]); + JoinWaiter (); + EXPECT_EQ (out["pending"], ParseJson (R"( + { + "state": "", + "a": "x" + } + )")); +} + +TEST_F (WaitForPendingChangeTests, AttachedBlock) +{ + SetupZmqEndpoints (true); + g.Start (); + + Json::Value out; + CallWaitForPendingChange (Game::WAITFORCHANGE_ALWAYS_BLOCK, out); + + SleepSome (); + EXPECT_FALSE (waiterDone); + + AttachBlock (g, BlockHash (11), Moves ("a0b1")); + JoinWaiter (); + EXPECT_EQ (out["pending"], ParseJson (R"( + { + "state": "a0b1" + } + )")); +} + +TEST_F (WaitForPendingChangeTests, DetachedBlock) +{ + SetupZmqEndpoints (true); + g.Start (); + + AttachBlock (g, BlockHash (11), Moves ("a0b1")); + + Json::Value out; + CallWaitForPendingChange (Game::WAITFORCHANGE_ALWAYS_BLOCK, out); + + SleepSome (); + EXPECT_FALSE (waiterDone); + + DetachBlock (g); + JoinWaiter (); + EXPECT_EQ (out["pending"], ParseJson (R"( + { + "state": "" + } + )")); +} + +/* ************************************************************************** */ + class SyncingTests : public InitialStateTests { diff --git a/xayagame/gamerpcserver.cpp b/xayagame/gamerpcserver.cpp index a6eff8aa..94757450 100644 --- a/xayagame/gamerpcserver.cpp +++ b/xayagame/gamerpcserver.cpp @@ -37,6 +37,13 @@ GameRpcServer::waitforchange (const std::string& knownBlock) return DefaultWaitForChange (game, knownBlock); } +Json::Value +GameRpcServer::waitforpendingchange (const int oldVersion) +{ + LOG (INFO) << "RPC method called: waitforpendingchange " << oldVersion; + return game.WaitForPendingChange (oldVersion); +} + std::string GameRpcServer::DefaultWaitForChange (const Game& g, const std::string& knownBlock) @@ -50,7 +57,7 @@ GameRpcServer::DefaultWaitForChange (const Game& g, uint256 newBlock; g.WaitForChange (oldBlock, newBlock); - /* If there is no best block so far, return JSON null. */ + /* If there is no best block so far, return empty string. */ if (newBlock.IsNull ()) return ""; diff --git a/xayagame/gamerpcserver.hpp b/xayagame/gamerpcserver.hpp index b4838a9e..aac61855 100644 --- a/xayagame/gamerpcserver.hpp +++ b/xayagame/gamerpcserver.hpp @@ -43,6 +43,7 @@ class GameRpcServer : public GameRpcServerStub virtual Json::Value getcurrentstate () override; virtual Json::Value getpendingstate () override; virtual std::string waitforchange (const std::string& knownBlock) override; + virtual Json::Value waitforpendingchange (int oldVersion) override; /** * Implements the standard waitforchange RPC method independent of a diff --git a/xayagame/rpc-stubs/game.json b/xayagame/rpc-stubs/game.json index 1557adfc..c1fdc681 100644 --- a/xayagame/rpc-stubs/game.json +++ b/xayagame/rpc-stubs/game.json @@ -15,7 +15,12 @@ }, { "name": "waitforchange", - "params": [ "known block" ], + "params": ["known block"], "returns": "" + }, + { + "name": "waitforpendingchange", + "params": [42], + "returns": {} } ] From 564b881bec57b72634159b90e44221e55ccadae7 Mon Sep 17 00:00:00 2001 From: Daniel Kraft Date: Thu, 8 Aug 2019 09:34:03 +0200 Subject: [PATCH 2/2] Integration test for waitforpendingchange. Add tests for waitforpendingchange to the waitforchange.py Mover integration test. --- mover/gametest/waitforchange.py | 90 +++++++++++++++++++++++++-------- 1 file changed, 69 insertions(+), 21 deletions(-) diff --git a/mover/gametest/waitforchange.py b/mover/gametest/waitforchange.py index 71d64f14..9894ff30 100755 --- a/mover/gametest/waitforchange.py +++ b/mover/gametest/waitforchange.py @@ -1,12 +1,12 @@ #!/usr/bin/env python -# Copyright (C) 2018 The Xaya developers +# Copyright (C) 2018-2019 The Xaya developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. from mover import MoverTest """ -Tests the waitforchange RPC method behaviour. +Tests the waitforchange and waitforpendingchange RPC methods. """ import threading @@ -24,41 +24,44 @@ def sleepSome (): class ForChangeWaiter (threading.Thread): """ - Thread subclass that calls the waitforchange RPC method and just blocks - until it receives the result. + Thread subclass that calls the waitfor(pending)change RPC method and just + blocks until it receives the result. """ - def __init__ (self, node): + def __init__ (self, node, method, getOldVersion): super (ForChangeWaiter, self).__init__ () self.node = node + self.method = method + self.getOldVersion = getOldVersion self.result = None self.start () sleepSome () def run (self): rpc = self.node.createRpc () - state = rpc.getcurrentstate () - oldBlock = "" - if "blockhash" in state: - oldBlock = state["blockhash"] - self.result = rpc.waitforchange (oldBlock) + fcn = getattr (rpc, self.method) + self.result = fcn (self.getOldVersion (rpc)) def shouldBeRunning (self): sleepSome () assert self.is_alive () - def shouldBeDone (self, expected): + def shouldBeDone (self, expected=None): sleepSome () assert not self.is_alive () self.join () - assert self.result == expected + if expected is not None: + assert self.result == expected class WaitForChangeTest (MoverTest): def run (self): + self.generate (101) + self.test_attach () self.test_detach () + self.test_move () self.test_stopped () # Since the initial game state on regtest is associated with the genesis @@ -66,14 +69,44 @@ def run (self): # signaled because of the initial state or where there is not yet a current # best block and null is returned from the RPC. + def getBlockChangeWaiter (self): + """ + Returns a ForChangeWaiter instance calling waitforchange. + """ + + def getOldVersion (rpc): + state = rpc.getcurrentstate () + if "blockhash" in state: + return state["blockhash"] + return "" + + return ForChangeWaiter (self.gamenode, "waitforchange", getOldVersion) + + def getPendingChangeWaiter (self): + """ + Returns a ForChangeWaiter instance calling waitforpendingchange. + """ + + def getOldVersion (rpc): + state = rpc.getpendingstate () + if "version" in state: + return state["version"] + return 0 + + return ForChangeWaiter (self.gamenode, "waitforpendingchange", + getOldVersion) + def test_attach (self): self.mainLogger.info ("Block attaches...") - waiter = ForChangeWaiter (self.gamenode) - waiter.shouldBeRunning () + blocks = self.getBlockChangeWaiter () + pending = self.getPendingChangeWaiter () + blocks.shouldBeRunning () + pending.shouldBeRunning () self.generate (1) - waiter.shouldBeDone (self.rpc.xaya.getbestblockhash ()) + blocks.shouldBeDone (self.rpc.xaya.getbestblockhash ()) + pending.shouldBeDone (self.rpc.game.getpendingstate ()) def test_detach (self): self.mainLogger.info ("Block detaches...") @@ -81,20 +114,35 @@ def test_detach (self): self.generate (1) blk = self.rpc.xaya.getbestblockhash () - waiter = ForChangeWaiter (self.gamenode) - waiter.shouldBeRunning () + blocks = self.getBlockChangeWaiter () + pending = self.getPendingChangeWaiter () + blocks.shouldBeRunning () + pending.shouldBeRunning () self.rpc.xaya.invalidateblock (blk) - waiter.shouldBeDone (self.rpc.xaya.getbestblockhash ()) + blocks.shouldBeDone (self.rpc.xaya.getbestblockhash ()) + pending.shouldBeDone (self.rpc.game.getpendingstate ()) + + def test_move (self): + self.mainLogger.info ("Move sent...") + + pending = self.getPendingChangeWaiter () + pending.shouldBeRunning () + + self.move ("a", "k", 5) + pending.shouldBeDone (self.rpc.game.getpendingstate ()) def test_stopped (self): self.mainLogger.info ("Stopping the daemon while a waiter is active...") - waiter = ForChangeWaiter (self.gamenode) - waiter.shouldBeRunning () + blocks = self.getBlockChangeWaiter () + pending = self.getPendingChangeWaiter () + blocks.shouldBeRunning () + pending.shouldBeRunning () self.stopGameDaemon () - waiter.shouldBeDone (self.rpc.xaya.getbestblockhash ()) + blocks.shouldBeDone (self.rpc.xaya.getbestblockhash ()) + pending.shouldBeDone () self.startGameDaemon ()