From 83a386df0bca3d1f2a0a2b058827f52c04ed15c6 Mon Sep 17 00:00:00 2001 From: clabby Date: Thu, 19 Sep 2024 01:19:21 -0400 Subject: [PATCH 1/2] fix(client): Channel reader error handling --- bin/client/src/l1/driver.rs | 2 +- crates/derive/src/stages/channel_reader.rs | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/bin/client/src/l1/driver.rs b/bin/client/src/l1/driver.rs index 9b37bfc85..31bc4e72e 100644 --- a/bin/client/src/l1/driver.rs +++ b/bin/client/src/l1/driver.rs @@ -171,7 +171,7 @@ where // Break the loop unless the error signifies that there is not enough data to // complete the current step. In this case, we retry the step to see if other // stages can make progress. - if !matches!(e, StageError::NotEnoughData) { + if !matches!(e, StageError::NotEnoughData | StageError::Temporary(_)) { break; } } diff --git a/crates/derive/src/stages/channel_reader.rs b/crates/derive/src/stages/channel_reader.rs index ccde829a6..4831c0861 100644 --- a/crates/derive/src/stages/channel_reader.rs +++ b/crates/derive/src/stages/channel_reader.rs @@ -6,6 +6,7 @@ use crate::{ stages::{decompress_brotli, BatchQueueProvider}, traits::{OriginAdvancer, OriginProvider, ResettableStage}, }; +use anyhow::anyhow; use op_alloy_genesis::{RollupConfig, SystemConfig}; use op_alloy_protocol::BlockInfo; @@ -63,7 +64,8 @@ where /// Creates the batch reader from available channel data. async fn set_batch_reader(&mut self) -> StageResult<()> { if self.next_batch.is_none() { - let channel = self.prev.next_data().await?.ok_or(StageError::NoChannel)?; + let channel = + self.prev.next_data().await?.ok_or(StageError::Temporary(anyhow!("No channel")))?; self.next_batch = Some(BatchReader::from(&channel[..])); } Ok(()) @@ -174,8 +176,8 @@ impl BatchReader { } let compression_type = data[0]; - if (compression_type & 0x0F) == ZLIB_DEFLATE_COMPRESSION_METHOD || - (compression_type & 0x0F) == ZLIB_RESERVED_COMPRESSION_METHOD + if (compression_type & 0x0F) == ZLIB_DEFLATE_COMPRESSION_METHOD + || (compression_type & 0x0F) == ZLIB_RESERVED_COMPRESSION_METHOD { self.decompressed = decompress_to_vec_zlib(&data).ok()?; } else if compression_type == CHANNEL_VERSION_BROTLI { From 9a192865f11b40f38667a57c37ad76920d73130e Mon Sep 17 00:00:00 2001 From: clabby Date: Thu, 19 Sep 2024 01:22:25 -0400 Subject: [PATCH 2/2] fmt --- crates/derive/src/stages/channel_reader.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/derive/src/stages/channel_reader.rs b/crates/derive/src/stages/channel_reader.rs index 4831c0861..c6189b4e0 100644 --- a/crates/derive/src/stages/channel_reader.rs +++ b/crates/derive/src/stages/channel_reader.rs @@ -176,8 +176,8 @@ impl BatchReader { } let compression_type = data[0]; - if (compression_type & 0x0F) == ZLIB_DEFLATE_COMPRESSION_METHOD - || (compression_type & 0x0F) == ZLIB_RESERVED_COMPRESSION_METHOD + if (compression_type & 0x0F) == ZLIB_DEFLATE_COMPRESSION_METHOD || + (compression_type & 0x0F) == ZLIB_RESERVED_COMPRESSION_METHOD { self.decompressed = decompress_to_vec_zlib(&data).ok()?; } else if compression_type == CHANNEL_VERSION_BROTLI { @@ -246,7 +246,7 @@ mod test { async fn test_next_batch_batch_reader_no_data() { let mock = MockChannelReaderProvider::new(vec![Ok(None)]); let mut reader = ChannelReader::new(mock, Arc::new(RollupConfig::default())); - assert_eq!(reader.next_batch().await, Err(StageError::NoChannel)); + assert!(matches!(reader.next_batch().await.unwrap_err(), StageError::Temporary(_))); assert!(reader.next_batch.is_none()); }