diff --git a/Cargo.lock b/Cargo.lock
index 0a7407b50398..a03b295dabfc 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1733,6 +1733,7 @@ dependencies = [
  "datafusion-catalog-listing",
  "datafusion-common",
  "datafusion-common-runtime",
+ "datafusion-datasource",
  "datafusion-doc",
  "datafusion-execution",
  "datafusion-expr",
@@ -1823,32 +1824,20 @@
 name = "datafusion-catalog-listing"
 version = "45.0.0"
 dependencies = [
  "arrow",
- "async-compression",
  "async-trait",
- "bytes",
- "bzip2 0.5.1",
- "chrono",
  "datafusion-catalog",
  "datafusion-common",
- "datafusion-common-runtime",
+ "datafusion-datasource",
  "datafusion-execution",
  "datafusion-expr",
  "datafusion-physical-expr",
  "datafusion-physical-expr-common",
  "datafusion-physical-plan",
- "flate2",
  "futures",
- "glob",
- "itertools 0.14.0",
  "log",
  "object_store",
- "rand 0.8.5",
  "tempfile",
  "tokio",
- "tokio-util",
- "url",
- "xz2",
- "zstd",
 ]

@@ -1912,6 +1901,37 @@ dependencies = [
  "tokio",
 ]

+[[package]]
+name = "datafusion-datasource"
+version = "45.0.0"
+dependencies = [
+ "arrow",
+ "async-compression",
+ "async-trait",
+ "bytes",
+ "bzip2 0.5.1",
+ "chrono",
+ "datafusion-catalog",
+ "datafusion-common",
+ "datafusion-common-runtime",
+ "datafusion-execution",
+ "datafusion-expr",
+ "datafusion-physical-plan",
+ "flate2",
+ "futures",
+ "glob",
+ "itertools 0.14.0",
+ "log",
+ "object_store",
+ "rand 0.8.5",
+ "tempfile",
+ "tokio",
+ "tokio-util",
+ "url",
+ "xz2",
+ "zstd",
+]
+
 [[package]]
 name = "datafusion-doc"
 version = "45.0.0"
diff --git a/Cargo.toml b/Cargo.toml
index 1e35b7f42027..099e5f22972c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -99,9 +99,10 @@ ctor = "0.2.9"
 dashmap = "6.0.1"
 datafusion = { path = "datafusion/core", version = "45.0.0", default-features = false }
 datafusion-catalog = { path = "datafusion/catalog", version = "45.0.0" }
-datafusion-catalog-listing = { path = "datafusion/catalog-listing", version = "45.0.0", default-features = false }
+datafusion-catalog-listing = { path = "datafusion/catalog-listing", version = "45.0.0" }
 datafusion-common = { path = "datafusion/common", version = "45.0.0", default-features = false }
 datafusion-common-runtime = { path = "datafusion/common-runtime", version = "45.0.0" }
+datafusion-datasource = { path = "datafusion/datasource", version = "45.0.0", default-features = false }
 datafusion-doc = { path = "datafusion/doc", version = "45.0.0" }
 datafusion-execution = { path = "datafusion/execution", version = "45.0.0" }
 datafusion-expr = { path = "datafusion/expr", version = "45.0.0" }
diff --git a/datafusion/catalog-listing/Cargo.toml b/datafusion/catalog-listing/Cargo.toml
index 0aa2083ebca9..68d0ca3a149f 100644
--- a/datafusion/catalog-listing/Cargo.toml
+++ b/datafusion/catalog-listing/Cargo.toml
@@ -27,43 +27,21 @@ repository.workspace = true
 rust-version.workspace = true
 version.workspace = true

-[features]
-compression = ["async-compression", "xz2", "bzip2", "flate2", "zstd", "tokio-util"]
-default = ["compression"]
-
 [dependencies]
 arrow = { workspace = true }
-async-compression = { version = "0.4.0", features = [
-    "bzip2",
-    "gzip",
-    "xz",
-    "zstd",
-    "tokio",
-], optional = true }
 async-trait = { workspace = true }
-bytes = { workspace = true }
-bzip2 = { version = "0.5.1", optional = true }
-chrono = { workspace = true }
 datafusion-catalog = { workspace = true }
 datafusion-common = { workspace = true, features = ["object_store"] }
-datafusion-common-runtime = { workspace = true }
+datafusion-datasource = { workspace = true }
 datafusion-execution = { workspace = true }
 datafusion-expr = { workspace = true }
 datafusion-physical-expr = { workspace = true }
 datafusion-physical-expr-common = { workspace = true }
 datafusion-physical-plan = { workspace = true }
-flate2 = { version = "1.0.24", optional = true }
 futures = { workspace = true }
-glob = "0.3.0"
-itertools = { workspace = true }
 log = { workspace = true }
 object_store = { workspace = true }
-rand = { workspace = true }
 tokio = { workspace = true }
-tokio-util = { version = "0.7.4", features = ["io"], optional = true }
-url = { workspace = true }
-xz2 = { version = "0.1", optional = true, features = ["static"] }
-zstd = { version = "0.13", optional = true, default-features = false }

 [dev-dependencies]
 tempfile = { workspace = true }
diff --git a/datafusion/catalog-listing/src/helpers.rs b/datafusion/catalog-listing/src/helpers.rs
index ceacde2494e2..cf475263535a 100644
--- a/datafusion/catalog-listing/src/helpers.rs
+++ b/datafusion/catalog-listing/src/helpers.rs
@@ -20,11 +20,11 @@
 use std::mem;
 use std::sync::Arc;

-use super::ListingTableUrl;
-use super::PartitionedFile;
 use datafusion_catalog::Session;
 use datafusion_common::internal_err;
 use datafusion_common::{HashMap, Result, ScalarValue};
+use datafusion_datasource::ListingTableUrl;
+use datafusion_datasource::PartitionedFile;
 use datafusion_expr::{BinaryExpr, Operator};

 use arrow::{
diff --git a/datafusion/catalog-listing/src/mod.rs b/datafusion/catalog-listing/src/mod.rs
index 9eb79ec07ac8..b98790e86455 100644
--- a/datafusion/catalog-listing/src/mod.rs
+++ b/datafusion/catalog-listing/src/mod.rs
@@ -15,270 +15,4 @@
 // specific language governing permissions and limitations
 // under the License.

-//! A table that uses the `ObjectStore` listing capability
-//! to get the list of files to process.
-
-pub mod file_compression_type;
-pub mod file_groups;
-pub mod file_meta;
-pub mod file_scan_config;
-pub mod file_sink_config;
-pub mod file_stream;
 pub mod helpers;
-pub mod url;
-pub mod write;
-use chrono::TimeZone;
-use datafusion_common::Result;
-use datafusion_common::{ScalarValue, Statistics};
-use futures::Stream;
-use object_store::{path::Path, ObjectMeta};
-use std::pin::Pin;
-use std::sync::Arc;
-
-pub use self::url::ListingTableUrl;
-
-/// Stream of files get listed from object store
-pub type PartitionedFileStream =
-    Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync + 'static>>;
-
-/// Only scan a subset of Row Groups from the Parquet file whose data "midpoint"
-/// lies within the [start, end) byte offsets. This option can be used to scan non-overlapping
-/// sections of a Parquet file in parallel.
-#[derive(Debug, Clone, PartialEq, Hash, Eq, PartialOrd, Ord)]
-pub struct FileRange {
-    /// Range start
-    pub start: i64,
-    /// Range end
-    pub end: i64,
-}
-
-impl FileRange {
-    /// returns true if this file range contains the specified offset
-    pub fn contains(&self, offset: i64) -> bool {
-        offset >= self.start && offset < self.end
-    }
-}
-
-#[derive(Debug, Clone)]
-/// A single file or part of a file that should be read, along with its schema, statistics
-/// and partition column values that need to be appended to each row.
-pub struct PartitionedFile {
-    /// Path for the file (e.g. URL, filesystem path, etc)
-    pub object_meta: ObjectMeta,
-    /// Values of partition columns to be appended to each row.
-    ///
-    /// These MUST have the same count, order, and type than the [`table_partition_cols`].
-    ///
-    /// You may use [`wrap_partition_value_in_dict`] to wrap them if you have used [`wrap_partition_type_in_dict`] to wrap the column type.
-    ///
-    ///
-    /// [`wrap_partition_type_in_dict`]: https://github.com/apache/datafusion/blob/main/datafusion/core/src/datasource/physical_plan/file_scan_config.rs#L55
-    /// [`wrap_partition_value_in_dict`]: https://github.com/apache/datafusion/blob/main/datafusion/core/src/datasource/physical_plan/file_scan_config.rs#L62
-    /// [`table_partition_cols`]: https://github.com/apache/datafusion/blob/main/datafusion/core/src/datasource/file_format/options.rs#L190
-    pub partition_values: Vec<ScalarValue>,
-    /// An optional file range for a more fine-grained parallel execution
-    pub range: Option<FileRange>,
-    /// Optional statistics that describe the data in this file if known.
-    ///
-    /// DataFusion relies on these statistics for planning (in particular to sort file groups),
-    /// so if they are incorrect, incorrect answers may result.
-    pub statistics: Option<Statistics>,
-    /// An optional field for user defined per object metadata
-    pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
-    /// The estimated size of the parquet metadata, in bytes
-    pub metadata_size_hint: Option<usize>,
-}
-
-impl PartitionedFile {
-    /// Create a simple file without metadata or partition
-    pub fn new(path: impl Into<String>, size: u64) -> Self {
-        Self {
-            object_meta: ObjectMeta {
-                location: Path::from(path.into()),
-                last_modified: chrono::Utc.timestamp_nanos(0),
-                size: size as usize,
-                e_tag: None,
-                version: None,
-            },
-            partition_values: vec![],
-            range: None,
-            statistics: None,
-            extensions: None,
-            metadata_size_hint: None,
-        }
-    }
-
-    /// Create a file range without metadata or partition
-    pub fn new_with_range(path: String, size: u64, start: i64, end: i64) -> Self {
-        Self {
-            object_meta: ObjectMeta {
-                location: Path::from(path),
-                last_modified: chrono::Utc.timestamp_nanos(0),
-                size: size as usize,
-                e_tag: None,
-                version: None,
-            },
-            partition_values: vec![],
-            range: Some(FileRange { start, end }),
-            statistics: None,
-            extensions: None,
-            metadata_size_hint: None,
-        }
-        .with_range(start, end)
-    }
-
-    /// Provide a hint to the size of the file metadata. If a hint is provided
-    /// the reader will try and fetch the last `size_hint` bytes of the parquet file optimistically.
-    /// Without an appropriate hint, two read may be required to fetch the metadata.
-    pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
-        self.metadata_size_hint = Some(metadata_size_hint);
-        self
-    }
-
-    /// Return a file reference from the given path
-    pub fn from_path(path: String) -> Result<Self> {
-        let size = std::fs::metadata(path.clone())?.len();
-        Ok(Self::new(path, size))
-    }
-
-    /// Return the path of this partitioned file
-    pub fn path(&self) -> &Path {
-        &self.object_meta.location
-    }
-
-    /// Update the file to only scan the specified range (in bytes)
-    pub fn with_range(mut self, start: i64, end: i64) -> Self {
-        self.range = Some(FileRange { start, end });
-        self
-    }
-
-    /// Update the user defined extensions for this file.
-    ///
-    /// This can be used to pass reader specific information.
-    pub fn with_extensions(
-        mut self,
-        extensions: Arc<dyn std::any::Any + Send + Sync>,
-    ) -> Self {
-        self.extensions = Some(extensions);
-        self
-    }
-}
-
-impl From<ObjectMeta> for PartitionedFile {
-    fn from(object_meta: ObjectMeta) -> Self {
-        PartitionedFile {
-            object_meta,
-            partition_values: vec![],
-            range: None,
-            statistics: None,
-            extensions: None,
-            metadata_size_hint: None,
-        }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::ListingTableUrl;
-    use datafusion_execution::object_store::{
-        DefaultObjectStoreRegistry, ObjectStoreRegistry,
-    };
-    use object_store::{local::LocalFileSystem, path::Path};
-    use std::{ops::Not, sync::Arc};
-    use url::Url;
-
-    #[test]
-    fn test_object_store_listing_url() {
-        let listing = ListingTableUrl::parse("file:///").unwrap();
-        let store = listing.object_store();
-        assert_eq!(store.as_str(), "file:///");
-
-        let listing = ListingTableUrl::parse("s3://bucket/").unwrap();
-        let store = listing.object_store();
-        assert_eq!(store.as_str(), "s3://bucket/");
-    }
-
-    #[test]
-    fn test_get_store_hdfs() {
-        let sut = DefaultObjectStoreRegistry::default();
-        let url = Url::parse("hdfs://localhost:8020").unwrap();
-        sut.register_store(&url, Arc::new(LocalFileSystem::new()));
-        let url = ListingTableUrl::parse("hdfs://localhost:8020/key").unwrap();
-        sut.get_store(url.as_ref()).unwrap();
-    }
-
-    #[test]
-    fn test_get_store_s3() {
-        let sut = DefaultObjectStoreRegistry::default();
-        let url = Url::parse("s3://bucket/key").unwrap();
-        sut.register_store(&url, Arc::new(LocalFileSystem::new()));
-        let url = ListingTableUrl::parse("s3://bucket/key").unwrap();
-        sut.get_store(url.as_ref()).unwrap();
-    }
-
-    #[test]
-    fn test_get_store_file() {
-        let sut = DefaultObjectStoreRegistry::default();
-        let url = ListingTableUrl::parse("file:///bucket/key").unwrap();
-        sut.get_store(url.as_ref()).unwrap();
-    }
-
-    #[test]
-    fn test_get_store_local() {
-        let sut = DefaultObjectStoreRegistry::default();
-        let url = ListingTableUrl::parse("../").unwrap();
-        sut.get_store(url.as_ref()).unwrap();
-    }
-
-    #[test]
-    fn test_url_contains() {
-        let url = ListingTableUrl::parse("file:///var/data/mytable/").unwrap();
-
-        // standard case with default config
-        assert!(url.contains(
-            &Path::parse("/var/data/mytable/data.parquet").unwrap(),
-            true
-        ));
-
-        // standard case with `ignore_subdirectory` set to false
-        assert!(url.contains(
-            &Path::parse("/var/data/mytable/data.parquet").unwrap(),
-            false
-        ));
-
-        // as per documentation, when `ignore_subdirectory` is true, we should ignore files that aren't
-        // a direct child of the `url`
-        assert!(url
-            .contains(
-                &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
-                true
-            )
-            .not());
-
-        // when we set `ignore_subdirectory` to false, we should not ignore the file
-        assert!(url.contains(
-            &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
-            false
-        ));
-
-        // as above, `ignore_subdirectory` is false, so we include the file
-        assert!(url.contains(
-            &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
-            false
-        ));
-
-        // in this case, we include the file even when `ignore_subdirectory` is true because the
-        // path segment is a hive partition which doesn't count as a subdirectory for the purposes
-        // of `Url::contains`
-        assert!(url.contains(
-            &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
-            true
-        ));
-
-        // testing an empty path with default config
-        assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), true));
-
-        // testing an empty path with `ignore_subdirectory` set to false
-        assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), false));
-    }
-}
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 6492e828e60c..ead9c90b7efb 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -43,7 +43,7 @@ array_expressions = ["nested_expressions"]
 # Used to enable the avro format
 avro = ["apache-avro", "num-traits", "datafusion-common/avro"]
 backtrace = ["datafusion-common/backtrace"]
-compression = ["xz2", "bzip2", "flate2", "zstd", "datafusion-catalog-listing/compression"]
+compression = ["xz2", "bzip2", "flate2", "zstd", "datafusion-datasource/compression"]
 crypto_expressions = ["datafusion-functions/crypto_expressions"]
 datetime_expressions = ["datafusion-functions/datetime_expressions"]
 default = [
@@ -95,6 +95,7 @@ datafusion-catalog = { workspace = true }
 datafusion-catalog-listing = { workspace = true }
 datafusion-common = { workspace = true, features = ["object_store"] }
 datafusion-common-runtime = { workspace = true }
+datafusion-datasource = { workspace = true }
 datafusion-execution = { workspace = true }
 datafusion-expr = { workspace = true }
 datafusion-functions = { workspace = true }
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index dd48a9537187..657fe6ca5511 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -28,8 +28,8 @@ pub mod json;
 pub mod options;
 #[cfg(feature = "parquet")]
 pub mod parquet;
-pub use datafusion_catalog_listing::file_compression_type;
-pub use datafusion_catalog_listing::write;
+pub use datafusion_datasource::file_compression_type;
+pub use datafusion_datasource::write;

 use std::any::Any;
 use std::collections::{HashMap, VecDeque};
diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs
index 39323b993d45..a58db55bccb6 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -19,5 +19,8 @@
 //! to get the list of files to process.

 mod table;
-pub use datafusion_catalog_listing::*;
+pub use datafusion_catalog_listing::helpers;
+pub use datafusion_datasource::{
+    FileRange, ListingTableUrl, PartitionedFile, PartitionedFileStream,
+};
 pub use table::{ListingOptions, ListingTable, ListingTableConfig};
diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
index 3708fe6abd5e..e979eb49d0f6 100644
--- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -35,7 +35,7 @@ use datafusion_common::{ColumnStatistics, Constraints, Statistics};
 use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, Partitioning};

 use crate::datasource::data_source::FileSource;
-pub use datafusion_catalog_listing::file_scan_config::*;
+pub use datafusion_datasource::file_scan_config::*;
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_physical_plan::display::{display_orderings, ProjectSchemaDisplay};
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
diff --git a/datafusion/core/src/datasource/physical_plan/file_stream.rs b/datafusion/core/src/datasource/physical_plan/file_stream.rs
index c88d4c4458a5..7944d6fa9020 100644
--- a/datafusion/core/src/datasource/physical_plan/file_stream.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_stream.rs
@@ -37,11 +37,9 @@ use crate::physical_plan::RecordBatchStream;

 use arrow::datatypes::SchemaRef;
 use arrow::error::ArrowError;
 use arrow::record_batch::RecordBatch;
-pub use datafusion_catalog_listing::file_stream::{FileOpenFuture, FileOpener, OnError};
-use datafusion_catalog_listing::file_stream::{
-    FileStreamMetrics, FileStreamState, NextOpen,
-};
 use datafusion_common::ScalarValue;
+pub use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener, OnError};
+use datafusion_datasource::file_stream::{FileStreamMetrics, FileStreamState, NextOpen};

 use futures::{ready, FutureExt, Stream, StreamExt};
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index 18174bd54e4f..953c99322e16 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -50,9 +50,9 @@ pub use avro::AvroSource;
 #[allow(deprecated)]
 pub use csv::{CsvExec, CsvExecBuilder};
 pub use csv::{CsvOpener, CsvSource};
-pub use datafusion_catalog_listing::file_groups::FileGroupPartitioner;
-pub use datafusion_catalog_listing::file_meta::FileMeta;
-pub use datafusion_catalog_listing::file_sink_config::*;
+pub use datafusion_datasource::file_groups::FileGroupPartitioner;
+pub use datafusion_datasource::file_meta::FileMeta;
+pub use datafusion_datasource::file_sink_config::*;
 pub use file_scan_config::{
     wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig,
 };
diff --git a/datafusion/datasource/Cargo.toml b/datafusion/datasource/Cargo.toml
new file mode 100644
index 000000000000..caf1c60a785d
--- /dev/null
+++ b/datafusion/datasource/Cargo.toml
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "datafusion-datasource"
+description = "datafusion-datasource"
+authors.workspace = true
+edition.workspace = true
+homepage.workspace = true
+license.workspace = true
+readme.workspace = true
+repository.workspace = true
+rust-version.workspace = true
+version.workspace = true
+
+[features]
+compression = ["async-compression", "xz2", "bzip2", "flate2", "zstd", "tokio-util"]
+default = ["compression"]
+
+[dependencies]
+arrow = { workspace = true }
+async-compression = { version = "0.4.0", features = [
+    "bzip2",
+    "gzip",
+    "xz",
+    "zstd",
+    "tokio",
+], optional = true }
+async-trait = { workspace = true }
+bytes = { workspace = true }
+bzip2 = { version = "0.5.1", optional = true }
+chrono = { workspace = true }
+datafusion-catalog = { workspace = true }
+datafusion-common = { workspace = true, features = ["object_store"] }
+datafusion-common-runtime = { workspace = true }
+datafusion-execution = { workspace = true }
+datafusion-expr = { workspace = true }
+datafusion-physical-plan = { workspace = true }
+flate2 = { version = "1.0.24", optional = true }
+futures = { workspace = true }
+glob = "0.3.0"
+itertools = { workspace = true }
+log = { workspace = true }
+object_store = { workspace = true }
+rand = { workspace = true }
+tokio = { workspace = true }
+tokio-util = { version = "0.7.4", features = ["io"], optional = true }
+url = { workspace = true }
+xz2 = { version = "0.1", optional = true, features = ["static"] }
+zstd = { version = "0.13", optional = true, default-features = false }
+
+[dev-dependencies]
+tempfile = { workspace = true }
+
+[lints]
+workspace = true
+
+[lib]
+name = "datafusion_datasource"
+path = "src/mod.rs"
diff --git a/datafusion/datasource/LICENSE.txt b/datafusion/datasource/LICENSE.txt
new file mode 120000
index 000000000000..1ef648f64b34
--- /dev/null
+++ b/datafusion/datasource/LICENSE.txt
@@ -0,0 +1 @@
+../../LICENSE.txt
\ No newline at end of file
diff --git a/datafusion/datasource/NOTICE.txt b/datafusion/datasource/NOTICE.txt
new file mode 120000
index 000000000000..fb051c92b10b
--- /dev/null
+++ b/datafusion/datasource/NOTICE.txt
@@ -0,0 +1 @@
+../../NOTICE.txt
\ No newline at end of file
diff --git a/datafusion/datasource/README.md b/datafusion/datasource/README.md
new file mode 100644
index 000000000000..2479a28ae68d
--- /dev/null
+++ b/datafusion/datasource/README.md
@@ -0,0 +1,24 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# DataFusion datasource
+
+[DataFusion][df] is an extensible query execution framework, written in Rust, that uses Apache Arrow as its in-memory format.
+
+This crate is a submodule of DataFusion that defines common DataSource-related components such as `FileScanConfig` and `FileCompressionType`.
+
+[df]: https://crates.io/crates/datafusion
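With this split, downstream code imports these types from `datafusion-datasource` instead of `datafusion-catalog-listing`. A minimal sketch of the relocated `ListingTableUrl` API, mirroring the doctest updated in `url.rs` near the end of this diff (the `file:///foo/...` URLs are illustrative):

```rust
use datafusion_datasource::ListingTableUrl;

fn main() {
    // Parse a fully qualified file URL; file_extension() exposes its suffix.
    let url = ListingTableUrl::parse("file:///foo/bar.csv").unwrap();
    assert_eq!(url.file_extension(), Some("csv"));

    // A URL whose last segment has no extension yields None.
    let url = ListingTableUrl::parse("file:///foo/bar").unwrap();
    assert_eq!(url.file_extension(), None);
}
```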
diff --git a/datafusion/catalog-listing/src/file_compression_type.rs b/datafusion/datasource/src/file_compression_type.rs
similarity index 100%
rename from datafusion/catalog-listing/src/file_compression_type.rs
rename to datafusion/datasource/src/file_compression_type.rs
diff --git a/datafusion/catalog-listing/src/file_groups.rs b/datafusion/datasource/src/file_groups.rs
similarity index 100%
rename from datafusion/catalog-listing/src/file_groups.rs
rename to datafusion/datasource/src/file_groups.rs
diff --git a/datafusion/catalog-listing/src/file_meta.rs b/datafusion/datasource/src/file_meta.rs
similarity index 100%
rename from datafusion/catalog-listing/src/file_meta.rs
rename to datafusion/datasource/src/file_meta.rs
diff --git a/datafusion/catalog-listing/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
similarity index 100%
rename from datafusion/catalog-listing/src/file_scan_config.rs
rename to datafusion/datasource/src/file_scan_config.rs
diff --git a/datafusion/catalog-listing/src/file_sink_config.rs b/datafusion/datasource/src/file_sink_config.rs
similarity index 100%
rename from datafusion/catalog-listing/src/file_sink_config.rs
rename to datafusion/datasource/src/file_sink_config.rs
diff --git a/datafusion/catalog-listing/src/file_stream.rs b/datafusion/datasource/src/file_stream.rs
similarity index 100%
rename from datafusion/catalog-listing/src/file_stream.rs
rename to datafusion/datasource/src/file_stream.rs
diff --git a/datafusion/datasource/src/mod.rs b/datafusion/datasource/src/mod.rs
new file mode 100644
index 000000000000..c735c3108b3d
--- /dev/null
+++ b/datafusion/datasource/src/mod.rs
@@ -0,0 +1,283 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A table that uses the `ObjectStore` listing capability
+//! to get the list of files to process.
+
+pub mod file_compression_type;
+pub mod file_groups;
+pub mod file_meta;
+pub mod file_scan_config;
+pub mod file_sink_config;
+pub mod file_stream;
+pub mod url;
+pub mod write;
+use chrono::TimeZone;
+use datafusion_common::Result;
+use datafusion_common::{ScalarValue, Statistics};
+use futures::Stream;
+use object_store::{path::Path, ObjectMeta};
+use std::pin::Pin;
+use std::sync::Arc;
+
+pub use self::url::ListingTableUrl;
+
+/// A stream of files as they are listed from an object store
+pub type PartitionedFileStream =
+    Pin<Box<dyn Stream<Item = Result<PartitionedFile>> + Send + Sync + 'static>>;
+
+/// Only scan a subset of Row Groups from the Parquet file whose data "midpoint"
+/// lies within the [start, end) byte offsets. This option can be used to scan non-overlapping
+/// sections of a Parquet file in parallel.
+#[derive(Debug, Clone, PartialEq, Hash, Eq, PartialOrd, Ord)]
+pub struct FileRange {
+    /// Range start
+    pub start: i64,
+    /// Range end
+    pub end: i64,
+}
+
+impl FileRange {
+    /// Returns true if this file range contains the specified offset
+    pub fn contains(&self, offset: i64) -> bool {
+        offset >= self.start && offset < self.end
+    }
+}
+
+#[derive(Debug, Clone)]
+/// A single file or part of a file that should be read, along with its schema, statistics
+/// and partition column values that need to be appended to each row.
+pub struct PartitionedFile {
+    /// Path for the file (e.g. URL, filesystem path, etc)
+    pub object_meta: ObjectMeta,
+    /// Values of partition columns to be appended to each row.
+    ///
+    /// These MUST have the same count, order, and type as the [`table_partition_cols`].
+    ///
+    /// You may use [`wrap_partition_value_in_dict`] to wrap them if you have used [`wrap_partition_type_in_dict`] to wrap the column type.
+    ///
+    ///
+    /// [`wrap_partition_type_in_dict`]: https://github.com/apache/datafusion/blob/main/datafusion/core/src/datasource/physical_plan/file_scan_config.rs#L55
+    /// [`wrap_partition_value_in_dict`]: https://github.com/apache/datafusion/blob/main/datafusion/core/src/datasource/physical_plan/file_scan_config.rs#L62
+    /// [`table_partition_cols`]: https://github.com/apache/datafusion/blob/main/datafusion/core/src/datasource/file_format/options.rs#L190
+    pub partition_values: Vec<ScalarValue>,
+    /// An optional file range for more fine-grained parallel execution
+    pub range: Option<FileRange>,
+    /// Optional statistics that describe the data in this file if known.
+    ///
+    /// DataFusion relies on these statistics for planning (in particular to sort file groups),
+    /// so if they are incorrect, incorrect answers may result.
+    pub statistics: Option<Statistics>,
+    /// An optional field for user defined per object metadata
+    pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
+    /// The estimated size of the parquet metadata, in bytes
+    pub metadata_size_hint: Option<usize>,
+}
+
+impl PartitionedFile {
+    /// Create a simple file without metadata or partition
+    pub fn new(path: impl Into<String>, size: u64) -> Self {
+        Self {
+            object_meta: ObjectMeta {
+                location: Path::from(path.into()),
+                last_modified: chrono::Utc.timestamp_nanos(0),
+                size: size as usize,
+                e_tag: None,
+                version: None,
+            },
+            partition_values: vec![],
+            range: None,
+            statistics: None,
+            extensions: None,
+            metadata_size_hint: None,
+        }
+    }
+
+    /// Create a file range without metadata or partition
+    pub fn new_with_range(path: String, size: u64, start: i64, end: i64) -> Self {
+        Self {
+            object_meta: ObjectMeta {
+                location: Path::from(path),
+                last_modified: chrono::Utc.timestamp_nanos(0),
+                size: size as usize,
+                e_tag: None,
+                version: None,
+            },
+            partition_values: vec![],
+            range: Some(FileRange { start, end }),
+            statistics: None,
+            extensions: None,
+            metadata_size_hint: None,
+        }
+        .with_range(start, end)
+    }
+
+    /// Provide a hint of the size of the file metadata. If a hint is provided,
+    /// the reader will try to fetch the last `size_hint` bytes of the parquet file optimistically.
+    /// Without an appropriate hint, two reads may be required to fetch the metadata.
+    pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
+        self.metadata_size_hint = Some(metadata_size_hint);
+        self
+    }
+
+    /// Return a file reference from the given path
+    pub fn from_path(path: String) -> Result<Self> {
+        let size = std::fs::metadata(path.clone())?.len();
+        Ok(Self::new(path, size))
+    }
+
+    /// Return the path of this partitioned file
+    pub fn path(&self) -> &Path {
+        &self.object_meta.location
+    }
+
+    /// Update the file to only scan the specified range (in bytes)
+    pub fn with_range(mut self, start: i64, end: i64) -> Self {
+        self.range = Some(FileRange { start, end });
+        self
+    }
+
+    /// Update the user defined extensions for this file.
+    ///
+    /// This can be used to pass reader-specific information.
+    pub fn with_extensions(
+        mut self,
+        extensions: Arc<dyn std::any::Any + Send + Sync>,
+    ) -> Self {
+        self.extensions = Some(extensions);
+        self
+    }
+}
+
+impl From<ObjectMeta> for PartitionedFile {
+    fn from(object_meta: ObjectMeta) -> Self {
+        PartitionedFile {
+            object_meta,
+            partition_values: vec![],
+            range: None,
+            statistics: None,
+            extensions: None,
+            metadata_size_hint: None,
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::ListingTableUrl;
+    use datafusion_execution::object_store::{
+        DefaultObjectStoreRegistry, ObjectStoreRegistry,
+    };
+    use object_store::{local::LocalFileSystem, path::Path};
+    use std::{ops::Not, sync::Arc};
+    use url::Url;
+
+    #[test]
+    fn test_object_store_listing_url() {
+        let listing = ListingTableUrl::parse("file:///").unwrap();
+        let store = listing.object_store();
+        assert_eq!(store.as_str(), "file:///");
+
+        let listing = ListingTableUrl::parse("s3://bucket/").unwrap();
+        let store = listing.object_store();
+        assert_eq!(store.as_str(), "s3://bucket/");
+    }
+
+    #[test]
+    fn test_get_store_hdfs() {
+        let sut = DefaultObjectStoreRegistry::default();
+        let url = Url::parse("hdfs://localhost:8020").unwrap();
+        sut.register_store(&url, Arc::new(LocalFileSystem::new()));
+        let url = ListingTableUrl::parse("hdfs://localhost:8020/key").unwrap();
+        sut.get_store(url.as_ref()).unwrap();
+    }
+
+    #[test]
+    fn test_get_store_s3() {
+        let sut = DefaultObjectStoreRegistry::default();
+        let url = Url::parse("s3://bucket/key").unwrap();
+        sut.register_store(&url, Arc::new(LocalFileSystem::new()));
+        let url = ListingTableUrl::parse("s3://bucket/key").unwrap();
+        sut.get_store(url.as_ref()).unwrap();
+    }
+
+    #[test]
+    fn test_get_store_file() {
+        let sut = DefaultObjectStoreRegistry::default();
+        let url = ListingTableUrl::parse("file:///bucket/key").unwrap();
+        sut.get_store(url.as_ref()).unwrap();
+    }
+
+    #[test]
+    fn test_get_store_local() {
+        let sut = DefaultObjectStoreRegistry::default();
+        let url = ListingTableUrl::parse("../").unwrap();
+        sut.get_store(url.as_ref()).unwrap();
+    }
+
+    #[test]
+    fn test_url_contains() {
+        let url = ListingTableUrl::parse("file:///var/data/mytable/").unwrap();
+
+        // standard case with default config
+        assert!(url.contains(
+            &Path::parse("/var/data/mytable/data.parquet").unwrap(),
+            true
+        ));
+
+        // standard case with `ignore_subdirectory` set to false
+        assert!(url.contains(
+            &Path::parse("/var/data/mytable/data.parquet").unwrap(),
+            false
+        ));
+
+        // as per documentation, when `ignore_subdirectory` is true, we should ignore files that aren't
+        // a direct child of the `url`
+        assert!(url
+            .contains(
+                &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
+                true
+            )
+            .not());
+
+        // when we set `ignore_subdirectory` to false, we should not ignore the file
+        assert!(url.contains(
+            &Path::parse("/var/data/mytable/mysubfolder/data.parquet").unwrap(),
+            false
+        ));
+
+        // as above, `ignore_subdirectory` is false, so we include the file
+        assert!(url.contains(
+            &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
+            false
+        ));
+
+        // in this case, we include the file even when `ignore_subdirectory` is true because the
+        // path segment is a hive partition which doesn't count as a subdirectory for the purposes
+        // of `Url::contains`
+        assert!(url.contains(
+            &Path::parse("/var/data/mytable/year=2024/data.parquet").unwrap(),
+            true
+        ));
+
+        // testing an empty path with default config
+        assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), true));
+
+        // testing an empty path with `ignore_subdirectory` set to false
+        assert!(url.contains(&Path::parse("/var/data/mytable/").unwrap(), false));
+    }
+}
diff --git a/datafusion/catalog-listing/src/url.rs b/datafusion/datasource/src/url.rs
similarity index 99%
rename from datafusion/catalog-listing/src/url.rs
rename to datafusion/datasource/src/url.rs
index 2e6415ba3b2b..89e73a8a2b26 100644
--- a/datafusion/catalog-listing/src/url.rs
+++ b/datafusion/datasource/src/url.rs
@@ -193,7 +193,7 @@ impl ListingTableUrl {
     ///
     /// Examples:
     /// ```rust
-    /// use datafusion_catalog_listing::ListingTableUrl;
+    /// use datafusion_datasource::ListingTableUrl;
     /// let url = ListingTableUrl::parse("file:///foo/bar.csv").unwrap();
     /// assert_eq!(url.file_extension(), Some("csv"));
     /// let url = ListingTableUrl::parse("file:///foo/bar").unwrap();
diff --git a/datafusion/catalog-listing/src/write/demux.rs b/datafusion/datasource/src/write/demux.rs
similarity index 100%
rename from datafusion/catalog-listing/src/write/demux.rs
rename to datafusion/datasource/src/write/demux.rs
diff --git a/datafusion/catalog-listing/src/write/mod.rs b/datafusion/datasource/src/write/mod.rs
similarity index 100%
rename from datafusion/catalog-listing/src/write/mod.rs
rename to datafusion/datasource/src/write/mod.rs
diff --git a/datafusion/catalog-listing/src/write/orchestration.rs b/datafusion/datasource/src/write/orchestration.rs
similarity index 100%
rename from datafusion/catalog-listing/src/write/orchestration.rs
rename to datafusion/datasource/src/write/orchestration.rs
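The moved `mod.rs` keeps the `PartitionedFile` and `FileRange` API unchanged, so existing call sites only swap the import path. A minimal usage sketch assembled from the constructors shown above; the file path and byte offsets are illustrative:

```rust
use datafusion_datasource::{FileRange, PartitionedFile};

fn main() {
    // Describe a 1 KiB file, then restrict the scan to its first half.
    let file = PartitionedFile::new("data/part-0.parquet", 1024).with_range(0, 512);

    // FileRange::contains uses half-open [start, end) semantics.
    let range = file.range.as_ref().unwrap();
    assert!(range.contains(0) && range.contains(511));
    assert!(!range.contains(512));

    // new_with_range builds the same shape in a single call.
    let other =
        PartitionedFile::new_with_range("data/part-0.parquet".to_string(), 1024, 512, 1024);
    assert_eq!(other.range, Some(FileRange { start: 512, end: 1024 }));
}
```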