Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement "WaitForChange" for pending moves #81

Merged
merged 2 commits into from
Aug 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 69 additions & 21 deletions mover/gametest/waitforchange.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
#!/usr/bin/env python
# Copyright (C) 2018 The Xaya developers
# Copyright (C) 2018-2019 The Xaya developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.

from mover import MoverTest

"""
Tests the waitforchange RPC method behaviour.
Tests the waitforchange and waitforpendingchange RPC methods.
"""

import threading
Expand All @@ -24,77 +24,125 @@ def sleepSome ():

class ForChangeWaiter (threading.Thread):
"""
Thread subclass that calls the waitforchange RPC method and just blocks
until it receives the result.
Thread subclass that calls the waitfor(pending)change RPC method and just
blocks until it receives the result.
"""

def __init__ (self, node):
def __init__ (self, node, method, getOldVersion):
super (ForChangeWaiter, self).__init__ ()
self.node = node
self.method = method
self.getOldVersion = getOldVersion
self.result = None
self.start ()
sleepSome ()

def run (self):
rpc = self.node.createRpc ()
state = rpc.getcurrentstate ()
oldBlock = ""
if "blockhash" in state:
oldBlock = state["blockhash"]
self.result = rpc.waitforchange (oldBlock)
fcn = getattr (rpc, self.method)
self.result = fcn (self.getOldVersion (rpc))

def shouldBeRunning (self):
sleepSome ()
assert self.is_alive ()

def shouldBeDone (self, expected):
def shouldBeDone (self, expected=None):
sleepSome ()
assert not self.is_alive ()
self.join ()
assert self.result == expected
if expected is not None:
assert self.result == expected


class WaitForChangeTest (MoverTest):

def run (self):
self.generate (101)

self.test_attach ()
self.test_detach ()
self.test_move ()
self.test_stopped ()

# Since the initial game state on regtest is associated with the genesis
# block already, we cannot test situations where either waitforchange is
# signaled because of the initial state or where there is not yet a current
# best block and null is returned from the RPC.

def getBlockChangeWaiter (self):
"""
Returns a ForChangeWaiter instance calling waitforchange.
"""

def getOldVersion (rpc):
state = rpc.getcurrentstate ()
if "blockhash" in state:
return state["blockhash"]
return ""

return ForChangeWaiter (self.gamenode, "waitforchange", getOldVersion)

def getPendingChangeWaiter (self):
"""
Returns a ForChangeWaiter instance calling waitforpendingchange.
"""

def getOldVersion (rpc):
state = rpc.getpendingstate ()
if "version" in state:
return state["version"]
return 0

return ForChangeWaiter (self.gamenode, "waitforpendingchange",
getOldVersion)

def test_attach (self):
self.mainLogger.info ("Block attaches...")

waiter = ForChangeWaiter (self.gamenode)
waiter.shouldBeRunning ()
blocks = self.getBlockChangeWaiter ()
pending = self.getPendingChangeWaiter ()
blocks.shouldBeRunning ()
pending.shouldBeRunning ()

self.generate (1)
waiter.shouldBeDone (self.rpc.xaya.getbestblockhash ())
blocks.shouldBeDone (self.rpc.xaya.getbestblockhash ())
pending.shouldBeDone (self.rpc.game.getpendingstate ())

def test_detach (self):
self.mainLogger.info ("Block detaches...")

self.generate (1)
blk = self.rpc.xaya.getbestblockhash ()

waiter = ForChangeWaiter (self.gamenode)
waiter.shouldBeRunning ()
blocks = self.getBlockChangeWaiter ()
pending = self.getPendingChangeWaiter ()
blocks.shouldBeRunning ()
pending.shouldBeRunning ()

self.rpc.xaya.invalidateblock (blk)
waiter.shouldBeDone (self.rpc.xaya.getbestblockhash ())
blocks.shouldBeDone (self.rpc.xaya.getbestblockhash ())
pending.shouldBeDone (self.rpc.game.getpendingstate ())

def test_move (self):
self.mainLogger.info ("Move sent...")

pending = self.getPendingChangeWaiter ()
pending.shouldBeRunning ()

self.move ("a", "k", 5)
pending.shouldBeDone (self.rpc.game.getpendingstate ())

def test_stopped (self):
self.mainLogger.info ("Stopping the daemon while a waiter is active...")

waiter = ForChangeWaiter (self.gamenode)
waiter.shouldBeRunning ()
blocks = self.getBlockChangeWaiter ()
pending = self.getPendingChangeWaiter ()
blocks.shouldBeRunning ()
pending.shouldBeRunning ()

self.stopGameDaemon ()
waiter.shouldBeDone (self.rpc.xaya.getbestblockhash ())
blocks.shouldBeDone (self.rpc.xaya.getbestblockhash ())
pending.shouldBeDone ()
self.startGameDaemon ()


Expand Down
59 changes: 57 additions & 2 deletions xayagame/game.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,10 @@ Game::BlockAttach (const std::string& id, const Json::Value& data,
ReinitialiseState ();

if (state == State::UP_TO_DATE && pending != nullptr)
pending->ProcessAttachedBlock (storage->GetCurrentGameState ());
{
pending->ProcessAttachedBlock (storage->GetCurrentGameState ());
NotifyPendingStateChange ();
}
}

void
Expand Down Expand Up @@ -331,7 +334,10 @@ Game::BlockDetach (const std::string& id, const Json::Value& data,
ReinitialiseState ();

if (state == State::UP_TO_DATE && pending != nullptr)
pending->ProcessDetachedBlock (storage->GetCurrentGameState (), data);
{
pending->ProcessDetachedBlock (storage->GetCurrentGameState (), data);
NotifyPendingStateChange ();
}
}

