Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send validator telemetry to the private overlay #1325

Merged
merged 2 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions create-hardfork/create-hardfork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,9 @@ class HardforkCreator : public td::actor::Actor {

void new_key_block(ton::validator::BlockHandle handle) override {
}
void send_validator_telemetry(ton::PublicKeyHash key,
ton::tl_object_ptr<ton::ton_api::validator_telemetry> telemetry) override {
}
};

td::actor::send_closure(validator_manager_, &ton::validator::ValidatorManagerInterface::install_callback,
Expand Down
15 changes: 12 additions & 3 deletions overlay/overlay-manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ void OverlayManager::register_overlay(adnl::AdnlNodeIdShort local_id, OverlayIdS
}
overlays_[local_id][overlay_id] = OverlayDescription{std::move(overlay), std::move(cert)};

if (!with_db_) {
return;
}
auto P =
td::PromiseCreator::lambda([id = overlays_[local_id][overlay_id].overlay.get()](td::Result<DbType::GetResult> R) {
R.ensure();
Expand Down Expand Up @@ -417,13 +420,19 @@ OverlayManager::OverlayManager(std::string db_root, td::actor::ActorId<keyring::
}

void OverlayManager::start_up() {
std::shared_ptr<td::KeyValue> kv =
std::make_shared<td::RocksDb>(td::RocksDb::open(PSTRING() << db_root_ << "/overlays").move_as_ok());
db_ = DbType{std::move(kv)};
if (!db_root_.empty()) {
with_db_ = true;
std::shared_ptr<td::KeyValue> kv =
std::make_shared<td::RocksDb>(td::RocksDb::open(PSTRING() << db_root_ << "/overlays").move_as_ok());
db_ = DbType{std::move(kv)};
}
}

void OverlayManager::save_to_db(adnl::AdnlNodeIdShort local_id, OverlayIdShort overlay_id,
std::vector<OverlayNode> nodes) {
if (!with_db_) {
return;
}
std::vector<tl_object_ptr<ton_api::overlay_node>> nodes_vec;
for (auto &n : nodes) {
nodes_vec.push_back(n.tl());
Expand Down
1 change: 1 addition & 0 deletions overlay/overlay-manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ class OverlayManager : public Overlays {
td::actor::ActorId<dht::Dht> dht_node_;

using DbType = td::KeyValueAsync<td::Bits256, td::BufferSlice>;
bool with_db_ = false;
DbType db_;

class AdnlCallback : public adnl::Adnl::Callback {
Expand Down
4 changes: 2 additions & 2 deletions overlay/overlay-peers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ void OverlayImpl::add_peer(OverlayNode node) {
peer_list_.peers_.insert(id, OverlayPeer(std::move(node)));
del_some_peers();
auto X = peer_list_.peers_.get(id);
if (X != nullptr && peer_list_.neighbours_.size() < max_neighbours() &&
if (X != nullptr && !X->is_neighbour() && peer_list_.neighbours_.size() < max_neighbours() &&
!(X->get_node()->flags() & OverlayMemberFlags::DoNotReceiveBroadcasts) && X->get_id() != local_id_) {
peer_list_.neighbours_.push_back(X->get_id());
X->set_neighbour(true);
Expand Down Expand Up @@ -440,7 +440,7 @@ void OverlayImpl::update_neighbours(td::uint32 nodes_to_change) {
VLOG(OVERLAY_INFO) << this << ": adding new neighbour " << X->get_id();
peer_list_.neighbours_.push_back(X->get_id());
X->set_neighbour(true);
} else {
} else if (X->is_alive()) {
CHECK(nodes_to_change > 0);
auto i = td::Random::fast(0, static_cast<td::uint32>(peer_list_.neighbours_.size()) - 1);
auto Y = peer_list_.peers_.get(peer_list_.neighbours_[i]);
Expand Down
7 changes: 6 additions & 1 deletion overlay/overlay.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,12 @@ void OverlayImpl::alarm() {
update_db_at_ = td::Timestamp::in(60.0);
}

update_neighbours(0);
if (update_neighbours_at_.is_in_past()) {
update_neighbours(2);
update_neighbours_at_ = td::Timestamp::in(td::Random::fast(30.0, 120.0));
} else {
update_neighbours(0);
}
alarm_timestamp() = td::Timestamp::in(1.0);
} else {
update_neighbours(0);
Expand Down
1 change: 1 addition & 0 deletions overlay/overlay.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ class OverlayImpl : public Overlay {
td::Timestamp next_dht_store_query_ = td::Timestamp::in(1.0);
td::Timestamp update_db_at_;
td::Timestamp update_throughput_at_;
td::Timestamp update_neighbours_at_;
td::Timestamp last_throughput_update_;

std::unique_ptr<Overlays::Callback> callback_;
Expand Down
41 changes: 41 additions & 0 deletions tdutils/td/utils/port/Stat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -472,4 +472,45 @@ Result<TotalMemStat> get_total_mem_stat() {
#endif
}

Result<uint32> get_cpu_cores() {
#if TD_LINUX
uint32 result = 0;
TRY_RESULT(fd, FileFd::open("/proc/cpuinfo", FileFd::Read));
SCOPE_EXIT {
fd.close();
};
std::string data;
char buf[10000];
while (true) {
TRY_RESULT(size, fd.read(MutableSlice{buf, sizeof(buf) - 1}));
if (size == 0) {
break;
}
buf[size] = '\0';
data += buf;
}
size_t i = 0;
while (i < data.size()) {
const char *line_begin = data.data() + i;
while (i < data.size() && data[i] != '\n') {
++i;
}
auto line_end = data.data() + i;
++i;
Slice line{line_begin, line_end};
size_t j = 0;
while (j < line.size() && line[j] != ' ' && line[j] != '\t' && line[j] != ':') {
++j;
}
Slice name = line.substr(0, j);
if (name == "processor") {
++result;
}
}
return result;
#else
return Status::Error("Not supported");
#endif
}

} // namespace td
2 changes: 2 additions & 0 deletions tdutils/td/utils/port/Stat.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,6 @@ struct TotalMemStat {
};
Result<TotalMemStat> get_total_mem_stat() TD_WARN_UNUSED_RESULT;

Result<uint32> get_cpu_cores() TD_WARN_UNUSED_RESULT;

} // namespace td
3 changes: 3 additions & 0 deletions test/test-ton-collator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,9 @@ class TestNode : public td::actor::Actor {

void new_key_block(ton::validator::BlockHandle handle) override {
}
void send_validator_telemetry(ton::PublicKeyHash key,
ton::tl_object_ptr<ton::ton_api::validator_telemetry> telemetry) override {
}
};

td::actor::send_closure(validator_manager_, &ton::validator::ValidatorManagerInterface::install_callback,
Expand Down
4 changes: 4 additions & 0 deletions tl/generate/scheme/ton_api.tl
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,10 @@ validator.group workchain:int shard:long catchain_seqno:int config_hash:int256 m
validator.groupEx workchain:int shard:long vertical_seqno:int catchain_seqno:int config_hash:int256 members:(vector validator.groupMember) = validator.Group;
validator.groupNew workchain:int shard:long vertical_seqno:int last_key_block_seqno:int catchain_seqno:int config_hash:int256 members:(vector validator.groupMember) = validator.Group;

validator.telemetry flags:# timestamp:double adnl_id:int256
node_version:string os_version:string node_started_at:int
ram_size:long cpu_cores:int node_threads:int = validator.Telemetry;

---functions---


Expand Down
Binary file modified tl/generate/scheme/ton_api.tlo
Binary file not shown.
13 changes: 13 additions & 0 deletions validator-engine/validator-engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1945,6 +1945,10 @@ void ValidatorEngine::start_full_node() {
default_dht_node_.is_zero() ? td::actor::ActorId<ton::dht::Dht>{} : dht_nodes_[default_dht_node_].get(),
overlay_manager_.get(), validator_manager_.get(), full_node_client_.get(), db_root_);
load_custom_overlays_config();
if (!validator_telemetry_filename_.empty()) {
td::actor::send_closure(full_node_, &ton::validator::fullnode::FullNode::set_validator_telemetry_filename,
validator_telemetry_filename_);
}
}

for (auto &v : config_.validators) {
Expand Down Expand Up @@ -4331,6 +4335,15 @@ int main(int argc, char *argv[]) {
acts.push_back(
[&x]() { td::actor::send_closure(x, &ValidatorEngine::set_fast_state_serializer_enabled, true); });
});
p.add_option(
'\0', "collect-validator-telemetry",
"store validator telemetry from private block overlay to a given file (json format)",
[&](td::Slice s) {
acts.push_back(
[&x, s = s.str()]() {
td::actor::send_closure(x, &ValidatorEngine::set_validator_telemetry_filename, s);
});
});
auto S = p.run(argc, argv);
if (S.is_error()) {
LOG(ERROR) << "failed to parse options: " << S.move_as_error();
Expand Down
4 changes: 4 additions & 0 deletions validator-engine/validator-engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ class ValidatorEngine : public td::actor::Actor {
ton::BlockSeqno truncate_seqno_{0};
std::string session_logs_file_;
bool fast_state_serializer_enabled_ = false;
std::string validator_telemetry_filename_;

std::set<ton::CatchainSeqno> unsafe_catchains_;
std::map<ton::BlockSeqno, std::pair<ton::CatchainSeqno, td::uint32>> unsafe_catchain_rotations_;
Expand Down Expand Up @@ -310,6 +311,9 @@ class ValidatorEngine : public td::actor::Actor {
void set_fast_state_serializer_enabled(bool value) {
fast_state_serializer_enabled_ = value;
}
void set_validator_telemetry_filename(std::string value) {
validator_telemetry_filename_ = std::move(value);
}
void start_up() override;
ValidatorEngine() {
}
Expand Down
2 changes: 2 additions & 0 deletions validator/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ set(VALIDATOR_HEADERS

import-db-slice.hpp
queue-size-counter.hpp
validator-telemetry.hpp

manager-disk.h
manager-disk.hpp
Expand All @@ -82,6 +83,7 @@ set(VALIDATOR_SOURCE
validator-group.cpp
validator-options.cpp
queue-size-counter.cpp
validator-telemetry.cpp

downloaders/wait-block-data.cpp
downloaders/wait-block-state.cpp
Expand Down
66 changes: 65 additions & 1 deletion validator/full-node-private-overlay.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
#include "common/delay.h"
#include "common/checksum.h"
#include "full-node-serializer.hpp"
#include "auto/tl/ton_api_json.h"
#include "td/utils/JsonBuilder.h"
#include "tl/tl_json.h"

namespace ton::validator::fullnode {

Expand Down Expand Up @@ -85,15 +88,52 @@ void FullNodePrivateBlockOverlay::process_block_candidate_broadcast(PublicKeyHas
validator_set_hash, std::move(data));
}

void FullNodePrivateBlockOverlay::process_telemetry_broadcast(
PublicKeyHash src, const tl_object_ptr<ton_api::validator_telemetry>& telemetry) {
if (telemetry->adnl_id_ != src.bits256_value()) {
VLOG(FULL_NODE_WARNING) << "Invalid telemetry broadcast from " << src << ": adnl_id mismatch";
return;
}
auto now = (td::int32)td::Clocks::system();
if (telemetry->timestamp_ < now - 60) {
VLOG(FULL_NODE_WARNING) << "Invalid telemetry broadcast from " << src << ": too old ("
<< now - telemetry->timestamp_ << "s ago)";
return;
}
if (telemetry->timestamp_ > now + 60) {
VLOG(FULL_NODE_WARNING) << "Invalid telemetry broadcast from " << src << ": too new ("
<< telemetry->timestamp_ - now << "s in the future)";
return;
}
VLOG(FULL_NODE_DEBUG) << "Got telemetry broadcast from " << src;
auto s = td::json_encode<std::string>(td::ToJson(*telemetry), false);
std::erase_if(s, [](char c) {
return c == '\n' || c == '\r';
});
telemetry_file_ << s << "\n";
telemetry_file_.flush();
if (telemetry_file_.fail()) {
VLOG(FULL_NODE_WARNING) << "Failed to write telemetry to file";
}
}

void FullNodePrivateBlockOverlay::receive_broadcast(PublicKeyHash src, td::BufferSlice broadcast) {
if (adnl::AdnlNodeIdShort{src} == local_id_) {
return;
}
auto B = fetch_tl_object<ton_api::tonNode_Broadcast>(std::move(broadcast), true);
if (B.is_error()) {
if (collect_telemetry_ && src != local_id_.pubkey_hash()) {
auto R = fetch_tl_prefix<ton_api::validator_telemetry>(broadcast, true);
if (R.is_ok()) {
process_telemetry_broadcast(src, R.ok());
}
}
return;
}
ton_api::downcast_call(*B.move_as_ok(), [src, Self = this](auto &obj) { Self->process_broadcast(src, obj); });
ton_api::downcast_call(*B.move_as_ok(), [src, Self = this](auto& obj) {
Self->process_broadcast(src, obj);
});
}

void FullNodePrivateBlockOverlay::send_shard_block_info(BlockIdExt block_id, CatchainSeqno cc_seqno,
Expand Down Expand Up @@ -144,6 +184,30 @@ void FullNodePrivateBlockOverlay::send_broadcast(BlockBroadcast broadcast) {
local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), B.move_as_ok());
}

void FullNodePrivateBlockOverlay::send_validator_telemetry(tl_object_ptr<ton_api::validator_telemetry> telemetry) {
process_telemetry_broadcast(local_id_.pubkey_hash(), telemetry);
auto data = serialize_tl_object(telemetry, true);
if (data.size() <= overlay::Overlays::max_simple_broadcast_size()) {
td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_ex, local_id_, overlay_id_,
local_id_.pubkey_hash(), 0, std::move(data));
} else {
td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, local_id_, overlay_id_,
local_id_.pubkey_hash(), 0, std::move(data));
}
}

void FullNodePrivateBlockOverlay::collect_validator_telemetry(std::string filename) {
if (collect_telemetry_) {
telemetry_file_.close();
}
collect_telemetry_ = true;
LOG(FULL_NODE_WARNING) << "Collecting validator telemetry to " << filename << " (local id: " << local_id_ << ")";
telemetry_file_.open(filename, std::ios_base::app);
if (!telemetry_file_.is_open()) {
LOG(WARNING) << "Cannot open file " << filename << " for validator telemetry";
}
}

void FullNodePrivateBlockOverlay::start_up() {
std::sort(nodes_.begin(), nodes_.end());
nodes_.erase(std::unique(nodes_.begin(), nodes_.end()), nodes_.end());
Expand Down
9 changes: 9 additions & 0 deletions validator/full-node-private-overlay.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#pragma once

#include "full-node.h"
#include <fstream>

namespace ton::validator::fullnode {

Expand All @@ -32,6 +33,8 @@ class FullNodePrivateBlockOverlay : public td::actor::Actor {
void process_broadcast(PublicKeyHash src, ton_api::tonNode_newBlockCandidateBroadcastCompressed &query);
void process_block_candidate_broadcast(PublicKeyHash src, ton_api::tonNode_Broadcast &query);

void process_telemetry_broadcast(PublicKeyHash src, const tl_object_ptr<ton_api::validator_telemetry>& telemetry);

template <class T>
void process_broadcast(PublicKeyHash, T &) {
VLOG(FULL_NODE_WARNING) << "dropping unknown broadcast";
Expand All @@ -42,6 +45,9 @@ class FullNodePrivateBlockOverlay : public td::actor::Actor {
void send_block_candidate(BlockIdExt block_id, CatchainSeqno cc_seqno, td::uint32 validator_set_hash,
td::BufferSlice data);
void send_broadcast(BlockBroadcast broadcast);
void send_validator_telemetry(tl_object_ptr<ton_api::validator_telemetry> telemetry);

void collect_validator_telemetry(std::string filename);

void set_config(FullNodeConfig config) {
config_ = std::move(config);
Expand Down Expand Up @@ -91,6 +97,9 @@ class FullNodePrivateBlockOverlay : public td::actor::Actor {

void try_init();
void init();

bool collect_telemetry_ = false;
std::ofstream telemetry_file_;
};

class FullNodeCustomOverlay : public td::actor::Actor {
Expand Down
Loading
Loading