Skip to content

Commit

Permalink
feat: ETLng loader basics (#1808)
Browse files Browse the repository at this point in the history
For #1597
  • Loading branch information
godexsoft authored Jan 9, 2025
1 parent 36a9f40 commit 48c8d85
Show file tree
Hide file tree
Showing 31 changed files with 1,093 additions and 36 deletions.
2 changes: 1 addition & 1 deletion src/etl/ETLService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ ETLService::monitor()
}
} catch (std::runtime_error const& e) {
LOG(log_.fatal()) << "Failed to load initial ledger: " << e.what();
amendmentBlockHandler_.onAmendmentBlock();
amendmentBlockHandler_.notifyAmendmentBlocked();
return;
}

Expand Down
2 changes: 1 addition & 1 deletion src/etl/impl/AmendmentBlockHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ AmendmentBlockHandler::AmendmentBlockHandler(
}

void
AmendmentBlockHandler::onAmendmentBlock()
AmendmentBlockHandler::notifyAmendmentBlocked()
{
state_.get().isAmendmentBlocked = true;
repeat_.start(interval_, action_);
Expand Down
2 changes: 1 addition & 1 deletion src/etl/impl/AmendmentBlockHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class AmendmentBlockHandler {
);

void
onAmendmentBlock();
notifyAmendmentBlocked();
};

} // namespace etl::impl
2 changes: 1 addition & 1 deletion src/etl/impl/Transformer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ class Transformer {
} catch (std::runtime_error const& e) {
LOG(log_.fatal()) << "Failed to build next ledger: " << e.what();

amendmentBlockHandler_.get().onAmendmentBlock();
amendmentBlockHandler_.get().notifyAmendmentBlocked();
return {ripple::LedgerHeader{}, false};
}

Expand Down
37 changes: 37 additions & 0 deletions src/etlng/AmendmentBlockHandlerInterface.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================

#pragma once

namespace etlng {

/**
* @brief The interface of a handler for amendment blocking
*/
struct AmendmentBlockHandlerInterface {
virtual ~AmendmentBlockHandlerInterface() = default;

/**
* @brief The function to call once an amendment block has been discovered
*/
virtual void
notifyAmendmentBlocked() = 0;
};

} // namespace etlng
5 changes: 4 additions & 1 deletion src/etlng/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
add_library(clio_etlng)

target_sources(clio_etlng PRIVATE impl/AsyncGrpcCall.cpp impl/Extraction.cpp impl/GrpcSource.cpp)
target_sources(
clio_etlng PRIVATE impl/AmendmentBlockHandler.cpp impl/AsyncGrpcCall.cpp impl/Extraction.cpp impl/GrpcSource.cpp
impl/Loading.cpp
)

target_link_libraries(clio_etlng PUBLIC clio_data)
134 changes: 134 additions & 0 deletions src/etlng/LoadBalancerInterface.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================

/** @file */
#pragma once

#include "etl/ETLState.hpp"
#include "etlng/InitialLoadObserverInterface.hpp"
#include "rpc/Errors.hpp"

#include <boost/asio/spawn.hpp>
#include <boost/json/object.hpp>
#include <boost/json/value.hpp>
#include <org/xrpl/rpc/v1/ledger.pb.h>
#include <xrpl/proto/org/xrpl/rpc/v1/get_ledger.pb.h>

#include <chrono>
#include <cstdint>
#include <expected>
#include <optional>
#include <string>
#include <vector>

namespace etlng {

/**
* @brief An interface for LoadBalancer
*/
class LoadBalancerInterface {
public:
using RawLedgerObjectType = org::xrpl::rpc::v1::RawLedgerObject;
using GetLedgerResponseType = org::xrpl::rpc::v1::GetLedgerResponse;
using OptionalGetLedgerResponseType = std::optional<GetLedgerResponseType>;

virtual ~LoadBalancerInterface() = default;

/**
* @brief Load the initial ledger, writing data to the queue.
* @note This function will retry indefinitely until the ledger is downloaded.
*
* @param sequence Sequence of ledger to download
* @param loader InitialLoadObserverInterface implementation
* @param retryAfter Time to wait between retries (2 seconds by default)
* @return A std::vector<std::string> The ledger data
*/
virtual std::vector<std::string>
loadInitialLedger(
uint32_t sequence,
etlng::InitialLoadObserverInterface& loader,
std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2}
) = 0;

/**
* @brief Load the initial ledger, writing data to the queue.
* @note This function will retry indefinitely until the ledger is downloaded.
*
* @param sequence Sequence of ledger to download
* @param retryAfter Time to wait between retries (2 seconds by default)
* @return A std::vector<std::string> The ledger data
*/
virtual std::vector<std::string>
loadInitialLedger(uint32_t sequence, std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2}) = 0;

