Skip to content

Commit

Permalink
Merge pull request #79 from xaya/pending-processing
Browse files Browse the repository at this point in the history
General code for tracking pending moves in a GSP
  • Loading branch information
domob1812 committed Aug 7, 2019
2 parents 708b950 + dfb8505 commit f9d304a
Show file tree
Hide file tree
Showing 20 changed files with 1,462 additions and 182 deletions.
3 changes: 3 additions & 0 deletions xayagame/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ libxayagame_la_SOURCES = \
heightcache.cpp \
lmdbstorage.cpp \
mainloop.cpp \
pendingmoves.cpp \
pruningqueue.cpp \
signatures.cpp \
sqlitegame.cpp \
Expand All @@ -50,6 +51,7 @@ xayagame_HEADERS = \
heightcache.hpp \
lmdbstorage.hpp \
mainloop.hpp \
pendingmoves.hpp \
pruningqueue.hpp \
signatures.hpp \
sqlitegame.hpp \
Expand Down Expand Up @@ -92,6 +94,7 @@ tests_SOURCES = \
heightcache_tests.cpp \
lmdbstorage_tests.cpp \
mainloop_tests.cpp \
pendingmoves_tests.cpp \
pruningqueue_tests.cpp \
signatures_tests.cpp \
sqlitegame_tests.cpp \
Expand Down
6 changes: 6 additions & 0 deletions xayagame/defaultmain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ DefaultMain (const GameDaemonConfiguration& config, const std::string& gameId,

game->SetGameLogic (rules);

if (config.PendingMoves != nullptr)
game->SetPendingMoveProcessor (*config.PendingMoves);

if (config.EnablePruning >= 0)
game->EnablePruning (config.EnablePruning);

Expand Down Expand Up @@ -233,6 +236,9 @@ SQLiteMain (const GameDaemonConfiguration& config, const std::string& gameId,

game->SetGameLogic (rules);

if (config.PendingMoves != nullptr)
game->SetPendingMoveProcessor (*config.PendingMoves);

if (config.EnablePruning >= 0)
game->EnablePruning (config.EnablePruning);

Expand Down
7 changes: 7 additions & 0 deletions xayagame/defaultmain.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include "game.hpp"
#include "gamelogic.hpp"
#include "pendingmoves.hpp"
#include "sqlitegame.hpp"
#include "storage.hpp"

Expand Down Expand Up @@ -205,6 +206,12 @@ struct GameDaemonConfiguration
*/
std::string DataDirectory;

/**
* If set to non-null, then this PendingMoveProcessor instance is associated
* to the Game.
*/
PendingMoveProcessor* PendingMoves = nullptr;

/**
* Factory class for customed instances of certain optional classes
* like the RPC server. If not set, default classes are used instead.
Expand Down
114 changes: 106 additions & 8 deletions xayagame/game.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

#include "game.hpp"

#include <jsonrpccpp/common/errors.h>
#include <jsonrpccpp/common/exception.h>

#include <glog/logging.h>

#include <chrono>
Expand Down Expand Up @@ -239,6 +242,9 @@ Game::BlockAttach (const std::string& id, const Json::Value& data,

if (needReinit)
ReinitialiseState ();

if (state == State::UP_TO_DATE && pending != nullptr)
pending->ProcessAttachedBlock (storage->GetCurrentGameState ());
}

void
Expand Down Expand Up @@ -323,6 +329,29 @@ Game::BlockDetach (const std::string& id, const Json::Value& data,

if (needReinit)
ReinitialiseState ();

if (state == State::UP_TO_DATE && pending != nullptr)
pending->ProcessDetachedBlock (storage->GetCurrentGameState (), data);
}

void
Game::PendingMove (const std::string& id, const Json::Value& data)
{
CHECK_EQ (id, gameId);
VLOG (2) << "Pending move:\n" << data;

uint256 txid;
CHECK (txid.FromHex (data["txid"].asString ()));
VLOG (1) << "Processing pending move " << txid.ToHex ();

std::lock_guard<std::mutex> lock(mut);
if (state == State::UP_TO_DATE)
{
CHECK (pending != nullptr);
pending->ProcessMove (storage->GetCurrentGameState (), data);
}
else
VLOG (1) << "Ignoring pending move while not up-to-date: " << data;
}

void
Expand All @@ -349,6 +378,8 @@ Game::ConnectRpcClient (jsonrpc::IClientConnector& conn)
LOG (INFO) << "Connected to RPC daemon with chain " << ChainToString (chain);
if (rules != nullptr)
rules->InitialiseGameContext (chain, gameId, rpcClient.get ());
if (pending != nullptr)
pending->InitialiseGameContext (chain, gameId, rpcClient.get ());
}

unsigned
Expand Down Expand Up @@ -426,6 +457,16 @@ Game::SetGameLogic (GameLogic& gl)
rules->InitialiseGameContext (chain, gameId, rpcClient.get ());
}

void
Game::SetPendingMoveProcessor (PendingMoveProcessor& p)
{
std::lock_guard<std::mutex> lock(mut);
CHECK (!mainLoop.IsRunning ());
pending = &p;
if (chain != Chain::UNKNOWN)
pending->InitialiseGameContext (chain, gameId, rpcClient.get ());
}

void
Game::EnablePruning (const unsigned nBlocks)
{
Expand Down Expand Up @@ -454,15 +495,36 @@ Game::DetectZmqEndpoint ()
}
VLOG (1) << "Configured ZMQ notifications:\n" << notifications;

bool foundBlocks = false;
for (const auto& val : notifications)
if (val.get ("type", "") == "pubgameblocks")
{
const std::string endpoint = val.get ("address", "").asString ();
CHECK (!endpoint.empty ());
LOG (INFO) << "Detected ZMQ endpoint: " << endpoint;
zmq.SetEndpoint (endpoint);
return true;
}
{
const auto& typeVal = val["type"];
if (!typeVal.isString ())
continue;

const auto& addrVal = val["address"];
CHECK (addrVal.isString ());
const std::string address = addrVal.asString ();
CHECK (!address.empty ());

const std::string type = typeVal.asString ();
if (type == "pubgameblocks")
{
LOG (INFO) << "Detected ZMQ blocks endpoint: " << address;
zmq.SetEndpoint (address);
foundBlocks = true;
continue;
}
if (type == "pubgamepending")
{
LOG (INFO) << "Detected ZMQ pending endpoint: " << address;
zmq.SetEndpointForPending (address);
continue;
}
}

if (foundBlocks)
return true;

LOG (WARNING) << "No -zmqpubgameblocks notifier seems to be set up";
return false;
Expand Down Expand Up @@ -504,6 +566,34 @@ Game::GetCurrentJsonState () const
});
}

Json::Value
Game::GetPendingJsonState () const
{
std::unique_lock<std::mutex> lock(mut);

if (!zmq.IsPendingEnabled ())
throw jsonrpc::JsonRpcException (jsonrpc::Errors::ERROR_RPC_INTERNAL_ERROR,
"pending moves are not tracked");
CHECK (pending != nullptr);

Json::Value res(Json::objectValue);
res["gameid"] = gameId;
res["chain"] = ChainToString (chain);
res["state"] = StateToString (state);

uint256 hash;
unsigned height;
if (storage->GetCurrentBlockHashWithHeight (hash, height))
{
res["blockhash"] = hash.ToHex ();
res["height"] = height;
}

res["pending"] = pending->ToJson ();

return res;
}

void
Game::NotifyStateChange () const
{
Expand Down Expand Up @@ -563,6 +653,14 @@ Game::UntrackGame ()
void
Game::Start ()
{
if (pending == nullptr)
{
LOG (WARNING)
<< "No PendingMoveProcessor has been set, disabling pending moves"
" in the ZMQ subscriber";
zmq.SetEndpointForPending ("");
}

TrackGame ();
zmq.Start ();

Expand Down
23 changes: 22 additions & 1 deletion xayagame/game.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "gamelogic.hpp"
#include "heightcache.hpp"
#include "mainloop.hpp"
#include "pendingmoves.hpp"
#include "pruningqueue.hpp"
#include "storage.hpp"
#include "transactionmanager.hpp"
Expand Down Expand Up @@ -133,6 +134,9 @@ class Game : private internal::ZmqListener
/** The game rules in use. */
GameLogic* rules = nullptr;

/** The processor for pending moves, if any. */
PendingMoveProcessor* pending = nullptr;

/**
* Desired size for batches of atomic transactions while the game is
* catching up. <= 1 means no batching even in these situations.
Expand All @@ -159,6 +163,7 @@ class Game : private internal::ZmqListener
bool seqMismatch) override;
void BlockDetach (const std::string& id, const Json::Value& data,
bool seqMismatch) override;
void PendingMove (const std::string& id, const Json::Value& data) override;

/**
* Adds this game's ID to the tracked games of the core daemon.
Expand Down Expand Up @@ -277,14 +282,21 @@ class Game : private internal::ZmqListener
*/
void SetGameLogic (GameLogic& gl);

/**
* Sets the processor for pending moves. Setting one is optional; if no
* processor is set of pending move notifications are not enabled in the
* connected Xaya Core, then no pending state will be available.
*/
void SetPendingMoveProcessor (PendingMoveProcessor& p);

/**
* Enables (or changes) pruning with the given number of blocks to keep.
* Must be called after the storage is set already.
*/
void EnablePruning (unsigned nBlocks);

/**
* Detects the ZMQ endpoint by calling getzmqnotifications on the Xaya
* Detects the ZMQ endpoint(s) by calling getzmqnotifications on the Xaya
* daemon. Returns false if pubgameblocks is not enabled.
*/
bool DetectZmqEndpoint ();
Expand Down Expand Up @@ -327,6 +339,15 @@ class Game : private internal::ZmqListener
*/
Json::Value GetCurrentJsonState () const;

/**
* Returns a JSON object that contains data about the current state
* of pending moves as JSON.
*
* If no PendingMoveProcessor is attached or if pending moves are disabled
* in the Xaya Core notifications, then this raises a JSON-RPC error.
*/
Json::Value GetPendingJsonState () const;

/**
* Blocks the calling thread until a change to the game state has
* (potentially) been made. This can be used to implement long-polling
Expand Down
Loading

0 comments on commit f9d304a

Please sign in to comment.