void
Expand All @@ -349,6 +355,7 @@ Game::PendingMove (const std::string& id, const Json::Value& data)
{
CHECK (pending != nullptr);
pending->ProcessMove (storage->GetCurrentGameState (), data);
NotifyPendingStateChange ();
}
else
VLOG (1) << "Ignoring pending move while not up-to-date: " << data;
Expand Down Expand Up @@ -570,13 +577,19 @@ Json::Value
Game::GetPendingJsonState () const
{
std::unique_lock<std::mutex> lock(mut);
return UnlockedPendingJsonState ();
}

Json::Value
Game::UnlockedPendingJsonState () const
{
if (!zmq.IsPendingEnabled ())
throw jsonrpc::JsonRpcException (jsonrpc::Errors::ERROR_RPC_INTERNAL_ERROR,
"pending moves are not tracked");
CHECK (pending != nullptr);

Json::Value res(Json::objectValue);
res["version"] = pendingStateVersion;
res["gameid"] = gameId;
res["chain"] = ChainToString (chain);
res["state"] = StateToString (state);
Expand All @@ -603,6 +616,19 @@ Game::NotifyStateChange () const
cvStateChanged.notify_all ();
}

void
Game::NotifyPendingStateChange ()
{
/* Callers are expected to already hold the mut lock here (as that is the
typical case when they make changes to the state anyway). */
CHECK_GT (pendingStateVersion, WAITFORCHANGE_ALWAYS_BLOCK);
++pendingStateVersion;
VLOG (1)
<< "Notifying waiting threads about change of pending state,"
<< " new version: " << pendingStateVersion;
cvPendingStateChanged.notify_all ();
}

void
Game::WaitForChange (const uint256& oldBlock, uint256& newBlock) const
{
Expand Down Expand Up @@ -632,6 +658,34 @@ Game::WaitForChange (const uint256& oldBlock, uint256& newBlock) const
newBlock.SetNull ();
}

Json::Value
Game::WaitForPendingChange (const int oldVersion) const
{
std::unique_lock<std::mutex> lock(mut);

if (oldVersion != WAITFORCHANGE_ALWAYS_BLOCK
&& oldVersion != pendingStateVersion)
{
VLOG (1)
<< "Known version differs from current one,"
" returning immediately from WaitForPendingState";
return UnlockedPendingJsonState ();
}

if (zmq.IsRunning () && zmq.IsPendingEnabled ())
{
VLOG (1) << "Waiting for pending state change on condition variable...";
cvPendingStateChanged.wait_for (lock, WAITFORCHANGE_TIMEOUT);
VLOG (1) << "Potential state change detected in WaitForPendingChange";
}
else
LOG (WARNING)
<< "WaitForPendingChange called with no ZMQ listener on pending moves,"
" returning immediately";

return UnlockedPendingJsonState ();
}

void
Game::TrackGame ()
{
Expand Down Expand Up @@ -677,6 +731,7 @@ Game::Stop ()
/* Make sure to wake up all listeners waiting for a state update (as there
won't be one anymore). */
NotifyStateChange ();
NotifyPendingStateChange ();
}

void
Expand Down
42 changes: 42 additions & 0 deletions xayagame/game.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ class Game : private internal::ZmqListener
*/
mutable std::condition_variable cvStateChanged;

/**
* Condition variable that is signalled whenever the pending state
* is changed.
*/
mutable std::condition_variable cvPendingStateChanged;

/** The chain type to which the game is connected. */
Chain chain = Chain::UNKNOWN;

Expand Down Expand Up @@ -137,6 +143,13 @@ class Game : private internal::ZmqListener
/** The processor for pending moves, if any. */
PendingMoveProcessor* pending = nullptr;

/**
* Version number of the "current" pending state. This number is incremented
* whenever the pending state may have changed, and is used to identify
* known states with WaitForPendingChange.
*/
int pendingStateVersion = 1;

/**
* Desired size for batches of atomic transactions while the game is
* catching up. <= 1 means no batching even in these situations.
Expand Down Expand Up @@ -227,6 +240,17 @@ class Game : private internal::ZmqListener
*/
void NotifyStateChange () const;

/**
* Notifies potentially-waiting threads that the pending state has changed.
*/
void NotifyPendingStateChange ();

/**
* Returns the current pending state as JSON, but assuming that the caller
* already holds the mut lock.
*/
Json::Value UnlockedPendingJsonState () const;

/**
* Converts a state enum value to a string for use in log messages and the
* JSON-RPC interface.
Expand All @@ -237,6 +261,12 @@ class Game : private internal::ZmqListener

public:

/**
* Special value for the old version in WaitForPendingChange that tells the
* function to always block.
*/
static constexpr int WAITFORCHANGE_ALWAYS_BLOCK = 0;

explicit Game (const std::string& id);

Game () = delete;
Expand Down Expand Up @@ -374,6 +404,18 @@ class Game : private internal::ZmqListener
*/
void WaitForChange (const uint256& oldBlock, uint256& newBlock) const;

/**
* Blocks the calling thread until a change to the pending state has
* been made. Note that this function may return spuriously.
* Returns the new pending state as per GetPendingJsonState.
*
* If oldVersion is passed and not WAITFORCHANGE_ALWAYS_BLOCK, then the
* method returns immediately if the version of the current pending state
* (as returned in the JSON field "version") does not match the one
* passed in.
*/
Json::Value WaitForPendingChange (int oldState) const;

/**
* Starts the ZMQ subscriber and other logic. Must not be called before
* the ZMQ endpoint has been configured, and must not be called when
Expand Down
Loading