Skip to content

Commit

Permalink
Vortex Layouts - Drivers (#1914)
Browse files Browse the repository at this point in the history
Introduces the idea of a `driver` as a pluggable way of orchestrating
I/O and CPU based work.

An `ExecDriver` takes a `Stream<Future<T>>` and returns a `Stream<T>`.
The abstraction means we can implement drivers that use the current
runtime thread (Inline), spawn the work onto an explicit Runtime, or
spawn the work onto a thread pool.

The `IoDriver` takes a stream of segment requests and has to resolve
them. For now, we have a FileIoDriver that assumes the bytes are laid
out on disk (and therefore coalescing is desirable), so it pulls
requests off the queue, coalesces them, then launches concurrent I/O
requests to resolve them.

The VortexFile remains generic over the I/O driver, meaning the
send-ness of the resulting ArrayStream is inferred based on the
send-ness of the I/O driver (in the case of FileIoDriver, that is the
send-ness of the VortexReadAt).

Part of #1676
  • Loading branch information
gatesn authored Jan 13, 2025
1 parent 010839b commit d658e5f
Show file tree
Hide file tree
Showing 22 changed files with 559 additions and 213 deletions.
1 change: 1 addition & 0 deletions vortex-file/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ async-trait = { workspace = true }
bytes = { workspace = true }
flatbuffers = { workspace = true }
futures = { workspace = true, features = ["std"] }
# TODO(ngates): remove these in favor of futures
futures-executor = { workspace = true }
futures-util = { workspace = true }
itertools = { workspace = true }
Expand Down
19 changes: 19 additions & 0 deletions vortex-file/src/v2/exec/inline.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use futures_util::future::BoxFuture;
use futures_util::stream::BoxStream;
use futures_util::StreamExt;
use vortex_array::ArrayData;
use vortex_error::VortexResult;

use crate::v2::exec::ExecDriver;

/// An [`ExecDriver`] that resolves every future in place, on whichever runtime polls the
/// returned stream, rather than spawning the work elsewhere.
pub struct InlineDriver;

impl ExecDriver for InlineDriver {
    /// Turns the stream of futures into a stream of their outputs, preserving stream order.
    fn drive(
        &self,
        stream: BoxStream<'static, BoxFuture<'static, VortexResult<ArrayData>>>,
    ) -> BoxStream<'static, VortexResult<ArrayData>> {
        // Every item is already a future, so the `then` callback just hands it straight back
        // to be awaited by the stream combinator.
        let awaited = stream.then(std::convert::identity);
        awaited.boxed()
    }
}
23 changes: 23 additions & 0 deletions vortex-file/src/v2/exec/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
pub mod inline;
pub mod tokio;

use futures_util::future::BoxFuture;
use futures_util::stream::BoxStream;
use vortex_array::ArrayData;
use vortex_error::VortexResult;

/// An execution driver is used to drive the execution of the scan operation.
///
/// It is passed a stream of futures that (typically) each process a single split of the file.
/// Drivers are able to control the concurrency of the execution with
/// [`futures::stream::StreamExt::buffered`], as well as _where_ the futures are executed by
/// spawning them onto a specific runtime or thread pool.
///
/// Note that the futures encapsulate heavy CPU code such as filtering and decompression. To
/// keep I/O work separate, please see the [`crate::v2::io::IoDriver`] trait.
pub trait ExecDriver {
/// Consumes a stream of not-yet-awaited futures and returns a stream of their results.
///
/// Each future resolves to a `VortexResult<ArrayData>`; the returned stream yields those
/// results. How and where the futures are polled is left to the implementation.
fn drive(
&self,
stream: BoxStream<'static, BoxFuture<'static, VortexResult<ArrayData>>>,
) -> BoxStream<'static, VortexResult<ArrayData>>;
}
40 changes: 40 additions & 0 deletions vortex-file/src/v2/exec/tokio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use futures_util::future::BoxFuture;
use futures_util::stream::BoxStream;
use futures_util::StreamExt;
use tokio::runtime::Handle;
use vortex_array::ArrayData;
use vortex_error::{vortex_err, VortexResult};

use crate::v2::exec::ExecDriver;

/// An execution driver that spawns the futures onto a Tokio runtime.
pub struct TokioDriver {
    /// Handle of the Tokio runtime that the futures are spawned onto.
    handle: Handle,
    /// Maximum number of spawned futures buffered at any one time. Always at least one.
    concurrency: usize,
}

impl TokioDriver {
    /// Creates a driver that spawns work onto `handle`, keeping up to `concurrency` spawned
    /// futures in flight at once.
    ///
    /// A `concurrency` of zero is treated as one: `buffered(0)` never pulls a future from the
    /// underlying stream and never completes, so the resulting scan would hang forever.
    pub fn new(handle: Handle, concurrency: usize) -> Self {
        Self {
            handle,
            concurrency: concurrency.max(1),
        }
    }
}

