diff --git a/crates/derive/src/stages/channel_reader.rs b/crates/derive/src/stages/channel_reader.rs index 9d7cd6052..5866b69a5 100644 --- a/crates/derive/src/stages/channel_reader.rs +++ b/crates/derive/src/stages/channel_reader.rs @@ -264,6 +264,15 @@ mod test { data.into() } + #[tokio::test] + async fn test_flush_channel_reader() { + let mock = MockChannelReaderProvider::new(vec![Ok(Some(new_compressed_batch_data()))]); + let mut reader = ChannelReader::new(mock, Arc::new(RollupConfig::default())); + reader.next_batch = Some(BatchReader::from(new_compressed_batch_data())); + reader.flush_channel().await.unwrap(); + assert!(reader.next_batch.is_none()); + } + #[tokio::test] async fn test_reset_channel_reader() { let mock = MockChannelReaderProvider::new(vec![Ok(None)]);