feat: walkback channel timeout in sad path
refcell committed Aug 13, 2024
1 parent 19123e3 commit b7ad611
Showing 10 changed files with 100 additions and 28 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

(Generated file; diff not rendered.)

4 changes: 2 additions & 2 deletions bin/client/src/kona.rs
```diff
@@ -50,8 +50,8 @@ fn main() -> Result<()> {
     let mut driver = DerivationDriver::new(
         boot.as_ref(),
         oracle.as_ref(),
-        beacon,
-        l1_provider,
+        beacon.clone(),
+        l1_provider.clone(),
         l2_provider.clone(),
     )
     .await?;
```
38 changes: 34 additions & 4 deletions bin/client/src/l1/driver.rs
```diff
@@ -67,6 +67,8 @@ where
     l2_safe_head_header: Sealed<Header>,
     /// The inner pipeline.
     pipeline: OraclePipeline<O, B>,
+    /// A secondary pipeline starting at the channel timeout walkback.
+    walkback_pipeline: Option<OraclePipeline<O, B>>,
 }

 impl<O, B> DerivationDriver<O, B>
```
```diff
@@ -124,17 +126,36 @@ where
             l2_chain_provider.clone(),
             chain_provider.clone(),
         );
-        let dap = EthereumDataSource::new(chain_provider.clone(), blob_provider, &cfg);
+        let dap = EthereumDataSource::new(chain_provider.clone(), blob_provider.clone(), &cfg);
         let pipeline = PipelineBuilder::new()
             .rollup_config(cfg)
             .dap_source(dap)
-            .l2_chain_provider(l2_chain_provider)
-            .chain_provider(chain_provider)
+            .l2_chain_provider(l2_chain_provider.clone())
+            .chain_provider(chain_provider.clone())
             .builder(attributes)
             .origin(l1_origin)
             .build();

-        Ok(Self { l2_safe_head, l2_safe_head_header, pipeline })
+        let origin_num = l2_safe_head.l1_origin.number -
+            boot_info.rollup_config.channel_timeout(l2_safe_head.block_info.timestamp);
+        let wb_l1_origin = chain_provider.block_info_by_number(origin_num).await?;
+        let cfg = Arc::new(boot_info.rollup_config.clone());
+        let walkback_pipeline = Some(
+            PipelineBuilder::new()
+                .rollup_config(cfg.clone())
+                .dap_source(EthereumDataSource::new(
+                    chain_provider.clone(),
+                    blob_provider.clone(),
+                    &cfg,
+                ))
+                .l2_chain_provider(l2_chain_provider.clone())
+                .chain_provider(chain_provider.clone())
+                .builder(StatefulAttributesBuilder::new(cfg, l2_chain_provider, chain_provider))
+                .origin(wb_l1_origin)
+                .build(),
+        );
+
+        Ok(Self { l2_safe_head, l2_safe_head_header, pipeline, walkback_pipeline })
     }

     /// Produces the disputed [L2AttributesWithParent] payload, directly after the starting L2
```
```diff
@@ -158,6 +179,15 @@ where
                 StepResult::StepFailed(e) => {
                     warn!(target: "client_derivation_driver", "Failed to step derivation pipeline: {:?}", e)
                 }
+                StepResult::FailedToDecodeBatch => {
+                    if self.walkback_pipeline.is_none() {
+                        return Err(anyhow!("Failed to decode batch"))
+                    }
+                    self.pipeline = self
+                        .walkback_pipeline
+                        .take()
+                        .ok_or_else(|| anyhow!("Missing walkback pipeline"))?;
+                }
             }

             attributes = self.pipeline.next();
```
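
The walkback origin computed above sits `channel_timeout` L1 blocks behind the safe head's L1 origin. A minimal standalone sketch of that arithmetic (the helper name and the `saturating_sub` clamp are illustrative assumptions, not this commit's code, which subtracts directly):

```rust
/// Illustrative helper (hypothetical; not part of this commit): the L1 block
/// number the walkback pipeline derives from.
fn walkback_origin(l1_origin_number: u64, channel_timeout: u64) -> u64 {
    // Clamp at genesis; the commit itself uses a plain subtraction.
    l1_origin_number.saturating_sub(channel_timeout)
}

fn main() {
    // With a 300-block channel timeout and an origin at block 19_500_000,
    // the secondary pipeline re-derives from block 19_499_700.
    assert_eq!(walkback_origin(19_500_000, 300), 19_499_700);
    assert_eq!(walkback_origin(100, 300), 0); // near-genesis clamp
}
```
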
9 changes: 9 additions & 0 deletions crates/derive/src/pipeline/core.rs
```diff
@@ -115,6 +115,15 @@ where
                 }
                 StepResult::AdvancedOrigin
             }
+            Err(StageError::FailedToDecodeBatch) => {
+                // Bubble up the batch decoding error so that the client
+                // may attempt to re-run derivation starting `channel_timeout`
+                // L1 blocks before the L1 origin. This follows consensus, as
+                // opposed to batcher policy, which attempts to open and
+                // close a channel within a single L1 block.
+                trace!(target: "pipeline", "Pipeline failed to decode batch");
+                StepResult::FailedToDecodeBatch
+            }
             Err(err) => {
                 warn!(target: "pipeline", "Attributes queue step failed: {:?}", err);
                 StepResult::StepFailed(err)
```
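
To see how a consumer reacts to the bubbled-up variant, here is a self-contained mock of the sad-path handling in `driver.rs` above (types simplified to strings; the real driver stores `OraclePipeline`s and returns `anyhow` errors): the first decode failure swaps in the walkback pipeline, and a second failure, with no fallback left, is fatal.

```rust
// Mock StepResult; the real enum lives in crates/derive/src/traits/pipeline.rs.
enum StepResult {
    PreparedAttributes,
    AdvancedOrigin,
    StepFailed(String),
    FailedToDecodeBatch,
}

// Stand-in driver: `String` replaces the real pipeline types.
struct Driver {
    pipeline: String,
    walkback_pipeline: Option<String>,
}

impl Driver {
    fn handle(&mut self, step: StepResult) -> Result<(), String> {
        if let StepResult::FailedToDecodeBatch = step {
            // `take()` consumes the fallback, so a second decode failure
            // finds `None` and surfaces an error instead of looping.
            self.pipeline =
                self.walkback_pipeline.take().ok_or("failed to decode batch".to_string())?;
        }
        Ok(())
    }
}

fn main() {
    let mut d = Driver { pipeline: "main".into(), walkback_pipeline: Some("walkback".into()) };
    assert!(d.handle(StepResult::FailedToDecodeBatch).is_ok());
    assert_eq!(d.pipeline, "walkback");
    assert!(d.handle(StepResult::FailedToDecodeBatch).is_err()); // no fallback left
}
```
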
5 changes: 3 additions & 2 deletions crates/derive/src/stages/batch_queue.rs
```diff
@@ -346,6 +346,7 @@ where
             Err(StageError::Eof) => out_of_data = true,
             Err(e) => {
                 crate::timer!(DISCARD, timer);
+                // Bubbles up any other errors like batches failing to be decoded.
                 return Err(e);
             }
         }
```
```diff
@@ -506,7 +507,7 @@ mod tests {
         let mut reader = new_batch_reader();
         let cfg = Arc::new(RollupConfig::default());
         let mut batch_vec: Vec<StageResult<Batch>> = vec![];
-        while let Some(batch) = reader.next_batch(cfg.as_ref()) {
+        while let Ok(batch) = reader.next_batch(cfg.as_ref()) {
            batch_vec.push(Ok(batch));
        }
        let mut mock = MockBatchQueueProvider::new(batch_vec);
```
```diff
@@ -546,7 +547,7 @@ mod tests {
         let mut batch_vec: Vec<StageResult<Batch>> = vec![];
         let mut batch_txs: Vec<Bytes> = vec![];
         let mut second_batch_txs: Vec<Bytes> = vec![];
-        while let Some(batch) = reader.next_batch(cfg.as_ref()) {
+        while let Ok(batch) = reader.next_batch(cfg.as_ref()) {
             if let Batch::Span(span) = &batch {
                 let bys = span.batches[0]
                     .transactions
```
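
A standalone sketch of the `while let Some(..)` to `while let Ok(..)` change in the tests above: the reader now yields a `Result`, and the loop stops at the first error (such as running out of data) just as it previously stopped at the first `None`. `MockReader` is a stand-in, not the crate's batch reader.

```rust
// Stand-in for a fallible batch reader that now returns Result.
struct MockReader {
    batches: Vec<u32>,
}

impl MockReader {
    fn next_batch(&mut self) -> Result<u32, String> {
        if self.batches.is_empty() {
            return Err("not enough data".to_string());
        }
        Ok(self.batches.remove(0))
    }
}

fn main() {
    let mut reader = MockReader { batches: vec![1, 2, 3] };
    let mut collected = Vec::new();
    // Drains the reader until the first Err, mirroring the updated tests.
    while let Ok(batch) = reader.next_batch() {
        collected.push(batch);
    }
    assert_eq!(collected, vec![1, 2, 3]);
}
```
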
58 changes: 40 additions & 18 deletions crates/derive/src/stages/channel_reader.rs
```diff
@@ -96,18 +96,18 @@ where
             crate::timer!(DISCARD, timer);
             return Err(e);
         }
-        match self
-            .next_batch
-            .as_mut()
-            .expect("Cannot be None")
-            .next_batch(self.cfg.as_ref())
-            .ok_or(StageError::NotEnoughData)
-        {
+        match self.next_batch.as_mut().expect("Cannot be None").next_batch(self.cfg.as_ref()) {
             Ok(batch) => Ok(batch),
-            Err(e) => {
+            Err(BatchReaderError::FailedToDecodeBatch) => {
+                // TODO: here we want to bubble up a stage error
                 self.next_channel();
                 crate::timer!(DISCARD, timer);
-                Err(e)
+                Err(StageError::FailedToDecodeBatch)
+            }
+            Err(_) => {
+                self.next_channel();
+                crate::timer!(DISCARD, timer);
+                Err(StageError::NotEnoughData)
             }
         }
     }
```
```diff
@@ -149,9 +149,28 @@ pub(crate) struct BatchReader {
     cursor: usize,
 }

+/// An error type for reading a [Batch] from a [crate::types::Channel].
+#[derive(Debug)]
+pub(crate) enum BatchReaderError {
+    /// The batch data is not long enough.
+    NotEnoughData,
+    /// The compression type is not supported.
+    UnsupportedCompressionType,
+    /// Batch decoding failed.
+    FailedToDecodeBatch,
+    /// Brotli compression was used before the Fjord hardfork.
+    BrotliUsedBeforeFjord,
+    /// Brotli decompression error.
+    BrotliDecompressionError,
+    /// Zlib decompression error.
+    ZlibDecompressionError,
+    /// Failed to decode channel data as bytes.
+    FailedToDecodeChannelData,
+}
+
 impl BatchReader {
     /// Pulls out the next batch from the reader.
-    pub(crate) fn next_batch(&mut self, cfg: &RollupConfig) -> Option<Batch> {
+    pub(crate) fn next_batch(&mut self, cfg: &RollupConfig) -> Result<Batch, BatchReaderError> {
         // If the data is not already decompressed, decompress it.
         let mut brotli_used = false;

```
```diff
@@ -162,7 +181,7 @@ impl BatchReader {
             // Peek at the data to determine the compression type.
             if data.is_empty() {
                 warn!(target: "batch-reader", "Data is too short to determine compression type, skipping batch");
-                return None;
+                return Err(BatchReaderError::NotEnoughData);
             }

             #[cfg(feature = "metrics")]
```
```diff
@@ -174,38 +193,41 @@
             if (compression_type & 0x0F) == ZLIB_DEFLATE_COMPRESSION_METHOD ||
                 (compression_type & 0x0F) == ZLIB_RESERVED_COMPRESSION_METHOD
             {
-                self.decompressed = decompress_to_vec_zlib(&data).ok()?;
+                self.decompressed = decompress_to_vec_zlib(&data)
+                    .map_err(|_| BatchReaderError::ZlibDecompressionError)?;
             } else if compression_type == CHANNEL_VERSION_BROTLI {
                 brotli_used = true;
-                self.decompressed = decompress_brotli(&data[1..]).ok()?;
+                self.decompressed = decompress_brotli(&data[1..])
+                    .map_err(|_| BatchReaderError::BrotliDecompressionError)?;
             } else {
                 error!(target: "batch-reader", "Unsupported compression type: {:x}, skipping batch", compression_type);
                 crate::inc!(BATCH_READER_ERRORS, &["unsupported_compression_type"]);
-                return None;
+                return Err(BatchReaderError::UnsupportedCompressionType);
             }
         }

         // Decompress and RLP decode the batch data, before finally decoding the batch itself.
         let decompressed_reader = &mut self.decompressed.as_slice()[self.cursor..].as_ref();
-        let bytes = Bytes::decode(decompressed_reader).ok()?;
+        let bytes = Bytes::decode(decompressed_reader)
+            .map_err(|_| BatchReaderError::FailedToDecodeChannelData)?;
         crate::set!(BATCH_COMPRESSION_RATIO, (raw_len as i64) * 100 / bytes.len() as i64);
         let Ok(batch) = Batch::decode(&mut bytes.as_ref(), cfg) else {
             error!(target: "batch-reader", "Failed to decode batch, skipping batch");
             crate::inc!(BATCH_READER_ERRORS, &["failed_to_decode_batch"]);
-            return None;
+            return Err(BatchReaderError::FailedToDecodeBatch);
         };

         // Confirm that brotli decompression was performed *after* the Fjord hardfork.
         if brotli_used && !cfg.is_fjord_active(batch.timestamp()) {
             warn!(target: "batch-reader", "Brotli compression used before Fjord hardfork, skipping batch");
             crate::inc!(BATCH_READER_ERRORS, &["brotli_used_before_fjord"]);
-            return None;
+            return Err(BatchReaderError::BrotliUsedBeforeFjord);
         }

         // Advance the cursor on the reader.
         self.cursor = self.decompressed.len() - decompressed_reader.len();

-        Some(batch)
+        Ok(batch)
     }
 }
```
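
The reader's seven-variant error surface collapses, in the `read_batch` change above, to just two `StageError`s. A condensed standalone mock of that mapping (these enums are simplified stand-ins, not the crate's types): only `FailedToDecodeBatch` survives distinctly so the client can trigger the walkback; every other reader error degrades to `NotEnoughData` and the stage moves on to the next channel.

```rust
// Simplified stand-ins for the enums in channel_reader.rs / errors.rs.
#[derive(Debug, PartialEq)]
enum BatchReaderError {
    NotEnoughData,
    UnsupportedCompressionType,
    FailedToDecodeBatch,
}

#[derive(Debug, PartialEq)]
enum StageError {
    NotEnoughData,
    FailedToDecodeBatch,
}

fn to_stage_error(e: BatchReaderError) -> StageError {
    match e {
        // The one case the client reacts to by starting the walkback pipeline.
        BatchReaderError::FailedToDecodeBatch => StageError::FailedToDecodeBatch,
        // Everything else: skip this channel and wait for more data.
        _ => StageError::NotEnoughData,
    }
}

fn main() {
    assert_eq!(to_stage_error(BatchReaderError::FailedToDecodeBatch), StageError::FailedToDecodeBatch);
    assert_eq!(to_stage_error(BatchReaderError::UnsupportedCompressionType), StageError::NotEnoughData);
}
```
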
2 changes: 2 additions & 0 deletions crates/derive/src/traits/pipeline.rs
```diff
@@ -18,6 +18,8 @@ pub enum StepResult {
     OriginAdvanceErr(StageError),
     /// Step failed.
     StepFailed(StageError),
+    /// Failed to decode a batch.
+    FailedToDecodeBatch,
 }

 /// This trait defines the interface for interacting with the derivation pipeline.
```
4 changes: 4 additions & 0 deletions crates/derive/src/types/errors.rs
```diff
@@ -33,6 +33,8 @@ pub enum StageError {
     ChannelNotFound,
     /// Missing L1 origin.
     MissingOrigin,
+    /// Failed to decode a batch from channel data.
+    FailedToDecodeBatch,
     /// Failed to build the [L2PayloadAttributes] for the next batch.
     ///
     /// [L2PayloadAttributes]: super::L2PayloadAttributes
@@ -65,6 +67,7 @@ impl PartialEq<StageError> for StageError {
         matches!(
             (self, other),
             (StageError::Eof, StageError::Eof) |
+                (StageError::FailedToDecodeBatch, StageError::FailedToDecodeBatch) |
                 (StageError::Temporary(_), StageError::Temporary(_)) |
                 (StageError::Critical(_), StageError::Critical(_)) |
                 (StageError::NotEnoughData, StageError::NotEnoughData) |
@@ -99,6 +102,7 @@ impl Display for StageError {
     fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
         match self {
             StageError::Eof => write!(f, "End of file"),
+            StageError::FailedToDecodeBatch => write!(f, "Failed to decode batch"),
             StageError::Temporary(e) => write!(f, "Temporary error: {}", e),
             StageError::Critical(e) => write!(f, "Critical error: {}", e),
             StageError::NotEnoughData => write!(f, "Not enough data"),
```
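
Note the `matches!`-based `PartialEq` above: variants compare by discriminant only, so payload-carrying variants are equal regardless of the inner error. A tiny standalone illustration of that semantic (mock enum, not the crate's):

```rust
// Mock of StageError's discriminant-only equality.
#[derive(Debug)]
enum StageError {
    Eof,
    FailedToDecodeBatch,
    Temporary(String),
}

impl PartialEq for StageError {
    fn eq(&self, other: &Self) -> bool {
        matches!(
            (self, other),
            (StageError::Eof, StageError::Eof) |
                (StageError::FailedToDecodeBatch, StageError::FailedToDecodeBatch) |
                (StageError::Temporary(_), StageError::Temporary(_))
        )
    }
}

fn main() {
    // Payloads are ignored: any two Temporary errors compare equal.
    assert_eq!(StageError::Temporary("a".into()), StageError::Temporary("b".into()));
    assert_ne!(StageError::Eof, StageError::FailedToDecodeBatch);
}
```
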
2 changes: 1 addition & 1 deletion examples/trusted-sync/Cargo.toml
```diff
@@ -1,6 +1,6 @@
 [package]
 name = "trusted-sync"
-version = "0.1.0"
+version = "0.1.1"
 publish = false
 description = "Derives and validates payloads using a trusted source"
 edition.workspace = true
```
4 changes: 4 additions & 0 deletions examples/trusted-sync/src/main.rs
```diff
@@ -225,6 +225,10 @@ async fn sync(cli: cli::Cli) -> Result<()> {
                     error!(target: "loop", "Error stepping derivation pipeline: {:?}", e);
                 }
             },
+            StepResult::FailedToDecodeBatch => {
+                metrics::PIPELINE_STEPS.with_label_values(&["failed_to_decode_batch"]).inc();
+                warn!(target: "loop", "Failed to decode batch");
+            }
         }

         // Peek at the next prepared attributes and validate them.
```