impl ExecDriver for TokioDriver {
    /// Spawns each future onto the Tokio runtime and yields the results in stream order.
    ///
    /// A task that fails to join (for example because it panicked or was cancelled) is
    /// surfaced as an error item rather than a panic in the caller.
    fn drive(
        &self,
        stream: BoxStream<'static, BoxFuture<'static, VortexResult<ArrayData>>>,
    ) -> BoxStream<'static, VortexResult<ArrayData>> {
        // The closure must be `'static`, so it owns its own clone of the runtime handle.
        let handle = self.handle.clone();
        stream
            // NOTE(review): dropping a Tokio `JoinHandle` detaches the task, so tasks already
            // spawned presumably keep running if the returned stream is dropped early — confirm
            // whether that is acceptable for callers.
            .map(move |future| handle.spawn(future))
            .buffered(self.concurrency)
            // Flatten `Result<VortexResult<_>, JoinError>` into a single `VortexResult<_>`.
            .map(|joined| {
                joined.unwrap_or_else(|e| Err(vortex_err!("Failed to join Tokio result {}", e)))
            })
            .boxed()
    }
}
174 changes: 85 additions & 89 deletions vortex-file/src/v2/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,35 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use futures::channel::oneshot;
use futures::Stream;
use futures_executor::block_on;
use futures_util::{stream, StreamExt, TryStreamExt};
use futures_util::{stream, FutureExt, StreamExt, TryStreamExt};
use pin_project_lite::pin_project;
use vortex_array::stream::{ArrayStream, ArrayStreamAdapter};
use vortex_array::{ArrayData, ContextRef};
use vortex_dtype::DType;
use vortex_error::{vortex_err, VortexExpect, VortexResult};
use vortex_io::VortexReadAt;
use vortex_error::VortexResult;
use vortex_layout::{ExprEvaluator, LayoutData, LayoutReader};
use vortex_scan::Scan;

use crate::v2::segments::cache::SegmentCache;
use crate::v2::exec::ExecDriver;
use crate::v2::io::IoDriver;
use crate::v2::segments::channel::SegmentChannel;

