diff --git a/vortex-file/src/tests.rs b/vortex-file/src/tests.rs index 308cc4f850..2121c321fc 100644 --- a/vortex-file/src/tests.rs +++ b/vortex-file/src/tests.rs @@ -1,31 +1,29 @@ #![allow(clippy::cast_possible_truncation)] -use std::collections::BTreeSet; use std::iter; +use std::ops::Deref; +use std::pin::pin; use std::sync::Arc; -use bytes::Bytes; use futures::StreamExt; use futures_util::TryStreamExt; use itertools::Itertools; use vortex_array::accessor::ArrayAccessor; use vortex_array::array::{ChunkedArray, ListArray, PrimitiveArray, StructArray, VarBinArray}; use vortex_array::compute::scalar_at; +use vortex_array::stream::ArrayStreamExt; use vortex_array::validity::Validity; use vortex_array::variants::{PrimitiveArrayTrait, StructArrayTrait}; -use vortex_array::{ArrayDType, ArrayData, ArrayLen, IntoArrayData, IntoArrayVariant, ToArrayData}; -use vortex_buffer::{buffer, Buffer}; +use vortex_array::{ + ArrayDType, ArrayData, ArrayLen, Context, IntoArrayData, IntoArrayVariant, ToArrayData, +}; +use vortex_buffer::{buffer, Buffer, ByteBufferMut}; use vortex_dtype::PType::I32; use vortex_dtype::{DType, Nullability, PType, StructDType}; -use vortex_error::{vortex_panic, VortexResult}; -use vortex_expr::{col, lit, BinaryExpr, Operator, RowFilter}; -use vortex_io::VortexReadAt; - -use crate::builder::initial_read::read_initial_bytes; -use crate::write::VortexFileWriter; -use crate::{ - LayoutDeserializer, LayoutPath, Projection, Scan, VortexReadBuilder, V1_FOOTER_FBS_SIZE, - VERSION, -}; +use vortex_error::vortex_panic; +use vortex_expr::{and, eq, get_item, gt, gt_eq, ident, lit, lt, lt_eq, or, select}; + +use crate::v2::{Scan, VortexOpenOptions, VortexWriteOptions}; +use crate::{V1_FOOTER_FBS_SIZE, VERSION}; #[test] fn test_eof_values() { @@ -51,16 +49,17 @@ async fn test_read_simple() { .into_array(); let st = StructArray::from_fields(&[("strings", strings), ("numbers", numbers)]).unwrap(); - let buf = Vec::new(); - let mut writer = VortexFileWriter::new(buf); - writer = writer.write_array_columns(st.into_array()).await.unwrap(); - let written = Bytes::from(writer.finalize().await.unwrap()); + let buf = VortexWriteOptions::default() + .write(ByteBufferMut::empty(), st.into_array().into_array_stream()) + .await + .unwrap(); - let mut stream = VortexReadBuilder::new(written, LayoutDeserializer::default()) - .build() + let mut stream = pin!(VortexOpenOptions::new(Arc::new(Context::default())) + .open(buf.freeze()) .await .unwrap() - .into_stream(); + .scan(Scan::all()) + .unwrap()); let mut batch_count = 0; let mut row_count = 0; @@ -106,17 +105,13 @@ async fn test_read_simple_with_spawn() { let st = StructArray::from_fields(&[("strings", strings), ("numbers", numbers), ("lists", lists)]) .unwrap(); - let buf = Vec::new(); - - let written = tokio::spawn(async move { - let mut writer = VortexFileWriter::new(buf); - writer = writer.write_array_columns(st.into_array()).await.unwrap(); - Bytes::from(writer.finalize().await.unwrap()) - }) - .await - .unwrap(); - assert!(!written.is_empty()); + let buf = VortexWriteOptions::default() + .write(ByteBufferMut::empty(), st.into_array().into_array_stream()) + .await + .unwrap(); + + assert!(!buf.is_empty()); } #[tokio::test] @@ -137,32 +132,18 @@ async fn test_splits() { .into_array(); let st = StructArray::from_fields(&[("strings", strings), ("numbers", numbers)]).unwrap(); - let len = st.len(); - let buf = Vec::new(); - let mut writer = VortexFileWriter::new(buf); - writer = writer.write_array_columns(st.into_array()).await.unwrap(); - let written = Bytes::from(writer.finalize().await.unwrap()); - let initial_read = read_initial_bytes(&written, written.len() as u64) + let buf = VortexWriteOptions::default() + .write(ByteBufferMut::empty(), st.into_array().into_array_stream()) .await .unwrap(); - let layout_serde = LayoutDeserializer::default(); - let dtype = Arc::new(initial_read.dtype()); - - let layout_reader = layout_serde - .read_layout( - LayoutPath::default(), - initial_read.fb_layout(), - Scan::empty(), - dtype, - ) + let file = VortexOpenOptions::new(Arc::new(Context::default())) + .open(buf.freeze()) + .await .unwrap(); - let mut splits = BTreeSet::new(); - layout_reader.add_splits(0, &mut splits).unwrap(); - splits.insert(len); - assert_eq!(splits, BTreeSet::from([0, 3, 5, 6, 8])); + assert_eq!(file.splits.deref(), &[0u64..3, 3..5, 5..6, 6..8]); } #[tokio::test] @@ -185,50 +166,20 @@ async fn test_read_projection() { let numbers_dtype = numbers.dtype().clone(); let st = StructArray::from_fields(&[("strings", strings), ("numbers", numbers)]).unwrap(); - let buf = Vec::new(); - let mut writer = VortexFileWriter::new(buf); - writer = writer.write_array_columns(st.into_array()).await.unwrap(); - let written = Bytes::from(writer.finalize().await.unwrap()); - - let array = VortexReadBuilder::new(written.clone(), LayoutDeserializer::default()) - .with_projection(Projection::new(["strings".into()])) - .build() - .await - .unwrap() - .into_stream() - .read_all() - .await - .unwrap(); - - assert_eq!( - array.dtype(), - &DType::Struct( - StructDType::new(vec!["strings".into()].into(), vec![strings_dtype.clone()]), - Nullability::NonNullable, - ) - ); - let actual = array - .into_struct() - .unwrap() - .maybe_null_field_by_idx(0) - .unwrap() - .into_varbinview() - .unwrap() - .with_iterator(|x| { - x.map(|x| unsafe { String::from_utf8_unchecked(x.unwrap().to_vec()) }) - .collect::>() - }) + let buf = VortexWriteOptions::default() + .write(ByteBufferMut::empty(), st.into_array().into_array_stream()) + .await .unwrap(); - assert_eq!(actual, strings_expected); - let array = VortexReadBuilder::new(written.clone(), LayoutDeserializer::default()) - .with_projection(Projection::Flat(vec!["strings".into()])) - .build() + let file = VortexOpenOptions::new(Arc::new(Context::default())) + .open(buf.freeze()) .await + .unwrap(); + let array = file + .scan(Scan::new(select(["strings".into()], ident()))) .unwrap() - .into_stream() - .read_all() + .into_array_data() .await .unwrap(); @@ -254,41 +205,10 @@ async fn test_read_projection() { .unwrap(); assert_eq!(actual, strings_expected); - let array = VortexReadBuilder::new(written.clone(), LayoutDeserializer::default()) - .with_projection(Projection::new(["numbers".into()])) - .build() - .await + let array = file + .scan(Scan::new(select(["numbers".into()], ident()))) .unwrap() - .into_stream() - .read_all() - .await - .unwrap(); - - assert_eq!( - array.dtype(), - &DType::Struct( - StructDType::new(vec!["numbers".into()].into(), vec![numbers_dtype.clone()]), - Nullability::NonNullable, - ) - ); - - let primitive_array = array - .into_struct() - .unwrap() - .maybe_null_field_by_idx(0) - .unwrap() - .into_primitive() - .unwrap(); - let actual = primitive_array.as_slice::(); - assert_eq!(actual, numbers_expected); - - let array = VortexReadBuilder::new(written.clone(), LayoutDeserializer::default()) - .with_projection(Projection::Flat(vec!["numbers".into()])) - .build() - .await - .unwrap() - .into_stream() - .read_all() + .into_array_data() .await .unwrap(); @@ -327,16 +247,18 @@ async fn unequal_batches() { .into_array(); let st = StructArray::from_fields(&[("strings", strings), ("numbers", numbers)]).unwrap(); - let buf = Vec::new(); - let mut writer = VortexFileWriter::new(buf); - writer = writer.write_array_columns(st.into_array()).await.unwrap(); - let written = Bytes::from(writer.finalize().await.unwrap()); + let buf = VortexWriteOptions::default() + .write(ByteBufferMut::empty(), st.into_array().into_array_stream()) + .await + .unwrap(); - let mut stream = VortexReadBuilder::new(written, LayoutDeserializer::default()) - .build() + let mut stream = pin!(VortexOpenOptions::new(Arc::new(Context::default())) + .open(buf.freeze()) .await .unwrap() - .into_stream(); + .scan(Scan::all()) + .unwrap()); + let mut batch_count = 0; let mut item_count = 0; @@ -388,18 +310,19 @@ async fn write_chunked() { let chunked_st = ChunkedArray::try_new(iter::repeat_n(st, 3).collect(), st_dtype) .unwrap() .into_array(); - let buf = Vec::new(); - let mut writer = VortexFileWriter::new(buf); - writer = writer.write_array_columns(chunked_st).await.unwrap(); + let buf = VortexWriteOptions::default() + .write(ByteBufferMut::empty(), chunked_st.into_array_stream()) + .await + .unwrap(); - let written = Bytes::from(writer.finalize().await.unwrap()); - let mut reader = VortexReadBuilder::new(written, LayoutDeserializer::default()) - .build() + let mut stream = pin!(VortexOpenOptions::new(Arc::new(Context::default())) + .open(buf.freeze()) .await .unwrap() - .into_stream(); + .scan(Scan::all()) + .unwrap()); let mut array_len: usize = 0; - while let Some(array) = reader.next().await { + while let Some(array) = stream.next().await { array_len += array.unwrap().len(); } assert_eq!(array_len, 48); @@ -423,22 +346,21 @@ async fn filter_string() { ) .unwrap() .into_array(); - let mut writer = VortexFileWriter::new(Vec::new()); - writer = writer.write_array_columns(st).await.unwrap(); - - let written = Bytes::from(writer.finalize().await.unwrap()); - let stream = VortexReadBuilder::new(written, LayoutDeserializer::default()) - .with_row_filter(RowFilter::new(BinaryExpr::new_expr( - col("name"), - Operator::Eq, - lit("Joseph"), - ))) - .build() + let buf = VortexWriteOptions::default() + .write(ByteBufferMut::empty(), st.into_array_stream()) + .await + .unwrap(); + + let result = VortexOpenOptions::new(Arc::new(Context::default())) + .open(buf.freeze()) .await .unwrap() - .into_stream(); + .scan(Scan::all().with_filter(eq(get_item("name", ident()), lit("Joseph")))) + .unwrap() + .try_collect::>() + .await + .unwrap(); - let result = stream.try_collect::>().await.unwrap(); assert_eq!(result.len(), 1); let names = result[0] .as_struct_array() @@ -480,28 +402,28 @@ async fn filter_or() { ) .unwrap() .into_array(); - let mut writer = VortexFileWriter::new(Vec::new()); - writer = writer.write_array_columns(st).await.unwrap(); - let written = Bytes::from(writer.finalize().await.unwrap()); - let mut reader = VortexReadBuilder::new(written, LayoutDeserializer::default()) - .with_row_filter(RowFilter::new(BinaryExpr::new_expr( - BinaryExpr::new_expr(col("name"), Operator::Eq, lit("Angela")), - Operator::Or, - BinaryExpr::new_expr( - BinaryExpr::new_expr(col("age"), Operator::Gte, lit(20)), - Operator::And, - BinaryExpr::new_expr(col("age"), Operator::Lte, lit(30)), + + let buf = VortexWriteOptions::default() + .write(ByteBufferMut::empty(), st.into_array_stream()) + .await + .unwrap(); + + let result = VortexOpenOptions::new(Arc::new(Context::default())) + .open(buf.freeze()) + .await + .unwrap() + .scan(Scan::all().with_filter(or( + eq(get_item("name", ident()), lit("Angela")), + and( + gt_eq(get_item("age", ident()), lit(20)), + lt_eq(get_item("age", ident()), lit(30)), ), ))) - .build() - .await .unwrap() - .into_stream(); + .try_collect::>() + .await + .unwrap(); - let mut result = Vec::new(); - while let Some(array) = reader.next().await { - result.push(array.unwrap()); - } assert_eq!(result.len(), 1); let names = result[0] .as_struct_array() @@ -549,24 +471,25 @@ async fn filter_and() { ) .unwrap() .into_array(); - let mut writer = VortexFileWriter::new(Vec::new()); - writer = writer.write_array_columns(st).await.unwrap(); - let written = Bytes::from(writer.finalize().await.unwrap()); - let mut reader = VortexReadBuilder::new(written, LayoutDeserializer::default()) - .with_row_filter(RowFilter::new(BinaryExpr::new_expr( - BinaryExpr::new_expr(col("age"), Operator::Gt, lit(21)), - Operator::And, - BinaryExpr::new_expr(col("age"), Operator::Lte, lit(33)), - ))) - .build() + + let buf = VortexWriteOptions::default() + .write(ByteBufferMut::empty(), st.into_array_stream()) .await + .unwrap(); + + let result = VortexOpenOptions::new(Arc::new(Context::default())) + .open(buf.freeze()) + .await + .unwrap() + .scan(Scan::all().with_filter(and( + gt(get_item("age", ident()), lit(21)), + lt_eq(get_item("age", ident()), lit(33)), + ))) .unwrap() - .into_stream(); + .try_collect::>() + .await + .unwrap(); - let mut result = Vec::new(); - while let Some(array) = reader.next().await { - result.push(array.unwrap()); - } assert_eq!(result.len(), 1); let names = result[0] .as_struct_array() @@ -606,21 +529,24 @@ async fn test_with_indices_simple() { .unwrap(); let expected_numbers: Vec = expected_numbers_split.into_iter().flatten().collect(); - let writer = VortexFileWriter::new(Vec::new()) - .write_array_columns(expected_array.into_array()) + let buf = VortexWriteOptions::default() + .write( + ByteBufferMut::empty(), + expected_array.into_array().into_array_stream(), + ) .await .unwrap(); - let written = Bytes::from(writer.finalize().await.unwrap()); - // test no indices - let empty_indices = Buffer::::empty(); - let actual_kept_array = VortexReadBuilder::new(written.clone(), LayoutDeserializer::default()) - .with_indices(empty_indices.into_array()) - .build() + let file = VortexOpenOptions::new(Arc::new(Context::default())) + .open(buf.freeze()) .await + .unwrap(); + + // test no indices + let actual_kept_array = file + .scan(Scan::all().with_row_indices(Buffer::::empty())) .unwrap() - .into_stream() - .read_all() + .into_array_data() .await .unwrap() .into_struct() @@ -629,16 +555,12 @@ async fn test_with_indices_simple() { assert_eq!(actual_kept_array.len(), 0); // test a few indices - let kept_indices = [0_usize, 3, 99, 100, 101, 399, 400, 401, 499]; - let kept_indices_u16 = Buffer::from_iter(kept_indices.iter().map(|&x| x as u16)).into_array(); + let kept_indices = [0_u64, 3, 99, 100, 101, 399, 400, 401, 499]; - let actual_kept_array = VortexReadBuilder::new(written.clone(), LayoutDeserializer::default()) - .with_indices(kept_indices_u16) - .build() - .await + let actual_kept_array = file + .scan(Scan::all().with_row_indices(Buffer::from_iter(kept_indices))) .unwrap() - .into_stream() - .read_all() + .into_array_data() .await .unwrap() .into_struct() @@ -649,20 +571,19 @@ async fn test_with_indices_simple() { .into_primitive() .unwrap(); - let expected_kept_numbers: Vec = - kept_indices.iter().map(|&x| expected_numbers[x]).collect(); + let expected_kept_numbers: Vec = kept_indices + .iter() + .map(|&x| expected_numbers[x as usize]) + .collect(); let actual_kept_numbers = actual_kept_numbers_array.as_slice::(); assert_eq!(expected_kept_numbers, actual_kept_numbers); // test all indices - let actual_array = VortexReadBuilder::new(written.clone(), LayoutDeserializer::default()) - .with_indices(ArrayData::from((0u32..500).collect::>())) - .build() - .await + let actual_array = file + .scan(Scan::all().with_row_indices((0u64..500).collect::>())) .unwrap() - .into_stream() - .read_all() + .into_array_data() .await .unwrap() .into_struct() @@ -695,23 +616,26 @@ async fn test_with_indices_on_two_columns() { .into_array(); let st = StructArray::from_fields(&[("strings", strings), ("numbers", numbers)]).unwrap(); - let buf = Vec::new(); - let mut writer = VortexFileWriter::new(buf); - writer = writer.write_array_columns(st.into_array()).await.unwrap(); - let written = Bytes::from(writer.finalize().await.unwrap()); - - let kept_indices = [0_usize, 3, 7]; - let kept_indices_u8 = kept_indices.iter().map(|&x| x as u8).collect::>(); - let array = VortexReadBuilder::new(written.clone(), LayoutDeserializer::default()) - .with_indices(ArrayData::from(kept_indices_u8)) - .build() + let buf = VortexWriteOptions::default() + .write(ByteBufferMut::empty(), st.into_array().into_array_stream()) .await + .unwrap(); + + let file = VortexOpenOptions::new(Arc::new(Context::default())) + .open(buf.freeze()) + .await + .unwrap(); + + let kept_indices = [0_u64, 3, 7]; + let array = file + .scan(Scan::all().with_row_indices(Buffer::from_iter(kept_indices))) .unwrap() - .into_stream() - .read_all() + .into_array_data() .await .unwrap() .into_struct() + .unwrap() + .into_struct() .unwrap(); let strings_actual = array @@ -728,7 +652,7 @@ async fn test_with_indices_on_two_columns() { strings_actual, kept_indices .iter() - .map(|&x| strings_expected[x]) + .map(|&x| strings_expected[x as usize]) .collect::>() ); @@ -742,7 +666,7 @@ async fn test_with_indices_on_two_columns() { numbers_actual, kept_indices .iter() - .map(|&x| numbers_expected[x]) + .map(|&x| numbers_expected[x as usize]) .collect::>() ); } @@ -759,56 +683,54 @@ async fn test_with_indices_and_with_row_filter_simple() { .unwrap(); let expected_numbers: Vec = expected_numbers_split.into_iter().flatten().collect(); - let writer = VortexFileWriter::new(Vec::new()) - .write_array_columns(expected_array.into_array()) + let buf = VortexWriteOptions::default() + .write( + ByteBufferMut::empty(), + expected_array.into_array().into_array_stream(), + ) .await .unwrap(); - let written = Bytes::from(writer.finalize().await.unwrap()); - // test no indices - let empty_indices = Buffer::::empty(); - let actual_kept_array = VortexReadBuilder::new(written.clone(), LayoutDeserializer::default()) - .with_indices(ArrayData::from(empty_indices)) - .with_row_filter(RowFilter::new(BinaryExpr::new_expr( - col("numbers"), - Operator::Gt, - lit(50_i16), - ))) - .build() + let file = VortexOpenOptions::new(Arc::new(Context::default())) + .open(buf.freeze()) .await + .unwrap(); + + let actual_kept_array = file + .scan( + Scan::all() + .with_filter(gt(get_item("numbers", ident()), lit(50_i16))) + .with_row_indices(Buffer::empty()), + ) .unwrap() - .into_stream() - .read_all() + .into_array_data() .await .unwrap() .into_struct() + .unwrap() + .into_struct() .unwrap(); assert_eq!(actual_kept_array.len(), 0); // test a few indices - let kept_indices = [0_usize, 3, 99, 100, 101, 399, 400, 401, 499]; - let kept_indices_u16 = kept_indices - .iter() - .map(|&x| x as u16) - .collect::>(); - - let actual_kept_array = VortexReadBuilder::new(written.clone(), LayoutDeserializer::default()) - .with_indices(ArrayData::from(kept_indices_u16)) - .with_row_filter(RowFilter::new(BinaryExpr::new_expr( - col("numbers"), - Operator::Gt, - lit(50_i16), - ))) - .build() - .await + let kept_indices = [0u64, 3, 99, 100, 101, 399, 400, 401, 499]; + + let actual_kept_array = file + .scan( + Scan::all() + .with_filter(gt(get_item("numbers", ident()), lit(50_i16))) + .with_row_indices(Buffer::from_iter(kept_indices)), + ) .unwrap() - .into_stream() - .read_all() + .into_array_data() .await .unwrap() .into_struct() + .unwrap() + .into_struct() .unwrap(); + let actual_kept_numbers_array = actual_kept_array .maybe_null_field_by_idx(0) .unwrap() @@ -817,7 +739,7 @@ async fn test_with_indices_and_with_row_filter_simple() { let expected_kept_numbers: Buffer = kept_indices .iter() - .map(|&x| expected_numbers[x]) + .map(|&x| expected_numbers[x as usize]) .filter(|&x| x > 50) .collect(); let actual_kept_numbers = actual_kept_numbers_array.as_slice::(); @@ -825,22 +747,21 @@ async fn test_with_indices_and_with_row_filter_simple() { assert_eq!(expected_kept_numbers.as_slice(), actual_kept_numbers); // test all indices - let actual_array = VortexReadBuilder::new(written.clone(), LayoutDeserializer::default()) - .with_indices(ArrayData::from((0..500).collect::>())) - .with_row_filter(RowFilter::new(BinaryExpr::new_expr( - col("numbers"), - Operator::Gt, - lit(50_i16), - ))) - .build() - .await + let actual_array = file + .scan( + Scan::all() + .with_filter(gt(get_item("numbers", ident()), lit(50_i16))) + .with_row_indices((0..500).collect::>()), + ) .unwrap() - .into_stream() - .read_all() + .into_array_data() .await .unwrap() .into_struct() + .unwrap() + .into_struct() .unwrap(); + let actual_numbers_array = actual_array .maybe_null_field_by_idx(0) .unwrap() @@ -887,35 +808,27 @@ async fn filter_string_chunked() { .unwrap() .into_array(); - let buffer = Vec::new(); - let written_bytes = VortexFileWriter::new(buffer) - .write_array_columns(array) + let buf = VortexWriteOptions::default() + .write(ByteBufferMut::empty(), array.into_array_stream()) .await - .unwrap() - .finalize() + .unwrap(); + + let file = VortexOpenOptions::new(Arc::new(Context::default())) + .open(buf.freeze()) .await .unwrap(); - let actual_array = - VortexReadBuilder::new(Bytes::from(written_bytes), LayoutDeserializer::default()) - .with_row_filter(RowFilter::new(BinaryExpr::new_expr( - col("name"), - Operator::Eq, - lit("Joseph"), - ))) - .build() - .await - .unwrap() - .into_stream() - .read_all() - .await - .unwrap(); - assert_eq!(actual_array.len(), 1); - let names = actual_array - .as_struct_array() + let actual_array = file + .scan(Scan::all().with_filter(eq(get_item("name", ident()), lit("Joseph")))) .unwrap() - .maybe_null_field_by_idx(0) + .into_array_data() + .await + .unwrap() + .into_struct() .unwrap(); + + assert_eq!(actual_array.len(), 1); + let names = actual_array.maybe_null_field_by_idx(0).unwrap(); assert_eq!( names .into_varbinview() @@ -927,11 +840,7 @@ async fn filter_string_chunked() { .unwrap(), vec!["Joseph".to_string()] ); - let ages = actual_array - .as_struct_array() - .unwrap() - .maybe_null_field_by_idx(1) - .unwrap(); + let ages = actual_array.maybe_null_field_by_idx(1).unwrap(); assert_eq!(ages.into_primitive().unwrap().as_slice::(), vec![25]); } @@ -983,35 +892,30 @@ async fn test_pruning_with_or() { .unwrap() .into_array(); - let buffer = Vec::new(); - let written_bytes: Vec = VortexFileWriter::new(buffer) - .write_array_columns(array) + let buf = VortexWriteOptions::default() + .write(ByteBufferMut::empty(), array.into_array_stream()) .await - .unwrap() - .finalize() + .unwrap(); + + let file = VortexOpenOptions::new(Arc::new(Context::default())) + .open(buf.freeze()) .await .unwrap(); - let actual_array = - VortexReadBuilder::new(Bytes::from(written_bytes), LayoutDeserializer::default()) - .with_row_filter(RowFilter::new(BinaryExpr::new_expr( - BinaryExpr::new_expr(col("letter"), Operator::Lte, lit("J")), - Operator::Or, - BinaryExpr::new_expr(col("number"), Operator::Lt, lit(25)), - ))) - .build() - .await - .unwrap() - .into_stream() - .read_all() - .await - .unwrap(); - assert_eq!(actual_array.len(), 10); - let letters = actual_array - .as_struct_array() + let actual_array = file + .scan(Scan::all().with_filter(or( + lt_eq(get_item("letter", ident()), lit("J")), + lt(get_item("number", ident()), lit(25)), + ))) .unwrap() - .maybe_null_field_by_idx(0) + .into_array_data() + .await + .unwrap() + .into_struct() .unwrap(); + + assert_eq!(actual_array.len(), 10); + let letters = actual_array.maybe_null_field_by_idx(0).unwrap(); assert_eq!( letters .into_varbinview() @@ -1033,11 +937,7 @@ async fn test_pruning_with_or() { Some("P".to_string()) ] ); - let numbers = actual_array - .as_struct_array() - .unwrap() - .maybe_null_field_by_idx(1) - .unwrap(); + let numbers = actual_array.maybe_null_field_by_idx(1).unwrap(); assert_eq!( (0..numbers.len()) .map(|index| -> Option { @@ -1078,49 +978,30 @@ async fn test_repeated_projection() { .unwrap() .into_array(); - let written = VortexFileWriter::new(Vec::new()) - .write_array_columns(single_column_array) - .await - .unwrap() - .finalize() + let buf = VortexWriteOptions::default() + .write( + ByteBufferMut::empty(), + single_column_array.into_array_stream(), + ) .await .unwrap(); - async fn read_all( - w: W, - projection: Projection, - ) -> VortexResult { - VortexReadBuilder::new(w, LayoutDeserializer::default()) - .with_projection(projection) - .build() - .await? - .into_stream() - .read_all() - .await - } - - let actual = read_all( - Bytes::from(written.clone()), - Projection::new(["strings".into(), "strings".into()]), - ) - .await - .unwrap(); - - assert_eq!( - (0..actual.len()) - .map(|index| scalar_at(&actual, index).unwrap()) - .collect_vec(), - (0..expected.len()) - .map(|index| scalar_at(&expected, index).unwrap()) - .collect_vec() - ); + let file = VortexOpenOptions::new(Arc::new(Context::default())) + .open(buf.freeze()) + .await + .unwrap(); - let actual = read_all( - Bytes::from(written.clone()), - Projection::Flat(vec!["strings".into(), "strings".into()]), - ) - .await - .unwrap(); + let actual = file + .scan(Scan::new(select( + ["strings".into(), "strings".into()], + ident(), + ))) + .unwrap() + .into_array_data() + .await + .unwrap() + .into_struct() + .unwrap(); assert_eq!( (0..actual.len()) @@ -1131,124 +1012,3 @@ async fn test_repeated_projection() { .collect_vec() ); } - -#[tokio::test] -#[cfg_attr(miri, ignore)] -async fn test_simple_ranged_read() { - let strings = ChunkedArray::from_iter([ - VarBinArray::from(vec!["ab", "foo", "bar", "baz"]).into_array(), - VarBinArray::from(vec!["ab", "foo", "bar", "baz"]).into_array(), - ]) - .into_array(); - - let numbers = ChunkedArray::from_iter([ - buffer![1u32, 2, 3, 4].into_array(), - buffer![5u32, 6, 7, 8].into_array(), - ]) - .into_array(); - - let st = StructArray::from_fields(&[("strings", strings), ("numbers", numbers)]).unwrap(); - let buf = Vec::new(); - let mut writer = VortexFileWriter::new(buf); - writer = writer.write_array_columns(st.into_array()).await.unwrap(); - let written = Bytes::from(writer.finalize().await.unwrap()); - - let handle = VortexReadBuilder::new(written, LayoutDeserializer::default()) - .build() - .await - .unwrap(); - - dbg!(handle.splits()); - for v in [(0, 4), (4, 8)] { - assert!(handle.splits().contains(&v)); - } - - let mut stream = handle.stream_range(0, 4).unwrap(); - - let mut batch_count = 0; - let mut row_count = 0; - - while let Some(array) = stream.next().await { - let array = array.unwrap(); - batch_count += 1; - row_count += array.len(); - } - - assert_eq!(batch_count, 1); - assert_eq!(row_count, 4); -} - -#[tokio::test] -#[cfg_attr(miri, ignore)] -async fn test_simple_range_twice() { - let strings = ChunkedArray::from_iter([ - VarBinArray::from(vec!["ab", "foo", "bar", "baz"]).into_array(), - VarBinArray::from(vec!["ab", "foo", "bar", "baz"]).into_array(), - ]) - .into_array(); - - let numbers = ChunkedArray::from_iter([ - buffer![1u32, 2, 3, 4].into_array(), - buffer![5u32, 6, 7, 8].into_array(), - ]) - .into_array(); - - let st = StructArray::from_fields(&[("strings", strings), ("numbers", numbers)]).unwrap(); - let buf = Vec::new(); - let mut writer = VortexFileWriter::new(buf); - writer = writer.write_array_columns(st.into_array()).await.unwrap(); - let written = Bytes::from(writer.finalize().await.unwrap()); - - let handle = VortexReadBuilder::new(written, LayoutDeserializer::default()) - .build() - .await - .unwrap(); - - for v in [(0, 4), (4, 8)] { - assert!(handle.splits().contains(&v)); - } - - for _ in 0..2 { - let mut stream = handle.clone().stream_range(0, 7).unwrap(); - - let mut batch_count = 0; - let mut row_count = 0; - - while let Some(array) = stream.next().await { - let array = array.unwrap(); - batch_count += 1; - row_count += array.len(); - } - - assert_eq!(batch_count, 2); - assert_eq!(row_count, 7); - } -} - -#[tokio::test] -async fn roundtrip_row_count() { - let st = StructArray::try_new([].into(), [].into(), 2, Validity::AllValid).unwrap(); - - let buf = Vec::new(); - let mut writer = VortexFileWriter::new(buf); - writer = writer - .write_array_columns(st.clone().into_array()) - .await - .unwrap(); - let written = Bytes::from(writer.finalize().await.unwrap()); - - let handle = VortexReadBuilder::new(written, LayoutDeserializer::default()) - .build() - .await - .unwrap(); - - let read = handle - .into_stream() - .map(|a| a.unwrap()) - .collect::>() - .await; - - let read_rows = read.iter().map(|a| a.len()).sum::(); - - assert_eq!(st.len(), read_rows); -} diff --git a/vortex-file/src/v2/file.rs b/vortex-file/src/v2/file.rs index 8ebe4e0f92..157b22ff5e 100644 --- a/vortex-file/src/v2/file.rs +++ b/vortex-file/src/v2/file.rs @@ -114,7 +114,9 @@ impl VortexFile { }; // Otherwise, find the indices that are within the row range. - if row_range.end <= row_indices[0] + if row_indices + .first() + .is_some_and(|&first| first >= row_range.end) || row_indices .last() .is_some_and(|&last| row_range.start >= last) diff --git a/vortex-io/src/read.rs b/vortex-io/src/read.rs index 3b5cfdeb2a..23a6b17452 100644 --- a/vortex-io/src/read.rs +++ b/vortex-io/src/read.rs @@ -1,8 +1,9 @@ -use std::future::{self, Future}; +use std::future::{ready, Future}; use std::io; use std::sync::Arc; use bytes::Bytes; +use vortex_buffer::ByteBuffer; use vortex_error::{vortex_err, VortexUnwrap}; /// A trait for types that support asynchronous reads. @@ -67,15 +68,37 @@ impl VortexReadAt for Bytes { let read_start: usize = pos.try_into().vortex_unwrap(); let read_end: usize = (len + pos).try_into().vortex_unwrap(); if read_end > self.len() { - return future::ready(Err(io::Error::new( + return ready(Err(io::Error::new( io::ErrorKind::UnexpectedEof, vortex_err!("unexpected eof"), ))); } - future::ready(Ok(self.slice(read_start..read_end))) + ready(Ok(self.slice(read_start..read_end))) } fn size(&self) -> impl Future> + 'static { - future::ready(Ok(self.len() as u64)) + ready(Ok(self.len() as u64)) + } +} + +impl VortexReadAt for ByteBuffer { + fn read_byte_range( + &self, + pos: u64, + len: u64, + ) -> impl Future> + 'static { + let read_start: usize = pos.try_into().vortex_unwrap(); + let read_end: usize = (len + pos).try_into().vortex_unwrap(); + if read_end > self.len() { + return ready(Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + vortex_err!("unexpected eof"), + ))); + } + ready(Ok(self.slice(read_start..read_end).into_inner())) + } + + fn size(&self) -> impl Future> + 'static { + ready(Ok(self.len() as u64)) } }