From 25f804a111c8c2fd127863c8f3db6b12d0d668f8 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Thu, 13 Feb 2025 11:50:17 +1100 Subject: [PATCH] Fix light client plumbing in beacon processor (#6993) Our Holesky nodes running with the light client enabled were logging messages about full queues: > Feb 12 22:09:28.949 ERRO Work queue is full queue: unknown_light_client_optimistic_update, queue_len: 128, msg: the system has insufficient resources for load, service: bproc I thought this might be genuine overload, but it turns out this queue was never being read from! - [x] Rename light-client related queues in the beacon processor for clarity. - [x] Ensure all light-client related queues are being popped from. --- beacon_node/beacon_processor/src/lib.rs | 73 +++++++++++++++---------- 1 file changed, 43 insertions(+), 30 deletions(-) diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 2743f93bb3..a8960d47c9 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -109,8 +109,6 @@ pub struct BeaconProcessorQueueLengths { gossip_voluntary_exit_queue: usize, gossip_proposer_slashing_queue: usize, gossip_attester_slashing_queue: usize, - finality_update_queue: usize, - optimistic_update_queue: usize, unknown_light_client_update_queue: usize, unknown_block_sampling_request_queue: usize, rpc_block_queue: usize, @@ -132,9 +130,11 @@ pub struct BeaconProcessorQueueLengths { dcbroots_queue: usize, dcbrange_queue: usize, gossip_bls_to_execution_change_queue: usize, + lc_gossip_finality_update_queue: usize, + lc_gossip_optimistic_update_queue: usize, lc_bootstrap_queue: usize, - lc_optimistic_update_queue: usize, - lc_finality_update_queue: usize, + lc_rpc_optimistic_update_queue: usize, + lc_rpc_finality_update_queue: usize, lc_update_range_queue: usize, api_request_p0_queue: usize, api_request_p1_queue: usize, @@ -175,15 +175,13 @@ impl BeaconProcessorQueueLengths { gossip_voluntary_exit_queue: 4096, gossip_proposer_slashing_queue: 4096, gossip_attester_slashing_queue: 4096, - finality_update_queue: 1024, - optimistic_update_queue: 1024, - unknown_block_sampling_request_queue: 16384, unknown_light_client_update_queue: 128, rpc_block_queue: 1024, rpc_blob_queue: 1024, // TODO(das): Placeholder values rpc_custody_column_queue: 1000, rpc_verify_data_column_queue: 1000, + unknown_block_sampling_request_queue: 16384, sampling_result_queue: 1000, chain_segment_queue: 64, backfill_chain_segment: 64, @@ -200,9 +198,11 @@ impl BeaconProcessorQueueLengths { dcbroots_queue: 1024, dcbrange_queue: 1024, gossip_bls_to_execution_change_queue: 16384, + lc_gossip_finality_update_queue: 1024, + lc_gossip_optimistic_update_queue: 1024, lc_bootstrap_queue: 1024, - lc_optimistic_update_queue: 512, - lc_finality_update_queue: 512, + lc_rpc_optimistic_update_queue: 512, + lc_rpc_finality_update_queue: 512, lc_update_range_queue: 512, api_request_p0_queue: 1024, api_request_p1_queue: 1024, @@ -884,21 +884,16 @@ impl BeaconProcessor { let mut gossip_attester_slashing_queue = FifoQueue::new(queue_lengths.gossip_attester_slashing_queue); - // Using a FIFO queue for light client updates to maintain sequence order. - let mut finality_update_queue = FifoQueue::new(queue_lengths.finality_update_queue); - let mut optimistic_update_queue = FifoQueue::new(queue_lengths.optimistic_update_queue); - let mut unknown_light_client_update_queue = - FifoQueue::new(queue_lengths.unknown_light_client_update_queue); - let mut unknown_block_sampling_request_queue = - FifoQueue::new(queue_lengths.unknown_block_sampling_request_queue); - // Using a FIFO queue since blocks need to be imported sequentially. let mut rpc_block_queue = FifoQueue::new(queue_lengths.rpc_block_queue); let mut rpc_blob_queue = FifoQueue::new(queue_lengths.rpc_blob_queue); let mut rpc_custody_column_queue = FifoQueue::new(queue_lengths.rpc_custody_column_queue); let mut rpc_verify_data_column_queue = FifoQueue::new(queue_lengths.rpc_verify_data_column_queue); + // TODO(das): the sampling_request_queue is never read let mut sampling_result_queue = FifoQueue::new(queue_lengths.sampling_result_queue); + let mut unknown_block_sampling_request_queue = + FifoQueue::new(queue_lengths.unknown_block_sampling_request_queue); let mut chain_segment_queue = FifoQueue::new(queue_lengths.chain_segment_queue); let mut backfill_chain_segment = FifoQueue::new(queue_lengths.backfill_chain_segment); let mut gossip_block_queue = FifoQueue::new(queue_lengths.gossip_block_queue); @@ -917,10 +912,18 @@ impl BeaconProcessor { let mut gossip_bls_to_execution_change_queue = FifoQueue::new(queue_lengths.gossip_bls_to_execution_change_queue); + // Using FIFO queues for light client updates to maintain sequence order. + let mut lc_gossip_finality_update_queue = + FifoQueue::new(queue_lengths.lc_gossip_finality_update_queue); + let mut lc_gossip_optimistic_update_queue = + FifoQueue::new(queue_lengths.lc_gossip_optimistic_update_queue); + let mut unknown_light_client_update_queue = + FifoQueue::new(queue_lengths.unknown_light_client_update_queue); let mut lc_bootstrap_queue = FifoQueue::new(queue_lengths.lc_bootstrap_queue); - let mut lc_optimistic_update_queue = - FifoQueue::new(queue_lengths.lc_optimistic_update_queue); - let mut lc_finality_update_queue = FifoQueue::new(queue_lengths.lc_finality_update_queue); + let mut lc_rpc_optimistic_update_queue = + FifoQueue::new(queue_lengths.lc_rpc_optimistic_update_queue); + let mut lc_rpc_finality_update_queue = + FifoQueue::new(queue_lengths.lc_rpc_finality_update_queue); let mut lc_update_range_queue = FifoQueue::new(queue_lengths.lc_update_range_queue); let mut api_request_p0_queue = FifoQueue::new(queue_lengths.api_request_p0_queue); @@ -1254,11 +1257,19 @@ impl BeaconProcessor { } else if let Some(item) = backfill_chain_segment.pop() { Some(item) // Handle light client requests. + } else if let Some(item) = lc_gossip_finality_update_queue.pop() { + Some(item) + } else if let Some(item) = lc_gossip_optimistic_update_queue.pop() { + Some(item) + } else if let Some(item) = unknown_light_client_update_queue.pop() { + Some(item) } else if let Some(item) = lc_bootstrap_queue.pop() { Some(item) - } else if let Some(item) = lc_optimistic_update_queue.pop() { + } else if let Some(item) = lc_rpc_optimistic_update_queue.pop() { Some(item) - } else if let Some(item) = lc_finality_update_queue.pop() { + } else if let Some(item) = lc_rpc_finality_update_queue.pop() { + Some(item) + } else if let Some(item) = lc_update_range_queue.pop() { Some(item) // This statement should always be the final else statement. } else { @@ -1362,10 +1373,10 @@ impl BeaconProcessor { sync_contribution_queue.push(work) } Work::GossipLightClientFinalityUpdate { .. } => { - finality_update_queue.push(work, work_id, &self.log) + lc_gossip_finality_update_queue.push(work, work_id, &self.log) } Work::GossipLightClientOptimisticUpdate { .. } => { - optimistic_update_queue.push(work, work_id, &self.log) + lc_gossip_optimistic_update_queue.push(work, work_id, &self.log) } Work::RpcBlock { .. } | Work::IgnoredRpcBlock { .. } => { rpc_block_queue.push(work, work_id, &self.log) @@ -1400,10 +1411,10 @@ impl BeaconProcessor { lc_bootstrap_queue.push(work, work_id, &self.log) } Work::LightClientOptimisticUpdateRequest { .. } => { - lc_optimistic_update_queue.push(work, work_id, &self.log) + lc_rpc_optimistic_update_queue.push(work, work_id, &self.log) } Work::LightClientFinalityUpdateRequest { .. } => { - lc_finality_update_queue.push(work, work_id, &self.log) + lc_rpc_finality_update_queue.push(work, work_id, &self.log) } Work::LightClientUpdatesByRangeRequest { .. } => { lc_update_range_queue.push(work, work_id, &self.log) @@ -1472,9 +1483,11 @@ impl BeaconProcessor { WorkType::GossipAttesterSlashing => gossip_attester_slashing_queue.len(), WorkType::GossipSyncSignature => sync_message_queue.len(), WorkType::GossipSyncContribution => sync_contribution_queue.len(), - WorkType::GossipLightClientFinalityUpdate => finality_update_queue.len(), + WorkType::GossipLightClientFinalityUpdate => { + lc_gossip_finality_update_queue.len() + } WorkType::GossipLightClientOptimisticUpdate => { - optimistic_update_queue.len() + lc_gossip_optimistic_update_queue.len() } WorkType::RpcBlock => rpc_block_queue.len(), WorkType::RpcBlobs | WorkType::IgnoredRpcBlock => rpc_blob_queue.len(), @@ -1495,10 +1508,10 @@ impl BeaconProcessor { } WorkType::LightClientBootstrapRequest => lc_bootstrap_queue.len(), WorkType::LightClientOptimisticUpdateRequest => { - lc_optimistic_update_queue.len() + lc_rpc_optimistic_update_queue.len() } WorkType::LightClientFinalityUpdateRequest => { - lc_finality_update_queue.len() + lc_rpc_finality_update_queue.len() } WorkType::LightClientUpdatesByRangeRequest => lc_update_range_queue.len(), WorkType::ApiRequestP0 => api_request_p0_queue.len(),