Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Import gossip data column into data availability checker #6197

Merged
merged 1 commit into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 23 additions & 13 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2959,9 +2959,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self: &Arc<Self>,
data_columns: Vec<GossipVerifiedDataColumn<T>>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
let Ok(block_root) = data_columns
let Ok((slot, block_root)) = data_columns
.iter()
.map(|c| c.block_root())
.map(|c| (c.slot(), c.block_root()))
.unique()
.exactly_one()
else {
Expand All @@ -2981,7 +2981,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}

let r = self
.check_gossip_data_columns_availability_and_import(data_columns)
.check_gossip_data_columns_availability_and_import(slot, block_root, data_columns)
.await;
self.remove_notified_custody_columns(&block_root, r)
}
Expand Down Expand Up @@ -3298,6 +3298,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// if so, otherwise caches the data column in the data availability checker.
async fn check_gossip_data_columns_availability_and_import(
self: &Arc<Self>,
slot: Slot,
block_root: Hash256,
data_columns: Vec<GossipVerifiedDataColumn<T>>,
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
if let Some(slasher) = self.slasher.as_ref() {
Expand All @@ -3306,15 +3308,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

let Ok(slot) = data_columns.iter().map(|c| c.slot()).unique().exactly_one() else {
return Err(BlockError::InternalError(
"Columns for the same block should have matching slot".to_string(),
));
};

let availability = self
.data_availability_checker
.put_gossip_data_columns(data_columns)?;
let availability = self.data_availability_checker.put_gossip_data_columns(
slot,
block_root,
data_columns,
)?;

self.process_availability(slot, availability).await
}
Expand Down Expand Up @@ -3629,7 +3627,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// If the write fails, revert fork choice to the version from disk, else we can
// end up with blocks in fork choice that are missing from disk.
// See https://github.com/sigp/lighthouse/issues/2028
let (_, signed_block, blobs) = signed_block.deconstruct();
let (_, signed_block, blobs, data_columns) = signed_block.deconstruct();
let block = signed_block.message();
ops.extend(
confirmed_state_roots
Expand All @@ -3650,6 +3648,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

if let Some(_data_columns) = data_columns {
// TODO(das): depends on https://github.com/sigp/lighthouse/pull/6073
// if !data_columns.is_empty() {
// debug!(
// self.log, "Writing data_columns to store";
// "block_root" => %block_root,
// "count" => data_columns.len(),
// );
// ops.push(StoreOp::PutDataColumns(block_root, data_columns));
// }
}

let txn_lock = self.store.hot_db.begin_rw_transaction();

if let Err(e) = self.store.do_atomically_with_block_and_blobs_cache(ops) {
Expand Down
3 changes: 2 additions & 1 deletion beacon_node/beacon_chain/src/block_verification_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,8 @@ impl<E: EthSpec> AsBlock<E> for AvailableBlock<E> {
}

fn into_rpc_block(self) -> RpcBlock<E> {
let (block_root, block, blobs_opt) = self.deconstruct();
// TODO(das): rpc data columns to be merged from `das` branch
let (block_root, block, blobs_opt, _data_columns_opt) = self.deconstruct();
// Circumvent the constructor here, because an Available block will have already had
// consistency checks performed.
let inner = match blobs_opt {
Expand Down
33 changes: 27 additions & 6 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@ use std::sync::Arc;
use std::time::Duration;
use task_executor::TaskExecutor;
use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
use types::{BlobSidecarList, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock};
use types::{
BlobSidecarList, ChainSpec, DataColumnSidecarList, Epoch, EthSpec, Hash256, SignedBeaconBlock,
Slot,
};

mod error;
mod overflow_lru_cache;
mod state_lru_cache;

use crate::data_column_verification::GossipVerifiedDataColumn;
use crate::data_column_verification::{GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn};
pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCheckErrorCategory};
use types::non_zero_usize::new_non_zero_usize;

Expand Down Expand Up @@ -191,10 +194,18 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {

pub fn put_gossip_data_columns(
&self,
_gossip_data_columns: Vec<GossipVerifiedDataColumn<T>>,
slot: Slot,
block_root: Hash256,
gossip_data_columns: Vec<GossipVerifiedDataColumn<T>>,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
// TODO(das) to be implemented
Err(AvailabilityCheckError::Unexpected)
let epoch = slot.epoch(T::EthSpec::slots_per_epoch());
let custody_columns = gossip_data_columns
.into_iter()
.map(|c| KzgVerifiedCustodyDataColumn::from_asserted_custody(c.into_inner()))
.collect::<Vec<_>>();

self.availability_cache
.put_kzg_verified_data_columns(block_root, epoch, custody_columns)
}

/// Check if we have all the blobs for a block. Returns `Availability` which has information
Expand Down Expand Up @@ -231,6 +242,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
block_root,
block,
blobs: None,
data_columns: None,
blobs_available_timestamp: None,
}))
}
Expand All @@ -251,6 +263,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
block_root,
block,
blobs: verified_blobs,
data_columns: None,
blobs_available_timestamp: None,
}))
}
Expand Down Expand Up @@ -297,6 +310,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
block_root,
block,
blobs: None,
data_columns: None,
blobs_available_timestamp: None,
}))
}
Expand All @@ -312,6 +326,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
block_root,
block,
blobs: verified_blobs,
data_columns: None,
blobs_available_timestamp: None,
}))
}
Expand Down Expand Up @@ -477,6 +492,7 @@ pub struct AvailableBlock<E: EthSpec> {
block_root: Hash256,
block: Arc<SignedBeaconBlock<E>>,
blobs: Option<BlobSidecarList<E>>,
data_columns: Option<DataColumnSidecarList<E>>,
/// Timestamp at which this block first became available (UNIX timestamp, time since 1970).
blobs_available_timestamp: Option<Duration>,
}
Expand All @@ -486,11 +502,13 @@ impl<E: EthSpec> AvailableBlock<E> {
block_root: Hash256,
block: Arc<SignedBeaconBlock<E>>,
blobs: Option<BlobSidecarList<E>>,
data_columns: Option<DataColumnSidecarList<E>>,
) -> Self {
Self {
block_root,
block,
blobs,
data_columns,
blobs_available_timestamp: None,
}
}
Expand All @@ -510,20 +528,23 @@ impl<E: EthSpec> AvailableBlock<E> {
self.blobs_available_timestamp
}

