Skip to content

Commit

Permalink
Vortex Layouts DataFusion Statistics (#1967)
Browse files Browse the repository at this point in the history
  • Loading branch information
gatesn authored Jan 16, 2025
1 parent 89a510a commit b09bac8
Show file tree
Hide file tree
Showing 24 changed files with 323 additions and 155 deletions.
2 changes: 1 addition & 1 deletion bench-vortex/src/clickbench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use vortex::file::VORTEX_FILE_EXTENSION;
use vortex::sampling_compressor::SamplingCompressor;
use vortex::variants::StructArrayTrait;
use vortex::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant};
use vortex_datafusion::persistent::format::VortexFormat;
use vortex_datafusion::persistent::VortexFormat;

use crate::{idempotent_async, CTX};

Expand Down
2 changes: 1 addition & 1 deletion bench-vortex/src/tpch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use vortex::sampling_compressor::SamplingCompressor;
use vortex::variants::StructArrayTrait;
use vortex::{ArrayDType, ArrayData, IntoArrayData, IntoArrayVariant};
use vortex_datafusion::memory::VortexMemTableOptions;
use vortex_datafusion::persistent::format::VortexFormat;
use vortex_datafusion::persistent::VortexFormat;
use vortex_datafusion::SessionContextExt;

use crate::{idempotent_async, Format, CTX, TARGET_BLOCK_BYTESIZE, TARGET_BLOCK_SIZE};
Expand Down
7 changes: 7 additions & 0 deletions vortex-array/src/stats/statsset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ impl StatsSet {
Self { values }
}

/// Create a new, empty StatsSet.
///
/// If you are planning to add stats to the set, consider using [StatsSet::default] instead.
pub fn empty() -> Self {
Self { values: vec![] }
}