/**
* @brief Fetch data for a specific ledger.
*
* This function will continuously try to fetch data for the specified ledger until the fetch succeeds, the ledger
* is found in the database, or the server is shutting down.
*
* @param ledgerSequence Sequence of the ledger to fetch
* @param getObjects Whether to get the account state diff between this ledger and the prior one
* @param getObjectNeighbors Whether to request object neighbors
* @param retryAfter Time to wait between retries (2 seconds by default)
* @return The extracted data, if extraction was successful. If the ledger was found
* in the database or the server is shutting down, the optional will be empty
*/
virtual OptionalGetLedgerResponseType
fetchLedger(
uint32_t ledgerSequence,
bool getObjects,
bool getObjectNeighbors,
std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2}
) = 0;

/**
* @brief Represent the state of this load balancer as a JSON object
*
* @return JSON representation of the state of this load balancer.
*/
virtual boost::json::value
toJson() const = 0;

/**
* @brief Forward a JSON RPC request to a randomly selected rippled node.
*
* @param request JSON-RPC request to forward
* @param clientIp The IP address of the peer, if known
* @param isAdmin Whether the request is from an admin
* @param yield The coroutine context
* @return Response received from rippled node as JSON object on success or error on failure
*/
virtual std::expected<boost::json::object, rpc::ClioError>
forwardToRippled(
boost::json::object const& request,
std::optional<std::string> const& clientIp,
bool isAdmin,
boost::asio::yield_context yield
) = 0;

/**
* @brief Return state of ETL nodes.
* @return ETL state, nullopt if etl nodes not available
*/
virtual std::optional<etl::ETLState>
getETLState() noexcept = 0;
};

} // namespace etlng
52 changes: 52 additions & 0 deletions src/etlng/LoaderInterface.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================

#pragma once

#include "etlng/Models.hpp"

#include <xrpl/protocol/LedgerHeader.h>

#include <optional>

namespace etlng {

/**
* @brief An interface for a ETL Loader
*/
struct LoaderInterface {
virtual ~LoaderInterface() = default;

/**
* @brief Load ledger data
* @param data The data to load
*/
virtual void
load(model::LedgerData const& data) = 0;

/**
* @brief Load the initial ledger
* @param data The data to load
* @return Optional ledger header
*/
virtual std::optional<ripple::LedgerHeader>
loadInitialLedger(model::LedgerData const& data) = 0;
};

} // namespace etlng
47 changes: 47 additions & 0 deletions src/etlng/Models.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <xrpl/proto/org/xrpl/rpc/v1/ledger.pb.h>
#include <xrpl/protocol/LedgerHeader.h>
#include <xrpl/protocol/STTx.h>
#include <xrpl/protocol/Serializer.h>
#include <xrpl/protocol/TxFormats.h>
#include <xrpl/protocol/TxMeta.h>

Expand Down Expand Up @@ -79,6 +80,23 @@ struct Transaction {
ripple::uint256 id;
std::string key; // key is the above id as a string of 32 characters
ripple::TxType type;

/**
* @brief Compares Transaction objects to each other without considering sttx and meta fields
* @param other The Transaction to compare to
* @return true if transaction is equivalent; false otherwise
*/
bool
operator==(Transaction const& other) const
{
return raw == other.raw //
and metaRaw == other.metaRaw //
and sttx.getTransactionID() == other.sttx.getTransactionID() //
and meta.getTxID() == other.meta.getTxID() //
and id == other.id //
and key == other.key //
and type == other.type;
}
};

/**
Expand All @@ -103,6 +121,9 @@ struct Object {
std::string predecessor;

ModType type;

bool
operator==(Object const&) const = default;
};

/**
Expand All @@ -111,6 +132,9 @@ struct Object {
struct BookSuccessor {
std::string firstBook;
std::string bookBase;

bool
operator==(BookSuccessor const&) const = default;
};

/**
Expand All @@ -125,6 +149,29 @@ struct LedgerData {
ripple::LedgerHeader header;
std::string rawHeader;
uint32_t seq;

/**
* @brief Compares LedgerData objects to each other without considering the header field
* @param other The LedgerData to compare to
* @return true if data is equivalent; false otherwise
*/
bool
operator==(LedgerData const& other) const
{
auto const serialized = [](auto const& hdr) {
ripple::Serializer ser;
ripple::addRaw(hdr, ser);
return ser.getString();
};

return transactions == other.transactions //
and objects == other.objects //
and successors == other.successors //
and edgeKeys == other.edgeKeys //
and serialized(header) == serialized(other.header) //
and rawHeader == other.rawHeader //
and seq == other.seq;
}
};

} // namespace etlng::model
Loading

0 comments on commit 48c8d85

Please sign in to comment.