Skip to content

Commit

Permalink
test: multiple arrow to parquet
Browse files Browse the repository at this point in the history
  • Loading branch information
de-sh committed Jan 29, 2025
1 parent 0298ecd commit 8d30f51
Showing 1 changed file with 69 additions and 1 deletion.
70 changes: 69 additions & 1 deletion src/staging/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,9 @@ impl Streams {

#[cfg(test)]
mod tests {
use chrono::NaiveDate;
use arrow_array::{Int32Array, StringArray, TimestampMillisecondArray};
use arrow_schema::{DataType, Field, TimeUnit};
use chrono::{NaiveDate, TimeDelta};
use temp_dir::TempDir;

use super::*;
Expand Down Expand Up @@ -550,4 +552,70 @@ mod tests {
assert_eq!(storage_size_parquet, 0);
Ok(())
}

#[test]
fn convert_multiple_arrow_files_to_parquet() {
let temp_dir = TempDir::new().unwrap();
let stream_name = "test_stream";
let options = Options {
local_staging_path: temp_dir.path().to_path_buf(),
row_group_size: 1048576,
..Default::default()
};
let staging: Arc<Stream<'_>> = Stream::new(&options, stream_name);

// Create test arrow files
let schema = Schema::new(vec![
Field::new(
DEFAULT_TIMESTAMP_KEY,
DataType::Timestamp(TimeUnit::Millisecond, None),
false,
),
Field::new("id", DataType::Int32, false),
Field::new("value", DataType::Utf8, false),
]);

for i in 0..3 {
let past = Utc::now()
.checked_sub_signed(TimeDelta::minutes(10 - i))
.unwrap()
.naive_utc();
let batch = RecordBatch::try_new(
Arc::new(schema.clone()),
vec![
Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3])),
Arc::new(Int32Array::from(vec![1, 2, 3])),
Arc::new(StringArray::from(vec!["a", "b", "c"])),
],
)
.unwrap();
staging
.push(
"abc",
&batch,
past,
&HashMap::new(),
StreamType::UserDefined,
)
.unwrap();
staging.flush();
}
// verify the arrow files exist in staging
assert_eq!(staging.arrow_files().len(), 3);
drop(staging);

// Start with a fresh staging
let staging: Arc<Stream<'_>> = Stream::new(&options, stream_name);
let result = staging
.convert_disk_files_to_parquet(None, None, true)
.unwrap();

assert!(result.is_some());
let result_schema = result.unwrap();
assert_eq!(result_schema.fields().len(), 3);

// Verify parquet files were created and the arrow files deleted
assert_eq!(staging.parquet_files().len(), 3);
assert_eq!(staging.arrow_files().len(), 0);
}
}

0 comments on commit 8d30f51

Please sign in to comment.