From 5d79855c94711ad161b41b75c18edff5396be842 Mon Sep 17 00:00:00 2001 From: SpyCheese Date: Wed, 27 Nov 2024 18:12:23 +0300 Subject: [PATCH] Out msg queues: improve logs, various small changes --- ton/ton-types.h | 15 ++--- validator-session/validator-session-types.h | 3 +- validator-session/validator-session.cpp | 18 ++--- validator/full-node-fast-sync-overlays.cpp | 13 ++-- validator/full-node-shard.cpp | 9 ++- validator/impl/collator.cpp | 18 +++-- validator/impl/out-msg-queue-proof.cpp | 75 +++++++++++---------- validator/manager.cpp | 16 +++-- validator/validator-group.cpp | 18 ++--- 9 files changed, 106 insertions(+), 79 deletions(-) diff --git a/ton/ton-types.h b/ton/ton-types.h index aeb0595ad..f8eb49df1 100644 --- a/ton/ton-types.h +++ b/ton/ton-types.h @@ -428,15 +428,14 @@ struct Ed25519_PublicKey { struct OutMsgQueueProofBroadcast : public td::CntObject { OutMsgQueueProofBroadcast(ShardIdFull dst_shard, BlockIdExt block_id, td::int32 max_bytes, td::int32 max_msgs, - td::BufferSlice queue_proofs, td::BufferSlice block_state_proofs, - std::vector msg_counts) + td::BufferSlice queue_proof, td::BufferSlice block_state_proof, int msg_count) : dst_shard(std::move(dst_shard)) , block_id(block_id) , max_bytes(max_bytes) , max_msgs(max_msgs) - , queue_proofs(std::move(queue_proofs)) - , block_state_proofs(std::move(block_state_proofs)) - , msg_counts(std::move(msg_counts)) { + , queue_proofs(std::move(queue_proof)) + , block_state_proofs(std::move(block_state_proof)) + , msg_count(std::move(msg_count)) { } ShardIdFull dst_shard; BlockIdExt block_id; @@ -448,11 +447,11 @@ struct OutMsgQueueProofBroadcast : public td::CntObject { // outMsgQueueProof td::BufferSlice queue_proofs; td::BufferSlice block_state_proofs; - std::vector msg_counts; + int msg_count; - virtual OutMsgQueueProofBroadcast* make_copy() const { + OutMsgQueueProofBroadcast* make_copy() const override { return new OutMsgQueueProofBroadcast(dst_shard, block_id, max_bytes, max_msgs, queue_proofs.clone(), - block_state_proofs.clone(), msg_counts); + block_state_proofs.clone(), msg_count); } }; diff --git a/validator-session/validator-session-types.h b/validator-session/validator-session-types.h index 7147bf2d3..2edc42bbd 100644 --- a/validator-session/validator-session-types.h +++ b/validator-session/validator-session-types.h @@ -179,9 +179,8 @@ struct EndValidatorGroupStats { }; struct BlockSourceInfo { - td::uint32 round, first_block_round; PublicKey source; - td::int32 source_priority; + BlockCandidatePriority priority; }; } // namespace validatorsession diff --git a/validator-session/validator-session.cpp b/validator-session/validator-session.cpp index 3a913990c..99ee61f23 100644 --- a/validator-session/validator-session.cpp +++ b/validator-session/validator-session.cpp @@ -553,9 +553,9 @@ void ValidatorSessionImpl::check_generate_slot() { LOG(WARNING) << print_id << ": failed to generate block candidate: " << R.move_as_error(); } }); - callback_->on_generate_slot( - BlockSourceInfo{cur_round_, first_block_round_, description().get_source_public_key(local_idx()), priority}, - std::move(P)); + callback_->on_generate_slot(BlockSourceInfo{description().get_source_public_key(local_idx()), + BlockCandidatePriority{cur_round_, first_block_round_, priority}}, + std::move(P)); } else { alarm_timestamp().relax(t); } @@ -634,8 +634,9 @@ void ValidatorSessionImpl::try_approve_block(const SentBlock *block) { pending_approve_.insert(block_id); callback_->on_candidate( - BlockSourceInfo{cur_round_, first_block_round_, description().get_source_public_key(block->get_src_idx()), - description().get_node_priority(block->get_src_idx(), cur_round_)}, + BlockSourceInfo{description().get_source_public_key(block->get_src_idx()), + BlockCandidatePriority{cur_round_, first_block_round_, + description().get_node_priority(block->get_src_idx(), cur_round_)}}, B->root_hash_, B->data_.clone(), B->collated_data_.clone(), std::move(P)); } else if (T.is_in_past()) { if (!active_requests_.count(block_id)) { @@ -909,9 +910,10 @@ void ValidatorSessionImpl::on_new_round(td::uint32 round) { stats.rounds.pop_back(); } - BlockSourceInfo source_info{cur_round_, first_block_round_, - description().get_source_public_key(block->get_src_idx()), - description().get_node_priority(block->get_src_idx(), cur_round_)}; + BlockSourceInfo source_info{ + description().get_source_public_key(block->get_src_idx()), + BlockCandidatePriority{cur_round_, first_block_round_, + description().get_node_priority(block->get_src_idx(), cur_round_)}}; if (it == blocks_.end()) { callback_->on_block_committed(std::move(source_info), block->get_root_hash(), block->get_file_hash(), td::BufferSlice(), std::move(export_sigs), std::move(export_approve_sigs), diff --git a/validator/full-node-fast-sync-overlays.cpp b/validator/full-node-fast-sync-overlays.cpp index 4e0d11e48..947f9dee8 100644 --- a/validator/full-node-fast-sync-overlays.cpp +++ b/validator/full-node-fast-sync-overlays.cpp @@ -47,6 +47,9 @@ void FullNodeFastSyncOverlay::process_block_broadcast(PublicKeyHash src, ton_api } void FullNodeFastSyncOverlay::process_broadcast(PublicKeyHash src, ton_api::tonNode_outMsgQueueProofBroadcast &query) { + if (src == local_id_.pubkey_hash()) { + return; // dropping broadcast from self + } BlockIdExt block_id = create_block_id(query.block_); ShardIdFull shard_id = create_shard_id(query.dst_shard_); if (query.proof_->get_id() != ton_api::tonNode_outMsgQueueProof::ID) { @@ -68,7 +71,8 @@ void FullNodeFastSyncOverlay::process_broadcast(PublicKeyHash src, ton_api::tonN } auto proof = std::move(R.move_as_ok()[0]); - LOG(INFO) << "got tonNode.outMsgQueueProofBroadcast " << shard_id.to_str() << " " << block_id.to_str(); + LOG(INFO) << "got tonNode.outMsgQueueProofBroadcast to " << shard_id.to_str() << " from " << block_id.to_str() + << ", msgs=" << proof->msg_count_ << ", size=" << tl_proof->queue_proofs_.size(); td::actor::send_closure(validator_manager_, &ValidatorManagerInterface::add_out_msg_queue_proof, shard_id, std::move(proof)); } @@ -236,9 +240,10 @@ void FullNodeFastSyncOverlay::send_out_msg_queue_proof_broadcast(td::Ref(broadcast->max_bytes, broadcast->max_msgs), create_tl_object(broadcast->queue_proofs.clone(), broadcast->block_state_proofs.clone(), - std::vector(broadcast->msg_counts))); - VLOG(FULL_NODE_DEBUG) << "Sending outMsgQueueProof in fast sync overlay: " << broadcast->dst_shard.to_str() << " " - << broadcast->block_id.to_str(); + std::vector(1, broadcast->msg_count))); + VLOG(FULL_NODE_DEBUG) << "Sending outMsgQueueProof in fast sync overlay to " << broadcast->dst_shard.to_str() + << " from " << broadcast->block_id.to_str() << ", msgs=" << broadcast->msg_count + << " bytes=" << broadcast->queue_proofs.size(); td::actor::send_closure(overlays_, &overlay::Overlays::send_broadcast_fec_ex, local_id_, overlay_id_, local_id_.pubkey_hash(), overlay::Overlays::BroadcastFlagAnySender(), std::move(B)); } diff --git a/validator/full-node-shard.cpp b/validator/full-node-shard.cpp index 29950363d..c351653f4 100644 --- a/validator/full-node-shard.cpp +++ b/validator/full-node-shard.cpp @@ -703,8 +703,13 @@ void FullNodeShardImpl::process_query(adnl::AdnlNodeIdShort src, ton_api::tonNod promise.set_result(serialize_tl_object(R.move_as_ok(), true)); } }); - VLOG(FULL_NODE_DEBUG) << "Got query getOutMsgQueueProof (" << blocks.size() << " blocks) to shard " - << dst_shard.to_str() << " from " << src; + FLOG(DEBUG) { + sb << "Got query getOutMsgQueueProof to shard " << dst_shard.to_str() << " from blocks"; + for (const BlockIdExt &id : blocks) { + sb << " " << id.id.to_str(); + } + sb << " from " << src; + }; td::actor::create_actor("buildqueueproof", dst_shard, std::move(blocks), limits, validator_manager_, std::move(P)) .release(); diff --git a/validator/impl/collator.cpp b/validator/impl/collator.cpp index cd97e3fcc..dd8d55169 100644 --- a/validator/impl/collator.cpp +++ b/validator/impl/collator.cpp @@ -5854,33 +5854,39 @@ bool Collator::create_block_candidate() { block_candidate = std::make_unique(created_by_, new_block_id_ext, block::compute_file_hash(cdata_slice.as_slice()), blk_slice.clone(), cdata_slice.clone()); - const bool need_out_msg_queue_broadcasts = true; + const bool need_out_msg_queue_broadcasts = !is_masterchain(); if (need_out_msg_queue_broadcasts) { // we can't generate two proofs at the same time for the same root (it is not currently supported by cells) // so we have can't reuse new state and have to regenerate it with merkle update auto new_state = vm::MerkleUpdate::apply(prev_state_root_pure_, state_update); CHECK(new_state.not_null()); CHECK(new_state->get_hash() == state_root->get_hash()); - assert(config_ && shard_conf_); + CHECK(shard_conf_); auto neighbor_list = shard_conf_->get_neighbor_shard_hash_ids(shard_); LOG(INFO) << "Build OutMsgQueueProofs for " << neighbor_list.size() << " neighbours"; - for (ton::BlockId blk_id : neighbor_list) { + for (BlockId blk_id : neighbor_list) { auto prefix = blk_id.shard_full(); + if (shard_intersects(prefix, shard_)) { + continue; + } auto limits = mc_state_->get_imported_msg_queue_limits(blk_id.workchain); // one could use monitor_min_split_depth here, to decrease number of broadcasts // but current implementation OutMsgQueueImporter doesn't support it auto r_proof = OutMsgQueueProof::build( - prefix, {OutMsgQueueProof::OneBlock{.id = new_block_id_ext, .state_root = new_state, .block_root = new_block}}, limits); + prefix, + {OutMsgQueueProof::OneBlock{.id = new_block_id_ext, .state_root = new_state, .block_root = new_block}}, + limits); if (r_proof.is_ok()) { auto proof = r_proof.move_as_ok(); + CHECK(proof->msg_counts_.size() == 1); block_candidate->out_msg_queue_proof_broadcasts.push_back(td::Ref( true, OutMsgQueueProofBroadcast(prefix, new_block_id_ext, limits.max_bytes, limits.max_msgs, std::move(proof->queue_proofs_), std::move(proof->block_state_proofs_), - std::move(proof->msg_counts_)))); + proof->msg_counts_[0]))); } else { - LOG(ERROR) << "Failed to build OutMsgQueueProof: " << r_proof.error(); + LOG(ERROR) << "Failed to build OutMsgQueueProof to " << prefix.to_str() << ": " << r_proof.error(); } } } diff --git a/validator/impl/out-msg-queue-proof.cpp b/validator/impl/out-msg-queue-proof.cpp index 4e8802c7e..edd20a8d2 100644 --- a/validator/impl/out-msg-queue-proof.cpp +++ b/validator/impl/out-msg-queue-proof.cpp @@ -92,29 +92,6 @@ static td::Result> process_queue( ++msg_count[kv->source]; ++msg_count_total; - // TODO: Get processed_upto from destination shard (in request?) - /* - // Parse message to check if it was processed (as in Collator::process_inbound_message) - ton::LogicalTime enqueued_lt = kv->msg->prefetch_ulong(64); - auto msg_env = kv->msg->prefetch_ref(); - block::tlb::MsgEnvelope::Record_std env; - if (!tlb::unpack_cell(msg_env, env)) { - return td::Status::Error("cannot unpack MsgEnvelope of an internal message"); - } - vm::CellSlice cs{vm::NoVmOrd{}, env.msg}; - block::gen::CommonMsgInfo::Record_int_msg_info info; - if (!tlb::unpack(cs, info)) { - return td::Status::Error("cannot unpack CommonMsgInfo of an internal message"); - } - auto src_prefix = block::tlb::MsgAddressInt::get_prefix(info.src); - auto dest_prefix = block::tlb::MsgAddressInt::get_prefix(info.dest); - auto cur_prefix = block::interpolate_addr(src_prefix, dest_prefix, env.cur_addr); - auto next_prefix = block::interpolate_addr(src_prefix, dest_prefix, env.next_addr); - block::EnqueuedMsgDescr descr{cur_prefix, next_prefix, kv->lt, enqueued_lt, env.msg->get_hash().bits()}; - if (dst_processed_upto->already_processed(descr)) { - } else { - }*/ - dfs_cs(*kv->msg); TRY_STATUS_PREFIX(check_no_prunned(*kv->msg), "invalid message proof: ") if (estimated_proof_size >= limits.max_bytes || msg_count_total >= (long long)limits.max_msgs) { @@ -301,7 +278,12 @@ void OutMsgQueueImporter::get_neighbor_msg_queue_proofs( return; } - LOG(DEBUG) << "Importing neighbor msg queues for shard " << dst_shard.to_str() << ", " << blocks.size() << " blocks"; + FLOG(DEBUG) { + sb << "Importing neighbor msg queues for shard " << dst_shard.to_str() << ", " << blocks.size() << " blocks:"; + for (const BlockIdExt& block : blocks) { + sb << " " << block.id.to_str(); + } + }; cache_[{dst_shard, blocks}] = entry = std::make_shared(); entry->dst_shard = dst_shard; @@ -321,7 +303,7 @@ void OutMsgQueueImporter::get_neighbor_msg_queue_proofs( prefix = shard_prefix(prefix, min_split); } - LOG(INFO) << "search for out msg queue proof " << prefix.to_str() << block.to_str(); + LOG(DEBUG) << "search for out msg queue proof " << prefix.to_str() << " " << block.to_str(); auto& small_entry = small_cache_[std::make_pair(dst_shard, block)]; if (!small_entry.result.is_null()) { entry->result[block] = small_entry.result; @@ -397,7 +379,13 @@ void OutMsgQueueImporter::get_proof_import(std::shared_ptr entry, st [=, SelfId = actor_id(this), retry_after = td::Timestamp::in(0.1), dst_shard = entry->dst_shard](td::Result>> R) { if (R.is_error()) { - LOG(DEBUG) << "Failed to get out msg queue for " << dst_shard.to_str() << ": " << R.move_as_error(); + FLOG(DEBUG) { + sb << "Failed to get out msg queue for " << dst_shard.to_str() << " from"; + for (const BlockIdExt &block : blocks) { + sb << " " << block.id.to_str(); + } + sb << ": " << R.move_as_error(); + }; delay_action( [=]() { td::actor::send_closure(SelfId, &OutMsgQueueImporter::get_proof_import, entry, std::move(blocks), @@ -443,8 +431,11 @@ void OutMsgQueueImporter::got_proof(std::shared_ptr entry, std::vect void OutMsgQueueImporter::finish_query(std::shared_ptr entry) { FLOG(INFO) { - sb << "Done importing neighbor msg queues for shard " << entry->dst_shard.to_str() << ", " << entry->blocks.size() - << " blocks in " << entry->timer.elapsed() << "s"; + sb << "Done importing neighbor msg queues for shard " << entry->dst_shard.to_str() << " from"; + for (const BlockIdExt &block : entry->blocks) { + sb << " " << block.id.to_str(); + } + sb << " in " << entry->timer.elapsed() << "s"; sb << " sources{"; if (entry->from_broadcast) { sb << " broadcast=" << entry->from_broadcast; @@ -479,8 +470,13 @@ void OutMsgQueueImporter::finish_query(std::shared_ptr entry) { bool OutMsgQueueImporter::check_timeout(std::shared_ptr entry) { if (entry->timeout.is_in_past()) { - LOG(DEBUG) << "Aborting importing neighbor msg queues for shard " << entry->dst_shard.to_str() << ", " - << entry->blocks.size() << " blocks: timeout"; + FLOG(DEBUG) { + sb << "Aborting importing neighbor msg queues for shard " << entry->dst_shard.to_str() << " from"; + for (const BlockIdExt &block : entry->blocks) { + sb << " " << block.id.to_str(); + } + sb << ": timeout"; + }; for (auto& p : entry->promises) { p.first.set_error(td::Status::Error(ErrorCode::timeout, "timeout")); } @@ -499,8 +495,13 @@ void OutMsgQueueImporter::alarm() { auto& promises = it->second->promises; if (it->second->timeout.is_in_past()) { if (!it->second->done) { - LOG(DEBUG) << "Aborting importing neighbor msg queues for shard " << it->second->dst_shard.to_str() << ", " - << it->second->blocks.size() << " blocks: timeout"; + FLOG(DEBUG) { + sb << "Aborting importing neighbor msg queues for shard " << it->second->dst_shard.to_str() << " from"; + for (const BlockIdExt &block : it->second->blocks) { + sb << " " << block.id.to_str(); + } + sb << ": timeout"; + }; for (auto& p : promises) { p.first.set_error(td::Status::Error(ErrorCode::timeout, "timeout")); } @@ -540,7 +541,7 @@ void OutMsgQueueImporter::alarm() { } void OutMsgQueueImporter::add_out_msg_queue_proof(ShardIdFull dst_shard, td::Ref proof) { - LOG(INFO) << "add out msg queue proof " << dst_shard.to_str() << proof->block_id_.to_str(); + LOG(INFO) << "add out msg queue proof " << dst_shard.to_str() << " " << proof->block_id_.to_str(); auto& small_entry = small_cache_[std::make_pair(dst_shard, proof->block_id_)]; if (!small_entry.result.is_null()) { return; @@ -556,7 +557,13 @@ void OutMsgQueueImporter::add_out_msg_queue_proof(ShardIdFull dst_shard, td::Ref void BuildOutMsgQueueProof::abort_query(td::Status reason) { if (promise_) { - LOG(DEBUG) << "failed to build msg queue proof to " << dst_shard_.to_str() << ": " << reason; + FLOG(DEBUG) { + sb << "failed to build msg queue proof to " << dst_shard_.to_str() << " from"; + for (const auto& block : blocks_) { + sb << " " << block.id.id.to_str(); + } + sb << ": " << reason; + }; promise_.set_error( reason.move_as_error_prefix(PSTRING() << "failed to build msg queue proof to " << dst_shard_.to_str() << ": ")); } diff --git a/validator/manager.cpp b/validator/manager.cpp index 3ff33d234..903e31554 100644 --- a/validator/manager.cpp +++ b/validator/manager.cpp @@ -798,7 +798,9 @@ void ValidatorManagerImpl::wait_neighbor_msg_queue_proofs( if (dst_shard.is_masterchain()) { // We spit queries for masterchain {dst_shard, {block_1, ..., block_n}} into separate queries // {dst_shard, {block_1}}, ..., {dst_shard, {block_n}} - // Also, use cache + // Also, use cache. + // This is performed here and not in OutMsgQueueImporter because it's important to use + // cached_msg_queue_to_masterchain_, which is related to the current list of shard block descriptions class Worker : public td::actor::Actor { public: Worker(size_t pending, td::Promise>> promise) @@ -2958,12 +2960,15 @@ PublicKeyHash ValidatorManagerImpl::get_validator(ShardIdFull shard, td::Refget_collators_list()->self_collate; } bool ValidatorManagerImpl::Collator::can_collate_shard(ShardIdFull shard) const { @@ -3524,7 +3529,7 @@ void ValidatorManagerImpl::del_collator(adnl::AdnlNodeIdShort id, ShardIdFull sh } else { td::actor::send_closure(it->second.actor, &CollatorNode::del_shard, shard); } -}; +} void ValidatorManagerImpl::get_collation_manager_stats( td::Promise> promise) { @@ -3575,15 +3580,18 @@ void ValidatorManagerImpl::get_collation_manager_stats( } void ValidatorManagerImpl::add_out_msg_queue_proof(ShardIdFull dst_shard, td::Ref proof) { - if (!collator_nodes_.empty()) { + if (is_shard_collator(dst_shard)) { if (out_msg_queue_importer_.empty()) { out_msg_queue_importer_ = td::actor::create_actor("outmsgqueueimporter", actor_id(this), opts_, last_masterchain_state_); } td::actor::send_closure(out_msg_queue_importer_, &OutMsgQueueImporter::add_out_msg_queue_proof, dst_shard, std::move(proof)); + } else { + VLOG(VALIDATOR_DEBUG) << "Dropping unneeded out msg queue proof to shard " << dst_shard.to_str(); } } + void ValidatorManagerImpl::add_persistent_state_description(td::Ref desc) { auto now = (UnixTime)td::Clocks::system(); if (desc->end_time <= now) { diff --git a/validator/validator-group.cpp b/validator/validator-group.cpp index 344f5c1c3..97f34ea7c 100644 --- a/validator/validator-group.cpp +++ b/validator/validator-group.cpp @@ -32,13 +32,14 @@ namespace ton { namespace validator { static bool need_send_candidate_broadcast(const validatorsession::BlockSourceInfo &source_info, bool is_masterchain) { - return source_info.first_block_round == source_info.round && source_info.source_priority == 0 && !is_masterchain; + return source_info.priority.first_block_round == source_info.priority.round && source_info.priority.priority == 0 && + !is_masterchain; } void ValidatorGroup::generate_block_candidate( validatorsession::BlockSourceInfo source_info, td::Promise promise) { - td::uint32 round_id = source_info.round; + td::uint32 round_id = source_info.priority.round; if (round_id > last_known_round_id_) { last_known_round_id_ = round_id; } @@ -66,15 +67,10 @@ void ValidatorGroup::generate_block_candidate( std::move(R)); }; td::uint64 max_answer_size = config_.max_block_size + config_.max_collated_data_size + 1024; - auto block_candidate_priority = BlockCandidatePriority{ - .round = source_info.round, - .first_block_round = source_info.first_block_round, - .priority = source_info.source_priority - }; td::actor::send_closure(collation_manager_, &CollationManager::collate_block, shard_, min_masterchain_block_id_, prev_block_ids_, Ed25519_PublicKey{local_id_full_.ed25519_value().raw()}, - block_candidate_priority, validator_set_, - max_answer_size, cancellation_token_source_.get_cancellation_token(), std::move(P)); + source_info.priority, validator_set_, max_answer_size, + cancellation_token_source_.get_cancellation_token(), std::move(P)); } void ValidatorGroup::generated_block_candidate(validatorsession::BlockSourceInfo source_info, @@ -103,7 +99,7 @@ void ValidatorGroup::generated_block_candidate(validatorsession::BlockSourceInfo void ValidatorGroup::validate_block_candidate(validatorsession::BlockSourceInfo source_info, BlockCandidate block, td::Promise> promise) { - td::uint32 round_id = source_info.round; + td::uint32 round_id = source_info.priority.round; if (round_id > last_known_round_id_) { last_known_round_id_ = round_id; } @@ -174,7 +170,7 @@ void ValidatorGroup::accept_block_candidate(validatorsession::BlockSourceInfo so validatorsession::ValidatorSessionStats stats, td::Promise promise) { stats.cc_seqno = validator_set_->get_catchain_seqno(); - td::uint32 round_id = source_info.round; + td::uint32 round_id = source_info.priority.round; if (round_id >= last_known_round_id_) { last_known_round_id_ = round_id + 1; }