Skip to content

Commit

Permalink
Async Layouts (#1866)
Browse files Browse the repository at this point in the history
Our layouts are implemented with a `poll(&mut segments) ->
Poll::NeedMore(segment_ids)` style API; these are abstracted behind an
`Operation` trait. In practice, this is almost identical to Rust's
`Future` trait, so why not benefit from the ecosystem of async utilities?

Figured it was worth seeing what the code looks like
gatesn authored Jan 9, 2025

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent b89b82e commit 3fc82f3
Showing 36 changed files with 655 additions and 851 deletions.
15 changes: 14 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -200,7 +200,7 @@ mem_forget = "deny"
multiple_crate_versions = "allow"
or_fun_call = "deny"
panic = "deny"
panic_in_result_fn = "deny"
# panic_in_result_fn = "deny" -- we cannot disable this for tests to use assertions
redundant_clone = "deny"
same_name_method = "deny"
tests_outside_test_module = "deny"
3 changes: 2 additions & 1 deletion vortex-file/Cargo.toml
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@ readme = "README.md"
arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
arrow-schema = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
flatbuffers = { workspace = true }
futures = { workspace = true, features = ["std"] }
@@ -33,7 +34,7 @@ vortex-expr = { workspace = true }
vortex-flatbuffers = { workspace = true, features = ["file"] }
vortex-io = { workspace = true }
vortex-ipc = { workspace = true }
vortex-layout = { workspace = true, features = ["vortex-scan"] }
vortex-layout = { workspace = true }
vortex-scalar = { workspace = true, features = ["flatbuffers"] }
vortex-scan = { workspace = true }

61 changes: 32 additions & 29 deletions vortex-file/src/v2/file.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,33 @@
use std::io::Read;
use std::ops::Range;
use std::sync::Arc;
use std::task::Poll;

use futures_util::stream;
use futures::pin_mut;
use futures_util::future::poll_fn;
use futures_util::{stream, TryFutureExt};
use vortex_array::stream::{ArrayStream, ArrayStreamAdapter};
use vortex_array::ContextRef;
use vortex_dtype::DType;
use vortex_error::VortexResult;
use vortex_io::VortexReadAt;
use vortex_layout::operations::{Operation, Poll};
use vortex_layout::{LayoutData, LayoutReader};
use vortex_scan::Scan;

use crate::v2::footer::Segment;
use crate::v2::segments::SegmentCache;
use crate::v2::segments::cache::SegmentCache;

pub struct VortexFile<R> {
pub(crate) read: R,
pub(crate) ctx: ContextRef,
pub(crate) layout: LayoutData,
pub(crate) segments: Vec<Segment>,
pub(crate) segment_cache: SegmentCache,
pub(crate) segments: Arc<SegmentCache<R>>,
// TODO(ngates): not yet used by the file reader
#[allow(dead_code)]
pub(crate) splits: Vec<Range<u64>>,
pub(crate) splits: Arc<[Range<u64>]>,
}

impl<R> VortexFile<R> {}

/// Async implementation of Vortex File.
impl<R: VortexReadAt> VortexFile<R> {
impl<R: VortexReadAt + Unpin> VortexFile<R> {
/// Returns the number of rows in the file.
pub fn row_count(&self) -> u64 {
self.layout.row_count()
@@ -43,36 +41,41 @@ impl<R: VortexReadAt> VortexFile<R> {
/// Performs a scan operation over the file.
pub fn scan(&self, scan: Arc<Scan>) -> VortexResult<impl ArrayStream + '_> {
// Create a shared reader for the scan.
let reader: Arc<dyn LayoutReader> = self.layout.reader(self.ctx.clone())?;
let reader: Arc<dyn LayoutReader> = self
.layout
.reader(self.segments.clone(), self.ctx.clone())?;
let result_dtype = scan.result_dtype(self.dtype())?;
// For each row-group, we set up a future that will evaluate the scan and post its result.

// TODO(ngates): we could query the layout for splits and then process them in parallel.
// For now, we just scan the entire layout with one mask.
// Note that to implement this we would use stream::try_unfold
let stream = stream::once(async move {
let row_range = 0..reader.layout().row_count();
let mut range_scan = reader.range_scan(scan.range_scan(row_range)?);
// TODO(ngates): we should launch the evaluate_async onto a worker thread pool.
let row_range = 0..self.layout.row_count();

loop {
match range_scan.poll(&self.segment_cache)? {
Poll::Some(array) => return Ok(array),
Poll::NeedMore(segment_ids) => {
for segment_id in segment_ids {
let segment = &self.segments[*segment_id as usize];
let bytes = self
.read
.read_byte_range(segment.offset, segment.length as u64)
.await?;
self.segment_cache.set(segment_id, bytes);
}
let eval = scan
.range_scan(row_range)?
.evaluate_async(reader.evaluator());
pin_mut!(eval);

poll_fn(|cx| {
// Now we alternate between polling the eval task and driving the I/O.
loop {
if let Poll::Ready(array) = eval.try_poll_unpin(cx) {
return Poll::Ready(array);
}
let drive = self.segments.drive();
pin_mut!(drive);
match drive.try_poll_unpin(cx) {
Poll::Ready(_) => {}
Poll::Pending => return Poll::Pending,
}
}
}
})
.await
});

Ok(ArrayStreamAdapter::new(result_dtype, stream))
}
}

/// Sync implementation of Vortex File.
impl<R: Read> VortexFile<R> {}
4 changes: 3 additions & 1 deletion vortex-file/src/v2/footer/file_layout.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use vortex_flatbuffers::{footer2 as fb, FlatBufferRoot, WriteFlatBuffer};
use vortex_layout::LayoutData;

@@ -7,7 +9,7 @@ use crate::v2::footer::segment::Segment;
#[derive(Clone)]
pub(crate) struct FileLayout {
pub(crate) root_layout: LayoutData,
pub(crate) segments: Vec<Segment>,
pub(crate) segments: Arc<[Segment]>,
}

impl FlatBufferRoot for FileLayout {}
18 changes: 8 additions & 10 deletions vortex-file/src/v2/open/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
mod split_by;

use std::io::Read;
use std::ops::Range;
use std::sync::Arc;

use flatbuffers::root;
use itertools::Itertools;
@@ -16,7 +16,7 @@ use vortex_layout::segments::SegmentId;
use vortex_layout::{LayoutContextRef, LayoutData, LayoutId};

use crate::v2::footer::{FileLayout, Postscript, Segment};
use crate::v2::segments::SegmentCache;
use crate::v2::segments::cache::SegmentCache;
use crate::v2::VortexFile;
use crate::{EOF_SIZE, MAGIC_BYTES, VERSION};

@@ -126,7 +126,7 @@ impl OpenOptions {

// Set up our segment cache and, for good measure, populate any segments that were
// covered by the initial read.
let mut segment_cache = SegmentCache::default();
let mut segment_cache = SegmentCache::<R>::new(read, file_layout.segments.clone());
self.populate_segments(
initial_offset,
&initial_read,
@@ -135,15 +135,13 @@ impl OpenOptions {
)?;

// Compute the splits of the file.
let splits: Vec<Range<u64>> = self.split_by.splits(&file_layout.root_layout)?;
let splits = self.split_by.splits(&file_layout.root_layout)?.into();

// Finally, create the VortexFile.
Ok(VortexFile {
read,
ctx: self.ctx.clone(),
layout: file_layout.root_layout,
segments: file_layout.segments,
segment_cache,
segments: Arc::new(segment_cache),
splits,
})
}
@@ -239,12 +237,12 @@ impl OpenOptions {
})
}

fn populate_segments(
fn populate_segments<R>(
&self,
initial_offset: u64,
initial_read: &ByteBuffer,
file_layout: &FileLayout,
segments: &mut SegmentCache,
segments: &mut SegmentCache<R>,
) -> VortexResult<()> {
for (idx, segment) in file_layout.segments.iter().enumerate() {
if segment.offset < initial_offset {
@@ -257,7 +255,7 @@ impl OpenOptions {
let offset = usize::try_from(segment.offset - initial_offset)?;
let bytes = initial_read.slice(offset..offset + segment.length);

segments.set(segment_id, bytes.into_inner());
segments.set(segment_id, bytes.into_inner())?;
}
Ok(())
}
2 changes: 1 addition & 1 deletion vortex-file/src/v2/open/split_by.rs
Original file line number Diff line number Diff line change
@@ -58,7 +58,7 @@ mod test {
use vortex_layout::strategies::LayoutWriterExt;

use super::*;
use crate::v2::segments::BufferedSegmentWriter;
use crate::v2::segments::writer::BufferedSegmentWriter;

#[test]
fn test_layout_splits_flat() {
91 changes: 91 additions & 0 deletions vortex-file/src/v2/segments/cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
//! The segment reader provides an async interface to layouts for resolving individual segments.
use std::sync::{Arc, RwLock};

use async_trait::async_trait;
use bytes::Bytes;
use futures::channel::oneshot;
use futures_util::future::try_join_all;
use itertools::Itertools;
use vortex_array::aliases::hash_map::HashMap;
use vortex_error::{vortex_err, VortexResult};
use vortex_io::VortexReadAt;
use vortex_layout::segments::{AsyncSegmentReader, SegmentId};

use crate::v2::footer::Segment;

/// Serves segment requests from layouts by reading them from an underlying reader `R`.
///
/// Callers register interest in a segment through the `AsyncSegmentReader::get`
/// implementation, which parks a oneshot sender in `inflight`. A separate call to
/// `drive` performs the reads and delivers the bytes to every parked sender.
pub(crate) struct SegmentCache<R> {
/// Reader that `drive` uses to fetch the byte range of each requested segment.
read: R,
/// Segment table; `drive` indexes it by segment id to find each segment's offset and length.
segments: Arc<[Segment]>,
/// For each requested segment id, the senders of all `get` calls still waiting on its bytes.
inflight: RwLock<HashMap<SegmentId, Vec<oneshot::Sender<Bytes>>>>,
}

impl<R> SegmentCache<R> {
    /// Builds a segment cache over `read`, using `segments` as the table that maps a
    /// segment id to its location. No requests are in flight initially.
    pub fn new(read: R, segments: Arc<[Segment]>) -> Self {
        let inflight = RwLock::new(HashMap::new());
        Self {
            read,
            segments,
            inflight,
        }
    }

    /// Offers already-read bytes for a segment to the cache.
    ///
    /// This is currently a placeholder: the arguments are discarded and the call
    /// always succeeds.
    pub fn set(&mut self, _segment_id: SegmentId, _bytes: Bytes) -> VortexResult<()> {
        // Do nothing for now
        Ok(())
    }
}

impl<R: VortexReadAt> SegmentCache<R> {
    /// Drives the segment cache.
    ///
    /// Takes a snapshot of every segment id that currently has at least one waiter,
    /// reads all of those segments concurrently from the underlying reader, and then
    /// hands each buffer to every sender registered for that segment.
    ///
    /// Returns immediately with `Ok(())` when no segment is waiting to be read.
    ///
    /// # Errors
    ///
    /// Returns an error if the `inflight` lock is poisoned, if a requested segment id
    /// does not exist in the segment table, if any read fails, or if a snapshotted
    /// segment id is no longer present in `inflight` when its buffer is delivered.
    pub(crate) async fn drive(&self) -> VortexResult<()>
    where
        Self: Unpin,
    {
        // Grab a read lock and collect the set of segments to read. The guard is a
        // temporary of this statement, so no lock is held across the `.await` below.
        let segment_ids = self
            .inflight
            .read()
            .map_err(|_| vortex_err!("poisoned"))?
            .iter()
            .filter_map(|(id, channels)| (!channels.is_empty()).then_some(*id))
            .collect::<Vec<_>>();

        // Nothing is waiting, so there is nothing to read or deliver.
        if segment_ids.is_empty() {
            return Ok(());
        }

        // Resolve every id against the segment table before starting any I/O, so that
        // an id outside the table surfaces as an error rather than an indexing panic.
        let reads = segment_ids
            .iter()
            .map(|id| {
                self.segments
                    .get(**id as usize)
                    .map(|segment| {
                        self.read
                            .read_byte_range(segment.offset, segment.length as u64)
                    })
                    .ok_or_else(|| vortex_err!("unknown segment id {}", **id as usize))
            })
            .collect::<VortexResult<Vec<_>>>()?;

        // Read all the segments.
        let buffers = try_join_all(reads).await?;

        // Send the buffers to the waiting channels.
        let mut inflight = self.inflight.write().map_err(|_| vortex_err!("poisoned"))?;
        for (id, buffer) in segment_ids.into_iter().zip_eq(buffers) {
            let channels = inflight
                .remove(&id)
                .ok_or_else(|| vortex_err!("missing inflight segment"))?;
            for sender in channels {
                // A failed send only means this particular waiter was dropped before
                // its bytes arrived. That must not abort delivery to the remaining
                // waiters of this segment or to the other segments that were read.
                drop(sender.send(buffer.clone()));
            }
        }

        Ok(())
    }
}

#[async_trait]
impl<R: VortexReadAt> AsyncSegmentReader for SegmentCache<R> {
    /// Requests the bytes of segment `id`.
    ///
    /// Registers a oneshot sender for `id` in the in-flight table and then waits on the
    /// matching receiver. The bytes only arrive once `drive` has read the segment; if
    /// the sender is dropped without sending, the cancellation is reported as an error.
    async fn get(&self, id: SegmentId) -> VortexResult<Bytes> {
        let (sender, receiver) = oneshot::channel();

        // Keep the write guard in its own scope so it is released before awaiting.
        {
            let mut waiting = self.inflight.write().map_err(|_| vortex_err!("poisoned"))?;
            waiting.entry(id).or_default().push(sender);
        }

        receiver
            .await
            .map_err(|cancelled| vortex_err!("segment read cancelled: {:?}", cancelled))
    }
}
2 changes: 2 additions & 0 deletions vortex-file/src/v2/segments/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub(crate) mod cache;
pub(crate) mod writer;
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
use std::sync::{Arc, RwLock};

use bytes::Bytes;
use vortex_array::aliases::hash_map::HashMap;
use vortex_error::{vortex_err, VortexExpect, VortexResult};
use vortex_error::{vortex_err, VortexResult};
use vortex_io::VortexWrite;
use vortex_layout::segments::{SegmentId, SegmentReader, SegmentWriter};
use vortex_layout::segments::{SegmentId, SegmentWriter};

use crate::v2::footer::Segment;

@@ -43,32 +40,3 @@ impl BufferedSegmentWriter {
Ok(())
}
}

/// A segment cache that holds segments in memory.
///
/// TODO(ngates): switch to a Moka LRU cache.
#[derive(Default, Clone)]
pub(crate) struct SegmentCache {
segments: Arc<RwLock<HashMap<SegmentId, Bytes>>>,
}

impl SegmentCache {
pub(crate) fn set(&self, id: SegmentId, data: Bytes) {
self.segments
.write()
.map_err(|_| vortex_err!("Poisoned cache"))
.vortex_expect("poisoned")
.insert(id, data);
}
}

impl SegmentReader for SegmentCache {
fn get(&self, id: SegmentId) -> Option<Bytes> {
self.segments
.read()
.map_err(|_| vortex_err!("Poisoned cache"))
.vortex_expect("poisoned")
.get(&id)
.cloned()
}
}
51 changes: 25 additions & 26 deletions vortex-file/src/v2/tests.rs
Original file line number Diff line number Diff line change
@@ -1,39 +1,38 @@
use bytes::Bytes;
use futures_executor::block_on;
use vortex_array::array::ChunkedArray;
use vortex_array::stream::ArrayStreamExt;
use vortex_array::{ContextRef, IntoArrayData, IntoArrayVariant};
use vortex_buffer::buffer;
use vortex_error::VortexResult;
use vortex_scan::Scan;

use crate::v2::{OpenOptions, WriteOptions};
use crate::v2::*;

#[tokio::test]
async fn write_read() {
let arr = ChunkedArray::from_iter(vec![
buffer![0, 1, 2].into_array(),
buffer![3, 4, 5].into_array(),
])
.into_array();
#[test]
fn basic_file_roundtrip() -> VortexResult<()> {
block_on(async {
let array = ChunkedArray::from_iter([
buffer![0, 1, 2].into_array(),
buffer![3, 4, 5].into_array(),
buffer![6, 7, 8].into_array(),
])
.into_array();

let written = WriteOptions::default()
.write_async(vec![], arr.into_array_stream())
.await
.unwrap();
let buffer: Bytes = WriteOptions::default()
.write_async(vec![], array.into_array_stream())
.await?
.into();

// TODO(ngates): no need to wrap Vec<u8> in Bytes if VortexReadAt doesn't require clone.
let vxf = OpenOptions::new(ContextRef::default())
.open(Bytes::from(written))
.await
.unwrap();
let vxf = OpenOptions::new(ContextRef::default()).open(buffer).await?;
let result = vxf
.scan(Scan::all())?
.into_array_data()
.await?
.into_primitive()?;

let result = vxf
.scan(Scan::all())
.unwrap()
.into_array_data()
.await
.unwrap()
.into_primitive()
.unwrap();
assert_eq!(result.as_slice::<i32>(), &[0, 1, 2, 3, 4, 5, 6, 7, 8]);

assert_eq!(result.as_slice::<i32>(), &[0, 1, 2, 3, 4, 5]);
Ok(())
})
}
4 changes: 2 additions & 2 deletions vortex-file/src/v2/writer.rs
Original file line number Diff line number Diff line change
@@ -9,7 +9,7 @@ use vortex_io::VortexWrite;
use vortex_layout::strategies::LayoutStrategy;

use crate::v2::footer::{FileLayout, Postscript, Segment};
use crate::v2::segments::BufferedSegmentWriter;
use crate::v2::segments::writer::BufferedSegmentWriter;
use crate::v2::strategy::VortexLayoutStrategy;
use crate::{EOF_SIZE, MAGIC_BYTES, MAX_FOOTER_SIZE, VERSION};

@@ -79,7 +79,7 @@ impl WriteOptions {
&mut write,
&FileLayout {
root_layout,
segments,
segments: segments.into(),
},
)
.await?;
8 changes: 6 additions & 2 deletions vortex-layout/Cargo.toml
Original file line number Diff line number Diff line change
@@ -15,8 +15,11 @@ categories.workspace = true

[dependencies]
arrow-buffer = { workspace = true }
async-once-cell = "0.5.4"
async-trait = { workspace = true }
bytes = { workspace = true }
flatbuffers = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
vortex-array = { workspace = true }
vortex-buffer = { workspace = true }
@@ -25,10 +28,11 @@ vortex-error = { workspace = true }
vortex-expr = { workspace = true }
vortex-flatbuffers = { workspace = true, features = ["layout"] }
vortex-ipc = { workspace = true }
vortex-scan = { workspace = true, optional = true }
vortex-scalar = { workspace = true }
vortex-scan = { workspace = true }

[dev-dependencies]
rstest = { workspace = true }
futures = { workspace = true, features = ["executor"] }

[lints]
workspace = true
10 changes: 7 additions & 3 deletions vortex-layout/src/data.rs
Original file line number Diff line number Diff line change
@@ -13,7 +13,7 @@ use vortex_flatbuffers::{layout as fb, layout, FlatBufferRoot, WriteFlatBuffer};
use crate::context::LayoutContextRef;
use crate::encoding::{LayoutEncodingRef, LayoutId};
use crate::reader::LayoutReader;
use crate::segments::SegmentId;
use crate::segments::{AsyncSegmentReader, SegmentId};

/// [`LayoutData`] is the lazy equivalent to [`vortex_array::ArrayData`], providing a hierarchical
/// structure.
@@ -249,8 +249,12 @@ impl LayoutData {
}

/// Create a reader for this layout.
pub fn reader(&self, ctx: ContextRef) -> VortexResult<Arc<dyn LayoutReader + 'static>> {
self.encoding().reader(self.clone(), ctx)
pub fn reader(
&self,
segments: Arc<dyn AsyncSegmentReader>,
ctx: ContextRef,
) -> VortexResult<Arc<dyn LayoutReader + 'static>> {
self.encoding().reader(self.clone(), ctx, segments)
}

/// Register splits for this layout.
8 changes: 7 additions & 1 deletion vortex-layout/src/encoding.rs
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ use std::sync::Arc;
use vortex_array::ContextRef;
use vortex_error::VortexResult;

use crate::segments::AsyncSegmentReader;
use crate::{LayoutData, LayoutReader};

#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
@@ -23,7 +24,12 @@ pub trait LayoutEncoding: Debug + Send + Sync {
/// Construct a [`LayoutReader`] for the provided [`LayoutData`].
///
/// May panic if the provided `LayoutData` is not the same encoding as this `LayoutEncoding`.
fn reader(&self, layout: LayoutData, ctx: ContextRef) -> VortexResult<Arc<dyn LayoutReader>>;
fn reader(
&self,
layout: LayoutData,
ctx: ContextRef,
segments: Arc<dyn AsyncSegmentReader>,
) -> VortexResult<Arc<dyn LayoutReader>>;

/// Register the row splits for this layout, these represent natural boundaries at which
/// a reader can split the layout for independent processing.
301 changes: 114 additions & 187 deletions vortex-layout/src/layouts/chunked/evaluator.rs
Original file line number Diff line number Diff line change
@@ -1,182 +1,110 @@
use std::sync::Arc;

use vortex_array::array::ChunkedArray;
use async_trait::async_trait;
use futures::future::{ready, try_join_all};
use futures::FutureExt;
use vortex_array::array::{ChunkedArray, ConstantArray};
use vortex_array::{ArrayDType, ArrayData, Canonical, IntoArrayData, IntoArrayVariant};
use vortex_error::{vortex_panic, VortexExpect, VortexResult};
use vortex_error::VortexResult;
use vortex_expr::pruning::PruningPredicate;
use vortex_expr::ExprRef;
use vortex_scalar::Scalar;
use vortex_scan::{AsyncEvaluator, RowMask};

use crate::layouts::chunked::reader::ChunkedReader;
use crate::operations::{Operation, Poll};
use crate::reader::{EvalOp, LayoutScanExt};
use crate::segments::SegmentReader;
use crate::{ready, RowMask};

/// Evaluation operation for an expression over a chunked layout.
pub(crate) struct ChunkedEvaluator {
reader: Arc<ChunkedReader>,
row_mask: RowMask,
expr: ExprRef,
pruning_predicate: Option<PruningPredicate>,
// State for each chunk in the layout
chunk_states: Option<Vec<ChunkState>>,
}

impl ChunkedEvaluator {
pub fn new(chunked_scan: Arc<ChunkedReader>, row_mask: RowMask, expr: ExprRef) -> Self {
use crate::reader::LayoutScanExt;

#[async_trait(?Send)]
impl AsyncEvaluator for ChunkedReader {
async fn evaluate(self: &Self, row_mask: RowMask, expr: ExprRef) -> VortexResult<ArrayData> {
// Compute the result dtype of the expression.
let dtype = expr
.evaluate(&Canonical::empty(self.dtype())?.into_array())?
.dtype()
.clone();

// First we need to compute the pruning mask
let pruning_predicate = PruningPredicate::try_new(&expr);
Self {
reader: chunked_scan,
row_mask,
expr,
pruning_predicate,
chunk_states: None,
}
}
}

enum ChunkState {
Pending(EvalOp),
Resolved(Option<ArrayData>),
}

impl Operation for ChunkedEvaluator {
type Output = ArrayData;

fn poll(&mut self, segments: &dyn SegmentReader) -> VortexResult<Poll<Self::Output>> {
// If we haven't set up our chunk state yet, then we need to do that first.
if self.chunk_states.is_none() {
// First we need to compute the pruning mask
let pruning_mask = if let Some(predicate) = &self.pruning_predicate {
// If the expression is prune-able, then fetch the stats table
if let Some(stats_table) = ready!(self.reader.stats_table_op()?.poll(segments)) {
predicate
.evaluate(stats_table.array())?
.map(|mask| mask.into_bool())
.transpose()?
.map(|mask| mask.boolean_buffer())
} else {
None
}
let pruning_mask = if let Some(predicate) = pruning_predicate {
// If the expression is prune-able, then fetch the stats table
if let Some(stats_table) = self.stats_table().await? {
predicate
.evaluate(stats_table.array())?
.map(|mask| mask.into_bool())
.transpose()?
.map(|mask| mask.boolean_buffer())
} else {
None
};

// Now we can set up the chunk state.
let mut chunks = Vec::with_capacity(self.reader.nchunks());
let mut row_offset = 0;
for chunk_idx in 0..self.reader.nchunks() {
let chunk_reader = self.reader.child(chunk_idx)?;

// Figure out the row range of the chunk
let chunk_len = chunk_reader.layout().row_count();
let chunk_range = row_offset..row_offset + chunk_len;
row_offset += chunk_len;

// Try to skip the chunk based on the row-mask
if self.row_mask.is_disjoint(chunk_range.clone()) {
chunks.push(ChunkState::Resolved(None));
continue;
}
}
} else {
None
};

// Try to skip the chunk based on the pruning predicate
if let Some(pruning_mask) = &pruning_mask {
if pruning_mask.value(chunk_idx) {
chunks.push(ChunkState::Resolved(None));
continue;
}
}
// Now we set up futures to evaluate each chunk at the same time
let mut chunks = Vec::with_capacity(self.nchunks());

// Otherwise, we need to read it. So we set up a mask for the chunk range.
let chunk_mask = self
.row_mask
.slice(chunk_range.start, chunk_range.end)?
.shift(chunk_range.start)?;
let chunk_evaluator =
chunk_reader.create_evaluator(chunk_mask, self.expr.clone())?;
chunks.push(ChunkState::Pending(chunk_evaluator));
}
let mut row_offset = 0;
for chunk_idx in 0..self.nchunks() {
let chunk_reader = self.child(chunk_idx)?;

self.chunk_states = Some(chunks);
}
// Figure out the row range of the chunk
let chunk_len = chunk_reader.layout().row_count();
let chunk_range = row_offset..row_offset + chunk_len;
row_offset += chunk_len;

// Try to skip the chunk based on the row-mask
if row_mask.is_disjoint(chunk_range.clone()) {
continue;
}

let chunk_states = self
.chunk_states
.as_mut()
.vortex_expect("chunk state not set");

// Now we try to read the chunks.
let mut needed = vec![];
for chunk_state in chunk_states.iter_mut() {
match chunk_state {
ChunkState::Pending(scanner) => match scanner.poll(segments)? {
Poll::Some(array) => {
// Resolve the chunk
*chunk_state = ChunkState::Resolved(Some(array));
}
Poll::NeedMore(segment_ids) => {
// Request more segments
needed.extend(segment_ids);
}
},
ChunkState::Resolved(_) => {
// Already resolved
// If the pruning mask tells us the chunk is pruned (i.e. the expr is ALL false),
// then we can just return a constant array.
if let Some(pruning_mask) = &pruning_mask {
if pruning_mask.value(chunk_idx) {
let false_array = ConstantArray::new(
Scalar::bool(false, dtype.nullability()),
row_mask.true_count(),
);
chunks.push(ready(Ok(false_array.into_array())).boxed_local());
continue;
}
}
}

// If we need more segments, then request them.
if !needed.is_empty() {
return Ok(Poll::NeedMore(needed));
// Otherwise, we need to read it. So we set up a mask for the chunk range.
let chunk_mask = row_mask
.slice(chunk_range.start, chunk_range.end)?
.shift(chunk_range.start)?;

let expr = expr.clone();
chunks.push(chunk_reader.evaluate(chunk_mask, expr).boxed_local());
}

// Otherwise, we've read all the chunks, so we're done.
let chunks = chunk_states
.iter_mut()
.filter_map(|state| match state {
ChunkState::Resolved(array) => array.take(),
_ => vortex_panic!(
"This is a bug. Missing a chunk array with no more segments to read"
),
})
.collect::<Vec<_>>();

let dtype = if let Some(chunk) = chunks.first() {
chunk.dtype().clone()
} else {
self.expr
.evaluate(&Canonical::empty(self.reader.dtype())?.into_array())?
.dtype()
.clone()
};
// Wait for all chunks to be evaluated
let chunks = try_join_all(chunks).await?;

Ok(Poll::Some(
ChunkedArray::try_new(chunks, dtype)?.into_array(),
))
Ok(ChunkedArray::try_new(chunks, dtype)?.into_array())
}
}

#[cfg(test)]
mod test {
use std::sync::Arc;

use futures::executor::block_on;
use vortex_array::array::{BoolArray, ChunkedArray, ConstantArray};
use vortex_array::{ArrayLen, IntoArrayData, IntoArrayVariant};
use vortex_buffer::buffer;
use vortex_dtype::Nullability::NonNullable;
use vortex_dtype::{DType, PType};
use vortex_error::vortex_panic;
use vortex_error::VortexExpect;
use vortex_expr::{gt, lit, Identity};
use vortex_scan::RowMask;

use crate::layouts::chunked::evaluator::{ChunkState, ChunkedEvaluator};
use crate::layouts::chunked::reader::ChunkedReader;
use crate::layouts::chunked::writer::ChunkedLayoutWriter;
use crate::operations::{Operation, Poll};
use crate::segments::test::TestSegments;
use crate::strategies::LayoutWriterExt;
use crate::{LayoutData, RowMask};
use crate::LayoutData;

/// Create a chunked layout with three chunks of primitive arrays.
fn chunked_layout() -> (TestSegments, LayoutData) {
fn chunked_layout() -> (Arc<TestSegments>, LayoutData) {
let mut segments = TestSegments::default();
let layout = ChunkedLayoutWriter::new(
&DType::Primitive(PType::I32, NonNullable),
@@ -191,57 +119,56 @@ mod test {
],
)
.unwrap();
(segments, layout)
(Arc::new(segments), layout)
}

#[test]
fn test_chunked_scan() {
let (segments, layout) = chunked_layout();

let scan = layout.reader(Default::default()).unwrap();
let result = segments
.evaluate(scan, Identity::new_expr())
.into_primitive()
.unwrap();

assert_eq!(result.len(), 9);
assert_eq!(result.as_slice::<i32>(), &[1, 2, 3, 4, 5, 6, 7, 8, 9]);
block_on(async {
let (segments, layout) = chunked_layout();

let result = layout
.reader(segments, Default::default())
.unwrap()
.evaluate(
RowMask::new_valid_between(0, layout.row_count()),
Identity::new_expr(),
)
.await
.unwrap()
.into_primitive()
.unwrap();

assert_eq!(result.len(), 9);
assert_eq!(result.as_slice::<i32>(), &[1, 2, 3, 4, 5, 6, 7, 8, 9]);
})
}

#[test]
// FIXME(ngates): when we make LayoutReader Send we will fix this
#[allow(clippy::arc_with_non_send_sync)]
fn test_chunked_pruning_mask() {
let (segments, layout) = chunked_layout();
let row_count = layout.row_count();
let reader = ChunkedReader::try_new(layout, Default::default()).unwrap();

// Populate the stats table so that we can verify the pruning mask works.
_ = reader.stats_table_op().unwrap().poll(&segments).unwrap();

let expr = gt(Identity::new_expr(), lit(6));
let mut evaluator = ChunkedEvaluator::new(
Arc::new(reader),
RowMask::new_valid_between(0, row_count),
expr,
);

// Then we poll the chunked scanner without any segments so _only_ the stats were
// available.
let Poll::NeedMore(_segments) = evaluator.poll(&TestSegments::default()).unwrap() else {
unreachable!()
};

// Now we validate that based on the pruning mask, we have excluded the first two chunks
let chunk_states = evaluator.chunk_states.as_ref().unwrap().as_slice();
if !matches!(chunk_states[0], ChunkState::Resolved(None)) {
vortex_panic!("Expected first chunk to be pruned");
}
if !matches!(chunk_states[1], ChunkState::Resolved(None)) {
vortex_panic!("Expected second chunk to be pruned");
}
if !matches!(chunk_states[2], ChunkState::Pending(_)) {
vortex_panic!("Expected third chunk to be read");
}
block_on(async {
let (segments, layout) = chunked_layout();
let row_count = layout.row_count();
let reader = layout.reader(segments, Default::default()).unwrap();

// Choose a prune-able expression
let expr = gt(Identity::new_expr(), lit(7));

let result = reader
.evaluate(RowMask::new_valid_between(0, row_count), expr.clone())
.await
.unwrap();
let result = ChunkedArray::try_from(result).unwrap();

// Now we ensure that the pruned chunks are ConstantArrays, instead of having been
// evaluated.
assert_eq!(result.nchunks(), 3);
ConstantArray::try_from(result.chunk(0).unwrap())
.vortex_expect("Expected first chunk to be pruned");
ConstantArray::try_from(result.chunk(1).unwrap())
.vortex_expect("Expected second chunk to be pruned");
BoolArray::try_from(result.chunk(2).unwrap())
.vortex_expect("Expected third chunk to be evaluated");
})
}
}
10 changes: 8 additions & 2 deletions vortex-layout/src/layouts/chunked/mod.rs
Original file line number Diff line number Diff line change
@@ -14,6 +14,7 @@ use crate::data::LayoutData;
use crate::encoding::{LayoutEncoding, LayoutId};
use crate::layouts::chunked::reader::ChunkedReader;
use crate::reader::{LayoutReader, LayoutScanExt};
use crate::segments::AsyncSegmentReader;
use crate::CHUNKED_LAYOUT_ID;

#[derive(Default, Debug)]
@@ -28,8 +29,13 @@ impl LayoutEncoding for ChunkedLayout {
CHUNKED_LAYOUT_ID
}

fn reader(&self, layout: LayoutData, ctx: ContextRef) -> VortexResult<Arc<dyn LayoutReader>> {
Ok(ChunkedReader::try_new(layout, ctx)?.into_arc())
fn reader(
&self,
layout: LayoutData,
ctx: ContextRef,
segments: Arc<dyn AsyncSegmentReader>,
) -> VortexResult<Arc<dyn LayoutReader>> {
Ok(ChunkedReader::try_new(layout, ctx, segments)?.into_arc())
}

fn register_splits(
140 changes: 73 additions & 67 deletions vortex-layout/src/layouts/chunked/reader.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,35 @@
use std::sync::{Arc, OnceLock, RwLock, RwLockWriteGuard};
use std::sync::{Arc, OnceLock};

use async_once_cell::OnceCell;
use vortex_array::stats::{stats_from_bitset_bytes, Stat};
use vortex_array::ContextRef;
use vortex_error::{vortex_err, vortex_panic, VortexError, VortexResult};
use vortex_expr::{ExprRef, Identity};
use vortex_error::{vortex_panic, VortexResult};
use vortex_expr::Identity;
use vortex_scan::{AsyncEvaluator, RowMask};

use crate::layouts::chunked::evaluator::ChunkedEvaluator;
use crate::layouts::chunked::stats_table::StatsTable;
use crate::layouts::chunked::ChunkedLayout;
use crate::operations::cached::CachedOperation;
use crate::operations::{resolved, Operation, OperationExt};
use crate::reader::{EvalOp, LayoutReader};
use crate::{LayoutData, LayoutEncoding, RowMask};
use crate::reader::LayoutReader;
use crate::segments::AsyncSegmentReader;
use crate::{LayoutData, LayoutEncoding};

#[derive(Clone)]
pub struct ChunkedReader {
layout: LayoutData,
ctx: ContextRef,
/// Shared stats table operation and cache of the result
stats_table_op: RwLock<StatsTableOp>,
segments: Arc<dyn AsyncSegmentReader>,
/// Shared stats table
stats_table: Arc<OnceCell<Option<StatsTable>>>,
/// Shared lazy chunk scanners
// TODO(ngates): consider an LRU cache here so we don't indefinitely hold onto chunk readers.
// If we do this, then we could also cache ArrayData in a FlatLayout since we know that this
// cache will eventually be evicted.
chunk_readers: Vec<OnceLock<Arc<dyn LayoutReader>>>,
chunk_readers: Arc<[OnceLock<Arc<dyn LayoutReader>>]>,
}

type StatsTableOp = CachedOperation<Box<dyn Operation<Output = Option<StatsTable>>>>;

impl ChunkedReader {
pub(super) fn try_new(layout: LayoutData, ctx: ContextRef) -> VortexResult<Self> {
pub(super) fn try_new(
layout: LayoutData,
ctx: ContextRef,
segments: Arc<dyn AsyncSegmentReader>,
) -> VortexResult<Self> {
if layout.encoding().id() != ChunkedLayout.id() {
vortex_panic!("Mismatched layout ID")
}
@@ -40,57 +41,64 @@ impl ChunkedReader {
nchunks -= 1;
}

// Figure out which stats are present
let present_stats: Arc<[Stat]> = layout
.metadata()
.map(|m| stats_from_bitset_bytes(m.as_ref()))
.unwrap_or_default()
.into();

let stats_table_op = layout
.metadata()
.is_some()
.then(|| {
let column_dtype = layout.dtype().clone();
let stats_dtype = StatsTable::dtype_for_stats_table(&column_dtype, &present_stats);
let stats_layout = layout.child(layout.nchildren() - 1, stats_dtype)?;
let op = stats_layout
.reader(ctx.clone())?
.create_evaluator(
RowMask::new_valid_between(0, nchunks as u64),
Identity::new_expr(),
)?
.map(move |stats_array| {
StatsTable::try_new(
column_dtype.clone(),
stats_array,
present_stats.clone(),
)
.map(Some)
})
.boxed();
Ok::<_, VortexError>(op)
})
.transpose()?
.unwrap_or_else(|| resolved(None).boxed())
.cached();

// Construct a lazy scan for each chunk of the layout.
let chunk_scans = (0..nchunks).map(|_| OnceLock::new()).collect();

Ok(Self {
layout,
ctx,
stats_table_op: RwLock::new(stats_table_op),
segments,
stats_table: Arc::new(OnceCell::new()),
chunk_readers: chunk_scans,
})
}

/// Get the stats table operation.
pub(crate) fn stats_table_op(&self) -> VortexResult<RwLockWriteGuard<StatsTableOp>> {
self.stats_table_op
.write()
.map_err(|_| vortex_err!("poisoned"))
/// Get or initialize the stats table.
///
/// Only the first successful caller will initialize the stats table, all other callers will
/// resolve to the same result.
///
/// Resolves to `Ok(None)` when the layout carries no metadata, since in that case no
/// statistics child is present.
pub(crate) async fn stats_table(&self) -> VortexResult<Option<&StatsTable>> {
    self.stats_table
        .get_or_try_init(async {
            Ok(match self.layout.metadata() {
                None => None,
                Some(metadata) => {
                    // When metadata is present, the final child is the statistics table. Its
                    // index is therefore also the number of data chunks, i.e. the number of
                    // rows in the stats table.
                    let stats_child_idx = self.layout.nchildren() - 1;

                    // Figure out which stats are present
                    let present_stats: Arc<[Stat]> =
                        Arc::from(stats_from_bitset_bytes(metadata.as_ref()));

                    let layout_dtype = self.layout.dtype().clone();
                    let stats_dtype =
                        StatsTable::dtype_for_stats_table(&layout_dtype, &present_stats);
                    let stats_layout = self.layout.child(stats_child_idx, stats_dtype)?;

                    // Read the whole stats table: one row per chunk.
                    let stats_array = stats_layout
                        .reader(self.segments.clone(), self.ctx.clone())?
                        .evaluate(
                            RowMask::new_valid_between(0, stats_child_idx as u64),
                            Identity::new_expr(),
                        )
                        .await?;

                    // `layout_dtype` and `present_stats` are not used again, so move them
                    // instead of cloning.
                    Some(StatsTable::try_new(
                        layout_dtype,
                        stats_array,
                        present_stats,
                    )?)
                }
            })
        })
        .await
        .map(|opt| opt.as_ref())
}

/// Return the number of chunks
@@ -99,13 +107,11 @@ impl ChunkedReader {
}

/// Return the child reader for the chunk.
pub(crate) fn child(&self, idx: usize) -> VortexResult<Arc<dyn LayoutReader>> {
self.chunk_readers[idx]
.get_or_try_init(|| {
let child_layout = self.layout.child(idx, self.layout.dtype().clone())?;
child_layout.reader(self.ctx.clone())
})
.cloned()
pub(crate) fn child(&self, idx: usize) -> VortexResult<&Arc<dyn LayoutReader>> {
    // Each chunk has its own lazily-initialised slot; indexing panics if `idx` is out of range.
    let slot = &self.chunk_readers[idx];
    slot.get_or_try_init(|| {
        // Chunks share the dtype of the enclosing chunked layout.
        let chunk_dtype = self.layout.dtype().clone();
        self.layout
            .child(idx, chunk_dtype)?
            .reader(self.segments.clone(), self.ctx.clone())
    })
}
}

@@ -114,7 +120,7 @@ impl LayoutReader for ChunkedReader {
&self.layout
}

fn create_evaluator(self: Arc<Self>, row_mask: RowMask, expr: ExprRef) -> VortexResult<EvalOp> {
Ok(ChunkedEvaluator::new(self, row_mask, expr).boxed())
fn evaluator(&self) -> &dyn AsyncEvaluator {
self
}
}
131 changes: 58 additions & 73 deletions vortex-layout/src/layouts/flat/evaluator.rs
Original file line number Diff line number Diff line change
@@ -1,41 +1,19 @@
use std::sync::Arc;

use vortex_array::compute::{filter, FilterMask};
use async_trait::async_trait;
use vortex_array::compute::filter;
use vortex_array::ArrayData;
use vortex_error::{vortex_bail, vortex_err, VortexExpect, VortexResult};
use vortex_error::{vortex_bail, vortex_err, VortexResult};
use vortex_expr::ExprRef;
use vortex_ipc::messages::{BufMessageReader, DecoderMessage};
use vortex_scan::{AsyncEvaluator, RowMask};

use crate::layouts::flat::reader::FlatReader;
use crate::operations::{Operation, Poll};
use crate::reader::LayoutScanExt;
use crate::segments::SegmentReader;

#[derive(Debug)]
pub(crate) struct FlatEvaluator {
reader: Arc<FlatReader>,
filter_mask: Option<FilterMask>,
expr: ExprRef,
}

impl FlatEvaluator {
pub(crate) fn new(reader: Arc<FlatReader>, filter_mask: FilterMask, expr: ExprRef) -> Self {
Self {
reader,
filter_mask: Some(filter_mask),
expr,
}
}
}

impl Operation for FlatEvaluator {
type Output = ArrayData;

fn poll(&mut self, segments: &dyn SegmentReader) -> VortexResult<Poll<Self::Output>> {
#[async_trait(?Send)]
impl AsyncEvaluator for FlatReader {
async fn evaluate(self: &Self, row_mask: RowMask, expr: ExprRef) -> VortexResult<ArrayData> {
// Grab the byte buffer for the segment.
let Some(bytes) = segments.get(self.reader.segment_id()) else {
return Ok(Poll::NeedMore(vec![self.reader.segment_id()]));
};
let bytes = self.segments().get(self.segment_id()).await?;

// Decode the ArrayParts from the message bytes.
// TODO(ngates): ArrayParts should probably live in vortex-array, and not required
@@ -45,75 +23,82 @@ impl Operation for FlatEvaluator {
.next()
.ok_or_else(|| vortex_err!("Flat message body missing"))??
{
parts.decode(self.reader.ctx(), self.reader.dtype().clone())
parts.decode(self.ctx(), self.dtype().clone())
} else {
vortex_bail!("Flat message is not ArrayParts")
}?;

// TODO(ngates): what's the best order to apply the filter mask / expression?
let array = self.expr.evaluate(&array)?;

// If we clone the filter mask, then it eagerly resolves indices. Instead, we use the
// same technique as futures map to ensure this operation can only be polled once.
let filter_mask = self
.filter_mask
.take()
.vortex_expect("FlatEvaluator polled multiple times");
let array = filter(&array, filter_mask)?;

Ok(Poll::Some(array))
let array = expr.evaluate(&array)?;
filter(&array, row_mask.into_filter_mask()?)
}
}

#[cfg(test)]
mod test {
use std::sync::Arc;

use arrow_buffer::BooleanBuffer;
use futures::executor::block_on;
use vortex_array::array::PrimitiveArray;
use vortex_array::validity::Validity;
use vortex_array::{ArrayDType, IntoArrayVariant, ToArrayData};
use vortex_buffer::buffer;
use vortex_expr::{gt, lit, Identity};
use vortex_scan::RowMask;

use crate::layouts::flat::writer::FlatLayoutWriter;
use crate::segments::test::TestSegments;
use crate::strategies::LayoutWriterExt;

#[test]
fn flat_identity() {
let mut segments = TestSegments::default();
let array = PrimitiveArray::new(buffer![1, 2, 3, 4, 5], Validity::AllValid);
let layout = FlatLayoutWriter::new(array.dtype().clone())
.push_one(&mut segments, array.to_array())
.unwrap();

let result = segments
.evaluate(
layout.reader(Default::default()).unwrap(),
Identity::new_expr(),
)
.into_primitive()
.unwrap();

assert_eq!(array.as_slice::<i32>(), result.as_slice::<i32>());
block_on(async {
let mut segments = TestSegments::default();
let array = PrimitiveArray::new(buffer![1, 2, 3, 4, 5], Validity::AllValid);
let layout = FlatLayoutWriter::new(array.dtype().clone())
.push_one(&mut segments, array.to_array())
.unwrap();

let result = layout
.reader(Arc::new(segments), Default::default())
.unwrap()
.evaluate(
RowMask::new_valid_between(0, layout.row_count()),
Identity::new_expr(),
)
.await
.unwrap()
.into_primitive()
.unwrap();

assert_eq!(array.as_slice::<i32>(), result.as_slice::<i32>());
})
}

#[test]
fn flat_expr() {
let mut segments = TestSegments::default();
let array = PrimitiveArray::new(buffer![1, 2, 3, 4, 5], Validity::AllValid);
let layout = FlatLayoutWriter::new(array.dtype().clone())
.push_one(&mut segments, array.to_array())
.unwrap();

let expr = gt(Identity::new_expr(), lit(3i32));
let result = segments
.evaluate(layout.reader(Default::default()).unwrap(), expr)
.into_bool()
.unwrap();

assert_eq!(
BooleanBuffer::from_iter([false, false, false, true, true]),
result.boolean_buffer()
);
block_on(async {
let mut segments = TestSegments::default();
let array = PrimitiveArray::new(buffer![1, 2, 3, 4, 5], Validity::AllValid);
let layout = FlatLayoutWriter::new(array.dtype().clone())
.push_one(&mut segments, array.to_array())
.unwrap();

let expr = gt(Identity::new_expr(), lit(3i32));
let result = layout
.reader(Arc::new(segments), Default::default())
.unwrap()
.evaluate(RowMask::new_valid_between(0, layout.row_count()), expr)
.await
.unwrap()
.into_bool()
.unwrap();

assert_eq!(
BooleanBuffer::from_iter([false, false, false, true, true]),
result.boolean_buffer()
);
})
}
}
10 changes: 8 additions & 2 deletions vortex-layout/src/layouts/flat/mod.rs
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@ use vortex_error::VortexResult;
use crate::encoding::{LayoutEncoding, LayoutId};
use crate::layouts::flat::reader::FlatReader;
use crate::reader::{LayoutReader, LayoutScanExt};
use crate::segments::AsyncSegmentReader;
use crate::{LayoutData, FLAT_LAYOUT_ID};

#[derive(Debug)]
@@ -22,8 +23,13 @@ impl LayoutEncoding for FlatLayout {
FLAT_LAYOUT_ID
}

fn reader(&self, layout: LayoutData, ctx: ContextRef) -> VortexResult<Arc<dyn LayoutReader>> {
Ok(FlatReader::try_new(layout, ctx)?.into_arc())
fn reader(
    &self,
    layout: LayoutData,
    ctx: ContextRef,
    segments: Arc<dyn AsyncSegmentReader>,
) -> VortexResult<Arc<dyn LayoutReader>> {
    // Construct the concrete reader first, then erase it behind the `LayoutReader` trait object.
    let flat_reader = FlatReader::try_new(layout, ctx, segments)?;
    Ok(flat_reader.into_arc())
}

fn register_splits(
28 changes: 17 additions & 11 deletions vortex-layout/src/layouts/flat/reader.rs
Original file line number Diff line number Diff line change
@@ -2,19 +2,17 @@ use std::sync::Arc;

use vortex_array::ContextRef;
use vortex_error::{vortex_err, vortex_panic, VortexResult};
use vortex_expr::ExprRef;
use vortex_scan::AsyncEvaluator;

use crate::layouts::flat::evaluator::FlatEvaluator;
use crate::layouts::flat::FlatLayout;
use crate::operations::OperationExt;
use crate::reader::{EvalOp, LayoutReader};
use crate::segments::SegmentId;
use crate::{LayoutData, LayoutEncoding, RowMask};
use crate::reader::LayoutReader;
use crate::segments::{AsyncSegmentReader, SegmentId};
use crate::{LayoutData, LayoutEncoding};

#[derive(Debug)]
pub struct FlatReader {
layout: LayoutData,
ctx: ContextRef,
segments: Arc<dyn AsyncSegmentReader>,
// The segment ID of the array in this FlatLayout.
// NOTE(ngates): we don't cache the ArrayData here since the cache lives for as long as the
// reader does, which means likely holding a strong reference to the array for much longer
@@ -23,7 +21,11 @@ pub struct FlatReader {
}

impl FlatReader {
pub(crate) fn try_new(layout: LayoutData, ctx: ContextRef) -> VortexResult<Self> {
pub(crate) fn try_new(
layout: LayoutData,
ctx: ContextRef,
segments: Arc<dyn AsyncSegmentReader>,
) -> VortexResult<Self> {
if layout.encoding().id() != FlatLayout.id() {
vortex_panic!("Mismatched layout ID")
}
@@ -35,6 +37,7 @@ impl FlatReader {
Ok(Self {
layout,
ctx,
segments,
segment_id,
})
}
@@ -43,6 +46,10 @@ impl FlatReader {
self.ctx.clone()
}

pub(crate) fn segments(&self) -> &dyn AsyncSegmentReader {
    // Borrow the segment reader held by this reader without cloning the `Arc`.
    &*self.segments
}

pub(crate) fn segment_id(&self) -> SegmentId {
self.segment_id
}
@@ -53,8 +60,7 @@ impl LayoutReader for FlatReader {
&self.layout
}

fn create_evaluator(self: Arc<Self>, row_mask: RowMask, expr: ExprRef) -> VortexResult<EvalOp> {
let filter_mask = row_mask.into_filter_mask()?;
Ok(FlatEvaluator::new(self, filter_mask, expr).boxed())
fn evaluator(&self) -> &dyn AsyncEvaluator {
self
}
}
11 changes: 10 additions & 1 deletion vortex-layout/src/layouts/struct_/mod.rs
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@ use crate::data::LayoutData;
use crate::encoding::{LayoutEncoding, LayoutId};
use crate::layouts::struct_::scan::StructScan;
use crate::reader::{LayoutReader, LayoutScanExt};
use crate::segments::AsyncSegmentReader;
use crate::COLUMNAR_LAYOUT_ID;

#[derive(Debug)]
@@ -21,7 +22,12 @@ impl LayoutEncoding for StructLayout {
COLUMNAR_LAYOUT_ID
}

fn reader(&self, layout: LayoutData, ctx: ContextRef) -> VortexResult<Arc<dyn LayoutReader>> {
fn reader(
    &self,
    layout: LayoutData,
    ctx: ContextRef,
    _segments: Arc<dyn AsyncSegmentReader>,
) -> VortexResult<Arc<dyn LayoutReader>> {
    // The segment reader is currently unused here; `StructScan` is built from the layout
    // and context only.
    let struct_scan = StructScan::try_new(layout, ctx)?;
    Ok(struct_scan.into_arc())
}

@@ -31,10 +37,13 @@ impl LayoutEncoding for StructLayout {
row_offset: u64,
splits: &mut BTreeSet<u64>,
) -> VortexResult<()> {
// Register the splits for each field
for child_idx in 0..layout.nchildren() {
let child = layout.child(child_idx, layout.dtype().clone())?;
child.register_splits(row_offset, splits)?;
}
// But also... (fun bug!), register the length of the struct in case there are no fields.
splits.insert(row_offset + layout.row_count());
Ok(())
}
}
40 changes: 13 additions & 27 deletions vortex-layout/src/layouts/struct_/scan.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
use std::sync::Arc;

use async_trait::async_trait;
use vortex_array::{ArrayData, ContextRef};
use vortex_error::{vortex_panic, VortexResult};
use vortex_expr::ExprRef;
use vortex_scan::{AsyncEvaluator, RowMask};

use crate::layouts::struct_::StructLayout;
use crate::operations::{Operation, Poll};
use crate::reader::{EvalOp, LayoutReader};
use crate::segments::SegmentReader;
use crate::{LayoutData, LayoutEncoding, RowMask};
use crate::reader::LayoutReader;
use crate::{LayoutData, LayoutEncoding};

#[derive(Debug)]
pub struct StructScan {
@@ -27,31 +25,19 @@ impl StructScan {
}
}

impl LayoutReader for StructScan {
fn layout(&self) -> &LayoutData {
&self.layout
}

fn create_evaluator(
self: Arc<Self>,
_row_mask: RowMask,
_expr: ExprRef,
) -> VortexResult<EvalOp> {
#[async_trait(?Send)]
impl AsyncEvaluator for StructScan {
    /// Evaluation of struct layouts is not implemented yet: calling this panics via `todo!()`.
    async fn evaluate(&self, _row_mask: RowMask, _expr: ExprRef) -> VortexResult<ArrayData> {
        todo!()
    }
}

#[derive(Debug)]
#[allow(dead_code)]
struct StructScanner {
layout: LayoutData,
mask: RowMask,
}

impl Operation for StructScanner {
type Output = ArrayData;
impl LayoutReader for StructScan {
fn layout(&self) -> &LayoutData {
&self.layout
}

fn poll(&mut self, _segments: &dyn SegmentReader) -> VortexResult<Poll<Self::Output>> {
todo!()
fn evaluator(&self) -> &dyn AsyncEvaluator {
self
}
}
5 changes: 0 additions & 5 deletions vortex-layout/src/lib.rs
Original file line number Diff line number Diff line change
@@ -7,13 +7,8 @@ pub use context::*;
mod encoding;
pub mod layouts;
pub use encoding::*;
mod row_mask;
pub use row_mask::*;
pub mod operations;
mod reader;
pub use reader::*;
#[cfg(feature = "vortex-scan")]
mod scan;
pub mod segments;
pub mod strategies;

27 changes: 0 additions & 27 deletions vortex-layout/src/operations/cached.rs

This file was deleted.

24 changes: 0 additions & 24 deletions vortex-layout/src/operations/map.rs

This file was deleted.

95 changes: 0 additions & 95 deletions vortex-layout/src/operations/mod.rs

This file was deleted.

18 changes: 0 additions & 18 deletions vortex-layout/src/operations/resolved.rs

This file was deleted.

41 changes: 8 additions & 33 deletions vortex-layout/src/reader.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,21 @@
use std::sync::Arc;

use vortex_array::ArrayData;
use vortex_dtype::DType;
use vortex_error::VortexResult;
use vortex_expr::ExprRef;
use vortex_scan::AsyncEvaluator;

use crate::operations::Operation;
use crate::{LayoutData, RowMask};

pub type EvalOp = Box<dyn Operation<Output = ArrayData>>;
use crate::LayoutData;

/// A [`LayoutReader`] is an instance of a [`LayoutData`] that can cache state across multiple
/// operations.
pub trait LayoutReader {
///
/// Since different row ranges of the reader may be evaluated by different threads, it is required
/// to be both `Send` and `Sync`.
pub trait LayoutReader: Send + Sync + AsyncEvaluator {
/// Returns the [`LayoutData`] of this reader.
fn layout(&self) -> &LayoutData;

/// Creates a new evaluator for the layout. It is expected that the evaluator makes use of
/// shared state from the [`LayoutReader`] for caching and other optimisations.
//
// NOTE(ngates): we have chosen a general "run this expression" API instead of separate
// `filter(row_mask, expr) -> row_mask` + `project(row_mask, field_mask)` APIs.
// The reason for this is so we can eventually support cell-level push-down.
// If we only projected using a field mask, then it means we need to download all the data
// for the rows of field present in the row mask. When I say cell-level push-down, I mean
// we can slice the cell directly out of storage using an API like
// `SegmentReader::read(segment_id, byte_range: Range<usize>)`. This is a highly advanced
// use-case, but can prove invaluable for large cell values such as images and video.
// If instead we make the projection API `project(row_mask, expr)`, then identical to the
// filter API and there's now no point having two. Hence: `evaluate(row_mask, expr)`.
fn create_evaluator(self: Arc<Self>, row_mask: RowMask, expr: ExprRef) -> VortexResult<EvalOp>;
/// Returns the [`AsyncEvaluator`] for this reader.
fn evaluator(&self) -> &dyn AsyncEvaluator;
}

pub trait LayoutScanExt: LayoutReader {
@@ -48,14 +34,3 @@ pub trait LayoutScanExt: LayoutReader {
}

impl<L: LayoutReader> LayoutScanExt for L {}

impl dyn LayoutReader + 'static {
/// Perform a scan over a row-range of the layout.
#[cfg(feature = "vortex-scan")]
pub fn range_scan(
self: Arc<dyn LayoutReader>,
range_scan: vortex_scan::RangeScan,
) -> impl Operation<Output = ArrayData> {
crate::scan::LayoutRangeScan::new(self, range_scan)
}
}
58 changes: 0 additions & 58 deletions vortex-layout/src/scan.rs

This file was deleted.

50 changes: 16 additions & 34 deletions vortex-layout/src/segments/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::ops::Deref;

use async_trait::async_trait;
use bytes::Bytes;
use vortex_array::ArrayData;
use vortex_error::VortexResult;
use vortex_ipc::messages::{EncoderMessage, MessageEncoder};

/// The identifier for a single segment.
@@ -23,11 +25,12 @@ impl Deref for SegmentId {
}
}

pub trait SegmentReader {
#[async_trait]
pub trait AsyncSegmentReader: Send + Sync {
/// Attempt to get the data associated with a given segment ID.
///
/// If the segment ID is not found, an error is returned.
fn get(&self, id: SegmentId) -> Option<Bytes>;
async fn get(&self, id: SegmentId) -> VortexResult<Bytes>;
}

pub trait SegmentWriter {
@@ -45,47 +48,16 @@ pub trait SegmentWriter {

#[cfg(test)]
pub mod test {
use std::sync::Arc;

use bytes::{Bytes, BytesMut};
use vortex_error::{vortex_panic, VortexExpect};
use vortex_expr::ExprRef;
use vortex_error::{vortex_err, VortexExpect};

use super::*;
use crate::operations::Poll;
use crate::reader::LayoutReader;
use crate::segments::SegmentReader;
use crate::RowMask;

#[derive(Default)]
pub struct TestSegments {
segments: Vec<Bytes>,
}

impl TestSegments {
pub fn evaluate(&self, reader: Arc<dyn LayoutReader>, expr: ExprRef) -> ArrayData {
let row_count = reader.layout().row_count();
let mut evaluator = reader
.create_evaluator(RowMask::new_valid_between(0, row_count), expr)
.vortex_expect("Failed to create scanner");
match evaluator
.poll(self)
.vortex_expect("Failed to poll evaluator")
{
Poll::Some(array) => array,
Poll::NeedMore(_segments) => {
vortex_panic!("Layout requested more segments from TestSegments.")
}
}
}
}

impl SegmentReader for TestSegments {
fn get(&self, id: SegmentId) -> Option<Bytes> {
self.segments.get(*id as usize).cloned()
}
}

impl SegmentWriter for TestSegments {
fn put(&mut self, data: Vec<Bytes>) -> SegmentId {
let id = u32::try_from(self.segments.len())
@@ -98,4 +70,14 @@ pub mod test {
id.into()
}
}

#[async_trait]
impl AsyncSegmentReader for TestSegments {
    /// Look up a previously written segment by its ID, failing if no such segment exists.
    async fn get(&self, id: SegmentId) -> VortexResult<Bytes> {
        // Segment IDs are positions in the `segments` vector.
        let index = *id as usize;
        match self.segments.get(index) {
            Some(bytes) => Ok(bytes.clone()),
            None => Err(vortex_err!("Segment not found")),
        }
    }
}
}
5 changes: 5 additions & 0 deletions vortex-scan/Cargo.toml
Original file line number Diff line number Diff line change
@@ -14,10 +14,15 @@ readme.workspace = true
categories.workspace = true

[dependencies]
async-trait = { workspace = true }
vortex-array = { workspace = true }
vortex-buffer = { workspace = true }
vortex-dtype = { workspace = true }
vortex-error = { workspace = true }
vortex-expr = { workspace = true }

[dev-dependencies]
rstest = { workspace = true }

[lints]
workspace = true
38 changes: 38 additions & 0 deletions vortex-scan/src/evaluator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// ## A note on the API of Evaluator.
//
// We have chosen a general "run this expression" API instead of separate
// `filter(row_mask, expr) -> row_mask` + `project(row_mask, field_mask)` APIs. The reason for
// this is so we can eventually support cell-level push-down.
//
// If we only projected using a field mask, then it means we need to download all the data
// for the rows of field present in the row mask. When I say cell-level push-down, I mean
// we can slice the cell directly out of storage using an API like
// `SegmentReader::read(segment_id, byte_range: Range<usize>)`.
//
// Admittedly, this is a highly advanced use-case, but can prove invaluable for large cell values
// such as images and video.
//
// If instead we make the projection API `project(row_mask, expr)`, then the project API is
// identical to the filter API and there's no point having both. Hence, a single
// `evaluate(row_mask, expr)` API.
use async_trait::async_trait;
use vortex_array::ArrayData;
use vortex_error::VortexResult;
use vortex_expr::ExprRef;

use crate::RowMask;

/// A synchronous evaluator that can evaluate expressions against a row mask.
///
/// This is the blocking counterpart of [`AsyncEvaluator`].
pub trait Evaluator {
/// Evaluate `expr` for the rows described by `row_mask`, returning the resulting array
/// or an error.
fn evaluate(&self, row_mask: RowMask, expr: ExprRef) -> VortexResult<ArrayData>;
}

/// An async evaluator that can evaluate expressions against a row mask.
///
/// For now, we make this a non-Send trait since it's desirable for us to pin this CPU-heavy
/// work to a single thread. Having a `Send` future doesn't prevent this model, but it makes
/// it easy to accidentally spawn this on e.g. a multithreaded Tokio runtime that would cause
/// thrashing of the CPU cache.
#[async_trait(?Send)]
pub trait AsyncEvaluator {
/// Evaluate `expr` for the rows described by `row_mask`, resolving to the resulting array
/// or an error.
async fn evaluate(&self, row_mask: RowMask, expr: ExprRef) -> VortexResult<ArrayData>;
}
4 changes: 4 additions & 0 deletions vortex-scan/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
mod evaluator;
mod range_scan;
mod row_mask;

use std::ops::Range;
use std::sync::Arc;

pub use evaluator::*;
pub use range_scan::*;
pub use row_mask::*;
use vortex_array::compute::FilterMask;
use vortex_array::{ArrayDType, Canonical, IntoArrayData};
use vortex_dtype::DType;
48 changes: 41 additions & 7 deletions vortex-scan/src/range_scan.rs
Original file line number Diff line number Diff line change
@@ -6,7 +6,8 @@ use vortex_array::{ArrayData, IntoArrayVariant};
use vortex_error::VortexResult;
use vortex_expr::ExprRef;

use crate::Scan;
use crate::evaluator::{AsyncEvaluator, Evaluator};
use crate::{RowMask, Scan};

pub struct RangeScan {
scan: Arc<Scan>,
@@ -32,6 +33,11 @@ pub enum NextOp {
Eval((Range<u64>, FilterMask, ExprRef)),
}

/// We implement the range scan via polling for the next operation to perform, and then posting
/// the result back to the range scan to make progress.
///
/// This allows us to minimize the amount of logic we duplicate in order to support both
/// synchronous and asynchronous evaluation.
impl RangeScan {
pub(crate) fn new(scan: Arc<Scan>, row_offset: u64, mask: FilterMask) -> Self {
let state = scan
@@ -57,12 +63,9 @@ impl RangeScan {
}
}

/// The caller polls for the next expression they need to evaluate.
/// Once they have evaluated the expression, they must post the result back to the range scan
/// to make progress.
///
/// Check for the next operation to perform.
/// Returns `NextOp::Ready` when the scan is complete.
pub fn next(&self) -> NextOp {
fn next(&self) -> NextOp {
match &self.state {
State::FilterEval((mask, expr)) => {
NextOp::Eval((self.row_range.clone(), mask.clone(), expr.clone()))
@@ -75,7 +78,7 @@ impl RangeScan {
}

/// Post the result of the last expression evaluation back to the range scan.
pub fn post(&mut self, result: ArrayData) -> VortexResult<()> {
fn post(&mut self, result: ArrayData) -> VortexResult<()> {
match &self.state {
State::FilterEval(_) => {
// Intersect the result of the filter expression with our initial row mask.
@@ -93,4 +96,35 @@ impl RangeScan {
}
Ok(())
}

/// Drive the [`RangeScan`] to completion using a synchronous expression evaluator.
///
/// Each evaluation requested by the scan is run through `evaluator` and its result is
/// posted back, until the scan reports that it is ready.
pub fn evaluate(mut self, evaluator: &dyn Evaluator) -> VortexResult<ArrayData> {
    loop {
        let (row_range, mask, expr) = match self.next() {
            NextOp::Ready(array) => return Ok(array),
            NextOp::Eval(request) => request,
        };
        // The mask is positioned at the start of the requested row range.
        let result = evaluator.evaluate(RowMask::new(mask, row_range.start), expr)?;
        self.post(result)?;
    }
}
/// Drive the [`RangeScan`] to completion using an async expression evaluator.
///
/// Each evaluation requested by the scan is awaited on `evaluator` and its result is
/// posted back, until the scan reports that it is ready.
pub async fn evaluate_async(
    mut self,
    evaluator: &dyn AsyncEvaluator,
) -> VortexResult<ArrayData> {
    loop {
        let (row_range, mask, expr) = match self.next() {
            NextOp::Ready(array) => return Ok(array),
            NextOp::Eval(request) => request,
        };
        // The mask is positioned at the start of the requested row range.
        let result = evaluator
            .evaluate(RowMask::new(mask, row_range.start), expr)
            .await?;
        self.post(result)?;
    }
}
}
107 changes: 39 additions & 68 deletions vortex-layout/src/row_mask.rs → vortex-scan/src/row_mask.rs
Original file line number Diff line number Diff line change
@@ -2,16 +2,15 @@ use std::cmp::{max, min};
use std::fmt::{Display, Formatter};
use std::ops::RangeBounds;

use arrow_buffer::BooleanBuffer;
use vortex_array::aliases::hash_set::HashSet;
use vortex_array::array::{BoolArray, PrimitiveArray, SparseArray};
use vortex_array::array::{BoolArray, BooleanBuffer, PrimitiveArray, SparseArray};
use vortex_array::compute::{and, filter, slice, try_cast, FilterMask};
use vortex_array::validity::{ArrayValidity, LogicalValidity, Validity};
use vortex_array::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant};
use vortex_buffer::Buffer;
use vortex_dtype::Nullability::NonNullable;
use vortex_dtype::{DType, PType};
use vortex_error::{vortex_bail, vortex_err, VortexExpect, VortexResult, VortexUnwrap};
use vortex_error::{vortex_bail, vortex_err, VortexExpect, VortexResult};

/// A RowMask captures a set of selected rows within a range.
///
@@ -39,16 +38,9 @@ impl Display for RowMask {
}

impl RowMask {
pub fn try_new(mask: FilterMask, begin: u64, end: u64) -> VortexResult<Self> {
if mask.len() as u64 != (end - begin) {
vortex_bail!(
"FilterMask must be the same length {} as the given range {}..{}",
mask.len(),
begin,
end
);
}
Ok(Self { mask, begin, end })
pub fn new(mask: FilterMask, begin: u64) -> Self {
    // The end of the range is derived from the mask length, so the two always agree.
    let row_count = mask.len() as u64;
    Self {
        mask,
        begin,
        end: begin + row_count,
    }
}

/// Construct a RowMask which is valid in the given range.
@@ -59,28 +51,22 @@ impl RowMask {
pub fn new_valid_between(begin: u64, end: u64) -> Self {
let length =
usize::try_from(end - begin).vortex_expect("Range length does not fit into a usize");
RowMask::try_new(FilterMask::from(BooleanBuffer::new_set(length)), begin, end)
.vortex_unwrap()
RowMask::new(FilterMask::from(BooleanBuffer::new_set(length)), begin)
}

/// Construct a RowMask which is invalid everywhere in the given range.
pub fn new_invalid_between(begin: u64, end: u64) -> Self {
let length =
usize::try_from(end - begin).vortex_expect("Range length does not fit into a usize");
RowMask::try_new(
FilterMask::from(BooleanBuffer::new_unset(length)),
begin,
end,
)
.vortex_unwrap()
RowMask::new(FilterMask::from(BooleanBuffer::new_unset(length)), begin)
}

/// Creates a RowMask from an array, only supported boolean and integer types.
pub fn from_array(array: &ArrayData, begin: u64, end: u64) -> VortexResult<Self> {
if array.dtype().is_int() {
Self::from_index_array(array, begin, end)
} else if array.dtype().is_boolean() {
Self::from_mask_array(array, begin, end)
Self::from_mask_array(array, begin)
} else {
vortex_bail!(
"RowMask can only be created from integer or boolean arrays, got {} instead.",
@@ -92,15 +78,17 @@ impl RowMask {
/// Construct a RowMask from a Boolean typed array.
///
/// True-valued positions are kept by the returned mask.
fn from_mask_array(array: &ArrayData, begin: u64, end: u64) -> VortexResult<Self> {
fn from_mask_array(array: &ArrayData, begin: u64) -> VortexResult<Self> {
match array.logical_validity() {
LogicalValidity::AllValid(_) => {
Self::try_new(FilterMask::try_from(array.clone())?, begin, end)
Ok(Self::new(FilterMask::try_from(array.clone())?, begin))
}
LogicalValidity::AllInvalid(_) => {
Ok(Self::new_invalid_between(begin, begin + array.len() as u64))
}
LogicalValidity::AllInvalid(_) => Ok(Self::new_invalid_between(begin, end)),
LogicalValidity::Array(validity) => {
let bitmask = and(array.clone(), validity)?;
Self::try_new(FilterMask::try_from(bitmask)?, begin, end)
Ok(Self::new(FilterMask::try_from(bitmask)?, begin))
}
}
}
@@ -121,7 +109,7 @@ impl RowMask {
indices.as_slice::<u64>().iter().map(|i| *i as usize),
);

RowMask::try_new(mask, begin, end)
Ok(RowMask::new(mask, begin))
}

/// Whether the mask is disjoint with the given range.
@@ -161,14 +149,14 @@ impl RowMask {
self.mask.len()
);
}
Self::from_mask_array(&bitmask, self.begin, self.end)
Self::from_mask_array(&bitmask, self.begin)
} else {
// TODO(robert): Avoid densifying sparse values just to get true indices
let sparse_mask =
SparseArray::try_new(self.to_indices_array()?, bitmask, self.len(), false.into())?
.into_array()
.into_bool()?;
Self::from_mask_array(sparse_mask.as_ref(), self.begin(), self.end())
Self::from_mask_array(sparse_mask.as_ref(), self.begin())
}
}

@@ -183,11 +171,7 @@ impl RowMask {
let other_buffer = other.mask.to_boolean_buffer()?;

let unified = &this_buffer & (&other_buffer);
return RowMask::from_mask_array(
BoolArray::from(unified).as_ref(),
self.begin,
self.end,
);
return RowMask::from_mask_array(BoolArray::from(unified).as_ref(), self.begin);
}

// Disjoint row ranges
@@ -224,7 +208,7 @@ impl RowMask {
}),
);

Self::try_new(output_mask, output_begin, output_end)
Ok(Self::new(output_mask, output_begin))
}

pub fn is_all_false(&self) -> bool {
@@ -251,7 +235,7 @@ impl RowMask {
pub fn slice(&self, begin: u64, end: u64) -> VortexResult<Self> {
let range_begin = max(self.begin, begin);
let range_end = min(self.end, end);
RowMask::try_new(
Ok(RowMask::new(
if range_begin == self.begin && range_end == self.end {
self.mask.clone()
} else {
@@ -265,8 +249,7 @@ impl RowMask {
)
},
range_begin,
range_end,
)
))
}

/// Filter array with this `RowMask`.
@@ -322,7 +305,7 @@ impl RowMask {
self.begin
)
}
RowMask::try_new(self.mask, self.begin - offset, self.end - offset)
Ok(RowMask::new(self.mask, self.begin - offset))
}

// Get the true count of the underlying mask.
@@ -339,7 +322,6 @@ impl RowMask {

#[cfg(test)]
mod tests {
use arrow_buffer::BooleanBuffer;
use rstest::rstest;
use vortex_array::array::PrimitiveArray;
use vortex_array::compute::FilterMask;
@@ -352,43 +334,35 @@ mod tests {

#[rstest]
#[case(
RowMask::try_new(FilterMask::from_iter([true, true, true, false, false, false, false, false, true, true]), 0, 10).unwrap(), (0, 1),
RowMask::try_new(FilterMask::from_iter([true]), 0, 1).unwrap())]
RowMask::new(FilterMask::from_iter([true, true, true, false, false, false, false, false, true, true]), 0), (0, 1),
RowMask::new(FilterMask::from_iter([true]), 0))]
#[case(
RowMask::try_new(FilterMask::from_iter([false, false, false, false, false, true, true, true, true, true]), 0, 10).unwrap(), (2, 5),
RowMask::try_new(FilterMask::from_iter([false, false, false]), 2, 5).unwrap()
RowMask::new(FilterMask::from_iter([false, false, false, false, false, true, true, true, true, true]), 0), (2, 5),
RowMask::new(FilterMask::from_iter([false, false, false]), 2)
)]
#[case(
RowMask::try_new(FilterMask::from_iter([true, true, true, true, false, false, false, false, false, false]), 0, 10).unwrap(), (2, 5),
RowMask::try_new(FilterMask::from_iter([true, true, false]), 2, 5).unwrap()
RowMask::new(FilterMask::from_iter([true, true, true, true, false, false, false, false, false, false]), 0), (2, 5),
RowMask::new(FilterMask::from_iter([true, true, false]), 2)
)]
#[case(
RowMask::try_new(FilterMask::from_iter([true, true, true, false, false, true, true, false, false, false]), 0, 10).unwrap(), (2, 6),
RowMask::try_new(FilterMask::from_iter([true, false, false, true]), 2, 6).unwrap())]
RowMask::new(FilterMask::from_iter([true, true, true, false, false, true, true, false, false, false]), 0), (2, 6),
RowMask::new(FilterMask::from_iter([true, false, false, true]), 2))]
#[case(
RowMask::try_new(FilterMask::from_iter([false, false, false, false, false, true, true, true, true, true]), 0, 10).unwrap(), (7, 11),
RowMask::try_new(FilterMask::from_iter([true, true, true]), 7, 10).unwrap())]
RowMask::new(FilterMask::from_iter([false, false, false, false, false, true, true, true, true, true]), 0), (7, 11),
RowMask::new(FilterMask::from_iter([true, true, true]), 7))]
#[case(
RowMask::try_new(FilterMask::from_iter([false, true, true, true, true, true]), 3, 9).unwrap(), (0, 5),
RowMask::try_new(FilterMask::from_iter([false, true]), 3, 5).unwrap())]
RowMask::new(FilterMask::from_iter([false, true, true, true, true, true]), 3), (0, 5),
RowMask::new(FilterMask::from_iter([false, true]), 3))]
#[cfg_attr(miri, ignore)]
fn slice(#[case] first: RowMask, #[case] range: (u64, u64), #[case] expected: RowMask) {
assert_eq!(first.slice(range.0, range.1).vortex_unwrap(), expected);
}

#[test]
#[should_panic]
#[cfg_attr(miri, ignore)]
fn test_new() {
RowMask::try_new(FilterMask::from(BooleanBuffer::new_unset(10)), 5, 10).unwrap();
}

#[test]
#[should_panic]
#[cfg_attr(miri, ignore)]
fn shift_invalid() {
RowMask::try_new(FilterMask::from_iter([true, true, true, true, true]), 5, 10)
.unwrap()
RowMask::new(FilterMask::from_iter([true, true, true, true, true]), 5)
.shift(7)
.unwrap();
}
@@ -397,25 +371,22 @@ mod tests {
#[cfg_attr(miri, ignore)]
fn shift() {
assert_eq!(
RowMask::try_new(FilterMask::from_iter([true, true, true, true, true]), 5, 10)
.unwrap()
RowMask::new(FilterMask::from_iter([true, true, true, true, true]), 5)
.shift(5)
.unwrap(),
RowMask::try_new(FilterMask::from_iter([true, true, true, true, true]), 0, 5).unwrap()
RowMask::new(FilterMask::from_iter([true, true, true, true, true]), 0)
);
}

#[test]
#[cfg_attr(miri, ignore)]
fn filter_array() {
let mask = RowMask::try_new(
let mask = RowMask::new(
FilterMask::from_iter([
false, false, false, false, false, true, true, true, true, true,
]),
0,
10,
)
.unwrap();
);
let array = Buffer::from_iter(0..20).into_array();
let filtered = mask.filter_array(array).unwrap().unwrap();
assert_eq!(

0 comments on commit 3fc82f3

Please sign in to comment.