/// Specialized constructor for the case where the StatsSet represents
/// an array consisting entirely of [null](vortex_dtype::DType::Null) values.
pub fn nulls(len: usize, dtype: &DType) -> Self {
Expand Down
2 changes: 1 addition & 1 deletion vortex-datafusion/examples/vortex_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use vortex_array::array::{ChunkedArray, StructArray, VarBinArray};
use vortex_array::validity::Validity;
use vortex_array::IntoArrayData;
use vortex_buffer::buffer;
use vortex_datafusion::persistent::format::VortexFormat;
use vortex_datafusion::persistent::VortexFormat;
use vortex_error::vortex_err;
use vortex_file::VortexFileWriter;

Expand Down
2 changes: 1 addition & 1 deletion vortex-datafusion/src/memory/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::memory::stream::VortexRecordBatchStream;

/// Physical plan node for scans against an in-memory, possibly chunked Vortex Array.
#[derive(Clone)]
pub struct VortexScanExec {
pub(crate) struct VortexScanExec {
array: ChunkedArray,
scan_projection: FieldNames,
plan_properties: PlanProperties,
Expand Down
2 changes: 1 addition & 1 deletion vortex-datafusion/src/memory/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ mod provider;
mod statistics;
mod stream;

pub use provider::*;
pub use provider::{VortexMemTable, VortexMemTableOptions};
2 changes: 1 addition & 1 deletion vortex-datafusion/src/memory/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use vortex_array::ArrayLen;
use vortex_dtype::FieldNames;
use vortex_error::{vortex_err, VortexExpect, VortexResult};

pub fn chunked_array_df_stats(
pub(crate) fn chunked_array_df_stats(
array: &ChunkedArray,
projection: FieldNames,
) -> DFResult<Statistics> {
Expand Down
4 changes: 2 additions & 2 deletions vortex-datafusion/src/persistent/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ use vortex_file::v2::{FileLayout, VortexOpenOptions};
use vortex_io::ObjectStoreReadAt;

#[derive(Debug, Clone)]
pub struct FileLayoutCache {
pub(crate) struct FileLayoutCache {
inner: Cache<Key, FileLayout>,
}

#[derive(Hash, Eq, PartialEq, Debug)]
pub struct Key {
pub(crate) struct Key {
location: Path,
m_time: DateTime<Utc>,
}
Expand Down
18 changes: 1 addition & 17 deletions vortex-datafusion/src/persistent/config.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use chrono::TimeZone as _;
use datafusion::datasource::listing::PartitionedFile;
use object_store::path::Path;
use object_store::ObjectMeta;

#[derive(Debug, Clone)]
pub struct VortexFile {
pub(crate) struct VortexFile {
pub(crate) object_meta: ObjectMeta,
}

Expand All @@ -13,17 +11,3 @@ impl From<VortexFile> for PartitionedFile {
PartitionedFile::new(value.object_meta.location, value.object_meta.size as u64)
}
}

impl VortexFile {
pub fn new(path: impl Into<String>, size: u64) -> Self {
Self {
object_meta: ObjectMeta {
location: Path::from(path.into()),
last_modified: chrono::Utc.timestamp_nanos(0),
size: size as usize,
e_tag: None,
version: None,
},
}
}
}
2 changes: 1 addition & 1 deletion vortex-datafusion/src/persistent/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use super::cache::FileLayoutCache;
use crate::persistent::opener::VortexFileOpener;

#[derive(Debug, Clone)]
pub struct VortexExec {
pub(crate) struct VortexExec {
file_scan_config: FileScanConfig,
metrics: ExecutionPlanMetricsSet,
predicate: Option<Arc<dyn PhysicalExpr>>,
Expand Down
91 changes: 83 additions & 8 deletions vortex-datafusion/src/persistent/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,24 @@ use datafusion::datasource::file_format::{FileFormat, FilePushdownSupport};
use datafusion::datasource::physical_plan::{FileScanConfig, FileSinkConfig};
use datafusion::execution::SessionState;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{not_impl_err, DataFusionError, Result as DFResult, Statistics};
use datafusion_common::stats::Precision;
use datafusion_common::{
not_impl_err, ColumnStatistics, DataFusionError, Result as DFResult, ScalarValue, Statistics,
};
use datafusion_expr::Expr;
use datafusion_physical_expr::{LexRequirement, PhysicalExpr};
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::ExecutionPlan;
use futures::{stream, StreamExt as _, TryStreamExt as _};
use object_store::{ObjectMeta, ObjectStore};
use vortex_array::arrow::infer_schema;
use vortex_array::stats::Stat;
use vortex_array::ContextRef;
use vortex_error::VortexResult;
use vortex_dtype::FieldPath;
use vortex_error::{vortex_err, VortexExpect, VortexResult};
use vortex_file::v2::VortexOpenOptions;
use vortex_file::VORTEX_FILE_EXTENSION;
use vortex_io::ObjectStoreReadAt;

use super::cache::FileLayoutCache;
use super::execution::VortexExec;
Expand Down Expand Up @@ -119,14 +126,82 @@ impl FileFormat for VortexFormat {
async fn infer_stats(
&self,
_state: &SessionState,
_store: &Arc<dyn ObjectStore>,
store: &Arc<dyn ObjectStore>,
table_schema: SchemaRef,
_object: &ObjectMeta,
object: &ObjectMeta,
) -> DFResult<Statistics> {
// TODO(ngates): we should decide if it's worth returning file statistics. Since this
// call doesn't have projection information, I think it's better to wait until we can
// return per-partition statistics from VortexExpr ExecutionPlan node.
Ok(Statistics::new_unknown(table_schema.as_ref()))
let read_at = ObjectStoreReadAt::new(store.clone(), object.location.clone());
let vxf = VortexOpenOptions::new(self.context.clone())
.with_file_layout(
self.file_layout_cache
.try_get(object, store.clone())
.await?,
)
.open(read_at)
.await?;

// Evaluate the statistics for each column that we are able to return to DataFusion.
let field_paths = table_schema
.fields()
.iter()
.map(|f| FieldPath::from_name(f.name().to_owned()))
.collect();
let stats = vxf
.statistics(
field_paths,
[
Stat::Min,
Stat::Max,
Stat::NullCount,
Stat::UncompressedSizeInBytes,
]
.into(),
)?
.await?;

// Sum up the total byte size across all the columns.
let total_byte_size = Precision::Inexact(
stats
.iter()
.map(|s| {
s.get_as::<usize>(Stat::UncompressedSizeInBytes)
.unwrap_or_default()
})
.sum(),
);

let column_statistics = stats
.into_iter()
.map(|s| {
let null_count = s.get_as::<usize>(Stat::NullCount);
let min = s
.get(Stat::Min)
.cloned()
.and_then(|s| ScalarValue::try_from(s).ok());
let max = s
.get(Stat::Max)
.cloned()
.and_then(|s| ScalarValue::try_from(s).ok());
ColumnStatistics {
null_count: null_count
.map(Precision::Exact)
.unwrap_or(Precision::Absent),
max_value: max.map(Precision::Exact).unwrap_or(Precision::Absent),
min_value: min.map(Precision::Exact).unwrap_or(Precision::Absent),
distinct_count: Precision::Absent,
}
})
.collect::<Vec<_>>();

Ok(Statistics {
num_rows: Precision::Exact(
usize::try_from(vxf.row_count())
.map_err(|_| vortex_err!("Row count overflow"))
.vortex_expect("Row count overflow"),
),
total_byte_size,
column_statistics,
})
}

async fn create_physical_plan(
Expand Down
11 changes: 6 additions & 5 deletions vortex-datafusion/src/persistent/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod cache;
pub mod config;
pub mod execution;
pub mod format;
pub mod opener;
pub mod statistics;
mod config;
mod execution;
mod format;
mod opener;

pub use format::{VortexFormat, VortexFormatOptions};
2 changes: 1 addition & 1 deletion vortex-datafusion/src/persistent/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use vortex_io::ObjectStoreReadAt;
use super::cache::FileLayoutCache;

#[derive(Clone)]
pub struct VortexFileOpener {
pub(crate) struct VortexFileOpener {
pub ctx: ContextRef,
pub object_store: Arc<dyn ObjectStore>,
pub projection: ExprRef,
Expand Down
57 changes: 0 additions & 57 deletions vortex-datafusion/src/persistent/statistics.rs

This file was deleted.

11 changes: 11 additions & 0 deletions vortex-dtype/src/field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ impl Field {

/// A path through a (possibly nested) struct, composed of a sequence of field selectors
// TODO(ngates): wrap `Vec<Field>` in Option for cheaper "root" path.
// TODO(ngates): we should probably reverse the path. Or better yet, store a Arc<[Field]> along
// with a positional index to allow cheap step_into.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct FieldPath(Vec<Field>);
Expand Down Expand Up @@ -128,6 +130,15 @@ impl FieldPath {
pub fn push<F: Into<Field>>(&mut self, field: F) {
self.0.push(field.into());
}

/// Steps into the next field in the path
pub fn step_into(mut self) -> VortexResult<Self> {
if self.0.is_empty() {
return Err(vortex_err!("Cannot step into root path"));
}
self.0 = self.0.iter().skip(1).cloned().collect();
Ok(self)
}
}

impl FromIterator<Field> for FieldPath {
Expand Down
2 changes: 1 addition & 1 deletion vortex-file/src/v2/exec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use vortex_error::VortexResult;
///
/// Note that the futures encapsulate heavy CPU code such as filtering and decompression. To
/// offload keep I/O work separate, please see the [`crate::v2::io::IoDriver`] trait.
pub trait ExecDriver {
pub trait ExecDriver: Send + Sync {
fn drive(
&self,
stream: BoxStream<'static, BoxFuture<'static, VortexResult<ArrayData>>>,
Expand Down
Loading

0 comments on commit b09bac8

Please sign in to comment.