#[allow(clippy::type_complexity)]
pub fn deconstruct(
self,
) -> (
Hash256,
Arc<SignedBeaconBlock<E>>,
Option<BlobSidecarList<E>>,
Option<DataColumnSidecarList<E>>,
) {
let AvailableBlock {
block_root,
block,
blobs,
data_columns,
blobs_available_timestamp: _,
} = self;
(block_root, block, blobs)
(block_root, block, blobs, data_columns)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,11 @@ impl<E: EthSpec> PendingComponents<E> {
///
/// WARNING: This function can potentially take a lot of time if the state needs to be
/// reconstructed from disk. Ensure you are not holding any write locks while calling this.
pub fn make_available<R>(self, recover: R) -> Result<Availability<E>, AvailabilityCheckError>
pub fn make_available<R>(
self,
block_import_requirement: BlockImportRequirement,
recover: R,
) -> Result<Availability<E>, AvailabilityCheckError>
where
R: FnOnce(
DietAvailabilityPendingExecutedBlock<E>,
Expand All @@ -226,7 +230,7 @@ impl<E: EthSpec> PendingComponents<E> {
let Self {
block_root,
verified_blobs,
verified_data_columns: _,
verified_data_columns,
executed_block,
} = self;

Expand All @@ -239,17 +243,29 @@ impl<E: EthSpec> PendingComponents<E> {
let Some(diet_executed_block) = executed_block else {
return Err(AvailabilityCheckError::Unexpected);
};
let num_blobs_expected = diet_executed_block.num_blobs_expected();
let Some(verified_blobs) = verified_blobs
.into_iter()
.cloned()
.map(|b| b.map(|b| b.to_blob()))
.take(num_blobs_expected)
.collect::<Option<Vec<_>>>()
else {
return Err(AvailabilityCheckError::Unexpected);

let (blobs, data_columns) = match block_import_requirement {
BlockImportRequirement::AllBlobs => {
let num_blobs_expected = diet_executed_block.num_blobs_expected();
let Some(verified_blobs) = verified_blobs
.into_iter()
.cloned()
.map(|b| b.map(|b| b.to_blob()))
.take(num_blobs_expected)
.collect::<Option<Vec<_>>>()
else {
return Err(AvailabilityCheckError::Unexpected);
};
(Some(VariableList::new(verified_blobs)?), None)
}
BlockImportRequirement::CustodyColumns(_) => {
let verified_data_columns = verified_data_columns
.into_iter()
.map(|d| d.into_inner())
.collect();
(None, Some(verified_data_columns))
}
};
let verified_blobs = VariableList::new(verified_blobs)?;

let executed_block = recover(diet_executed_block)?;

Expand All @@ -262,7 +278,8 @@ impl<E: EthSpec> PendingComponents<E> {
let available_block = AvailableBlock {
block_root,
block,
blobs: Some(verified_blobs),
blobs,
data_columns,
blobs_available_timestamp,
};
Ok(Availability::Available(Box::new(
Expand Down Expand Up @@ -404,7 +421,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
write_lock.put(block_root, pending_components.clone());
// No need to hold the write lock anymore
drop(write_lock);
pending_components.make_available(|diet_block| {
pending_components.make_available(block_import_requirement, |diet_block| {
self.state_cache.recover_pending_executed_block(diet_block)
})
} else {
Expand All @@ -413,7 +430,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
}
}

// TODO(das): gossip and rpc code paths to be implemented.
// TODO(das): rpc code paths to be implemented.
#[allow(dead_code)]
pub fn put_kzg_verified_data_columns<
I: IntoIterator<Item = KzgVerifiedCustodyDataColumn<T::EthSpec>>,
Expand All @@ -439,7 +456,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
write_lock.put(block_root, pending_components.clone());
// No need to hold the write lock anymore
drop(write_lock);
pending_components.make_available(|diet_block| {
pending_components.make_available(block_import_requirement, |diet_block| {
self.state_cache.recover_pending_executed_block(diet_block)
})
} else {
Expand Down Expand Up @@ -478,7 +495,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
write_lock.put(block_root, pending_components.clone());
// No need to hold the write lock anymore
drop(write_lock);
pending_components.make_available(|diet_block| {
pending_components.make_available(block_import_requirement, |diet_block| {
self.state_cache.recover_pending_executed_block(diet_block)
})
} else {
Expand Down
19 changes: 19 additions & 0 deletions beacon_node/beacon_chain/src/data_column_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,10 @@ impl<T: BeaconChainTypes> GossipVerifiedDataColumn<T> {
pub fn signed_block_header(&self) -> SignedBeaconBlockHeader {
self.data_column.data.signed_block_header.clone()
}

pub fn into_inner(self) -> KzgVerifiedDataColumn<T::EthSpec> {
self.data_column
}
}

/// Wrapper over a `DataColumnSidecar` for which we have completed kzg verification.
Expand All @@ -204,6 +208,9 @@ impl<E: EthSpec> KzgVerifiedDataColumn<E> {
pub fn new(data_column: Arc<DataColumnSidecar<E>>, kzg: &Kzg) -> Result<Self, KzgError> {
verify_kzg_for_data_column(data_column, kzg)
}
pub fn to_data_column(self) -> Arc<DataColumnSidecar<E>> {
self.data
}
pub fn as_data_column(&self) -> &DataColumnSidecar<E> {
&self.data
}
Expand All @@ -226,9 +233,21 @@ pub struct KzgVerifiedCustodyDataColumn<E: EthSpec> {
}

impl<E: EthSpec> KzgVerifiedCustodyDataColumn<E> {
/// Mark a column as custody column. Caller must ensure that our current custody requirements
/// include this column
pub fn from_asserted_custody(kzg_verified: KzgVerifiedDataColumn<E>) -> Self {
Self {
data: kzg_verified.to_data_column(),
}
}

pub fn index(&self) -> ColumnIndex {
self.data.index
}

pub fn into_inner(self) -> Arc<DataColumnSidecar<E>> {
self.data
}
}

/// Complete kzg verification for a `DataColumnSidecar`.
Expand Down
Loading