pub struct VortexFile<R> {
/// A Vortex file ready for reading.
///
/// It is generic over the `IoDriver` implementation enabling us to swap out the I/O subsystem for
/// particular environments. For example, memory mapped files vs object-store. By remaining generic,
/// it allows us to support both `Send` and `?Send` I/O drivers.
pub struct VortexFile<I> {
pub(crate) ctx: ContextRef,
pub(crate) layout: LayoutData,
pub(crate) segments: SegmentCache<R>,
pub(crate) io_driver: I,
pub(crate) exec_driver: Arc<dyn ExecDriver>,
pub(crate) splits: Arc<[Range<u64>]>,
pub(crate) thread_pool: Arc<rayon::ThreadPool>,
}

impl<R> VortexFile<R> {}

/// Async implementation of Vortex File.
impl<R: VortexReadAt + Unpin> VortexFile<R> {
impl<I: IoDriver> VortexFile<I> {
/// Returns the number of rows in the file.
pub fn row_count(&self) -> u64 {
self.layout.row_count()
Expand All @@ -41,94 +43,46 @@ impl<R: VortexReadAt + Unpin> VortexFile<R> {
}

/// Performs a scan operation over the file.
pub fn scan(self, scan: Arc<Scan>) -> VortexResult<impl ArrayStream + 'static> {
// Create a shared reader for the scan.
let reader: Arc<dyn LayoutReader> = self
.layout
.reader(self.segments.reader(), self.ctx.clone())?;
pub fn scan(&self, scan: Arc<Scan>) -> VortexResult<impl ArrayStream + 'static + use<'_, I>> {
let result_dtype = scan.result_dtype(self.dtype())?;

// For each row-group, we set up a future that will evaluate the scan and post its result.
let row_group_driver = stream::iter(ArcIter::new(self.splits.clone()))
.map(move |row_range| {
let (send, recv) = oneshot::channel();
let reader = reader.clone();
let range_scan = scan.clone().range_scan(row_range);

// Launch the scan task onto the thread pool.
self.thread_pool.spawn_fifo(move || {
let array_result =
range_scan.and_then(|range_scan| {
block_on(range_scan.evaluate_async(|row_mask, expr| {
reader.evaluate_expr(row_mask, expr)
}))
});
// Post the result back to the main thread
send.send(array_result)
.map_err(|_| vortex_err!("send failed, recv dropped"))
.vortex_expect("send_failed, recv dropped");
});

recv
// Set up a segment channel to collect segment requests from the execution stream.
let segment_channel = SegmentChannel::new();

// Now we give one end of the channel to the layout reader...
let reader: Arc<dyn LayoutReader> = self
.layout
.reader(segment_channel.reader(), self.ctx.clone())?;
let exec_stream = stream::iter(ArcIter::new(self.splits.clone()))
.map(move |row_range| scan.clone().range_scan(row_range))
.map(move |range_scan| match range_scan {
Ok(range_scan) => {
let reader = reader.clone();
async move {
range_scan
.evaluate_async(|row_mask, expr| reader.evaluate_expr(row_mask, expr))
.await
}
.boxed()
}
Err(e) => futures::future::ready(Err(e)).boxed(),
})
.then(|recv| async move {
recv.await
.unwrap_or_else(|_cancelled| Err(vortex_err!("recv failed, send dropped")))
});
// TODO(ngates): we should call buffered(n) on this stream so that it launches multiple
// splits to run in parallel. Currently we use block_on, so there's no point this being
// any higher than the size of the thread pool. If we switch to running LocalExecutor,
// then there may be some value in slightly over-subscribing.

// Set up an I/O driver that will make progress on 32 I/O requests at a time.
// TODO(ngates): we should probably have segments hold an Arc'd driver stream internally
// so that multiple scans can poll it, while still sharing the same global concurrency
// limit?
let io_driver = self.segments.driver().buffered(32);
.boxed();
let exec_stream = self.exec_driver.drive(exec_stream);

// ...and the other end to the segment driver.
let io_stream = self.io_driver.drive(segment_channel.into_stream());

Ok(ArrayStreamAdapter::new(
result_dtype,
ScanDriver {
row_group_driver,
io_driver,
UnifiedDriverStream {
exec_stream,
io_stream,
},
))
}
}

pin_project! {
/// A stream that polls the row-group driver for results and, whenever that driver is not
/// ready, polls the I/O driver so that outstanding I/O keeps making progress.
struct ScanDriver<R, S> {
#[pin]
row_group_driver: R,
#[pin]
io_driver: S,
}
}

impl<R, S> Stream for ScanDriver<R, S>
where
R: Stream<Item = VortexResult<ArrayData>>,
S: Stream<Item = VortexResult<()>>,
{
type Item = VortexResult<ArrayData>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
// If the row group driver is ready, then we can return the result.
if let Poll::Ready(r) = this.row_group_driver.try_poll_next_unpin(cx) {
return Poll::Ready(r);
}
// Otherwise, we try to poll the I/O driver.
// If the I/O driver is not ready, then we return Pending and wait for I/O
// to wake up the driver.
// NOTE(review): any `Ready` value from the I/O driver is discarded here, including
// `Err` items and end-of-stream; the loop simply polls the row group driver again.
// Presumably the I/O driver is expected never to terminate — confirm.
if matches!(this.io_driver.as_mut().poll_next(cx), Poll::Pending) {
return Poll::Pending;
}
}
}
}

/// There is no `IntoIterator` for `Arc<[T]>` so to avoid copying into a Vec<T>, we define our own.
/// See <https://users.rust-lang.org/t/arc-to-owning-iterator/115190/11>.
struct ArcIter<T> {
Expand All @@ -153,3 +107,45 @@ impl<T: Clone> Iterator for ArcIter<T> {
})
}
}

pin_project! {
/// A [`Stream`] that drives the both the I/O stream and the execution stream concurrently.
///
/// This is sort of like a `select!` implementation, but not quite.
///
/// We can't use `futures::stream::select` because it requires both streams to terminate, and
/// our I/O stream will never terminate.
///
/// We can't use `futures::stream::zip` because it waits for boths streams to emit an item,
/// but our execution stream may require multiple I/O operations to complete before it can
/// return an item.
struct UnifiedDriverStream<R, S> {
#[pin]
exec_stream: R,
#[pin]
io_stream: S,
}
}

impl<R, S> Stream for UnifiedDriverStream<R, S>
where
R: Stream<Item = VortexResult<ArrayData>>,
S: Stream<Item = VortexResult<()>>,
{
type Item = VortexResult<ArrayData>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
// If the exec stream is ready, then we can return the result.
if let Poll::Ready(r) = this.exec_stream.try_poll_next_unpin(cx) {
return Poll::Ready(r);
}
// Otherwise, we try to poll the I/O stream.
// If the I/O stream is not ready, then we return Pending and wait for the next wakeup.
if matches!(this.io_stream.as_mut().poll_next(cx), Poll::Pending) {
return Poll::Pending;
}
}
}
}
2 changes: 1 addition & 1 deletion vortex-file/src/v2/footer/file_layout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::v2::footer::segment::Segment;

/// Captures the layout information of a Vortex file.
#[derive(Clone)]
pub(crate) struct FileLayout {
pub struct FileLayout {
pub(crate) root_layout: LayoutData,
pub(crate) segments: Arc<[Segment]>,
}
Expand Down
Loading

0 comments on commit d658e5f

Please sign in to comment.