
Commit

Allow DynamicFileCatalog to support querying partitioned files (apache#12683)

* support querying partitioned tables in the dynamic file catalog

* cargo clippy

* split partition inference into a separate function
goldmedal authored Oct 2, 2024
1 parent 5a318cd commit 66aead7
Showing 3 changed files with 208 additions and 8 deletions.
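
To make the effect of the change concrete, here is a minimal, hedged sketch of the kind of query this commit enables, assuming a `SessionContext` with the dynamic file catalog enabled via `enable_url_table()`. The queried path is the partitioned arrow test directory that, per the test removed below, previously failed to plan.

use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    // The dynamic file catalog resolves bare paths in SQL into listing tables.
    let ctx = SessionContext::new().enable_url_table();

    // Before this commit, pointing at the partitioned directory itself errored;
    // now the hive-style `part=123` level is inferred as a partition column.
    let df = ctx
        .sql("SELECT * FROM '../core/tests/data/partitioned_table_arrow' ORDER BY f0")
        .await?;
    df.show().await?;
    Ok(())
}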
13 changes: 10 additions & 3 deletions datafusion/core/src/datasource/dynamic_file.rs
@@ -69,11 +69,18 @@ impl UrlTableFactory for DynamicListTableFactory {
.ok_or_else(|| plan_datafusion_err!("get current SessionStore error"))?;

match ListingTableConfig::new(table_url.clone())
.infer(state)
.infer_options(state)
.await
{
Ok(cfg) => ListingTable::try_new(cfg)
.map(|table| Some(Arc::new(table) as Arc<dyn TableProvider>)),
Ok(cfg) => {
let cfg = cfg
.infer_partitions_from_path(state)
.await?
.infer_schema(state)
.await?;
ListingTable::try_new(cfg)
.map(|table| Some(Arc::new(table) as Arc<dyn TableProvider>))
}
Err(_) => Ok(None),
}
}
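
Read straight through, the factory's new resolution flow chains three inference steps before building the table. A rough standalone sketch of that flow, not the verbatim source (error handling simplified; import paths assumed from the datafusion crate re-exports):

use std::sync::Arc;

use datafusion::catalog::TableProvider;
use datafusion::datasource::listing::{ListingTable, ListingTableConfig, ListingTableUrl};
use datafusion::error::Result;
use datafusion::execution::context::SessionState;

// Sketch: resolve a bare table URL into a partition-aware ListingTable.
async fn resolve_url_table(
    state: &SessionState,
    table_url: ListingTableUrl,
) -> Result<Arc<dyn TableProvider>> {
    let cfg = ListingTableConfig::new(table_url)
        .infer_options(state) // detect the file format and listing options
        .await?
        .infer_partitions_from_path(state) // new: hive-style partition columns
        .await?
        .infer_schema(state) // infer the file schema
        .await?;
    Ok(Arc::new(ListingTable::try_new(cfg)?))
}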
36 changes: 34 additions & 2 deletions datafusion/core/src/datasource/listing/table.rs
@@ -33,7 +33,7 @@ use crate::datasource::{
};
use crate::execution::context::SessionState;
use datafusion_catalog::TableProvider;
use datafusion_common::{DataFusionError, Result};
use datafusion_common::{config_err, DataFusionError, Result};
use datafusion_expr::dml::InsertOp;
use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown};
use datafusion_expr::{SortExpr, TableType};
@@ -192,6 +192,38 @@ impl ListingTableConfig {
pub async fn infer(self, state: &SessionState) -> Result<Self> {
self.infer_options(state).await?.infer_schema(state).await
}

/// Infer the partition columns from the path. Requires `self.options` to be set prior to using.
pub async fn infer_partitions_from_path(self, state: &SessionState) -> Result<Self> {
match self.options {
Some(options) => {
let Some(url) = self.table_paths.first() else {
return config_err!("No table path found");
};
let partitions = options
.infer_partitions(state, url)
.await?
.into_iter()
.map(|col_name| {
(
col_name,
DataType::Dictionary(
Box::new(DataType::UInt16),
Box::new(DataType::Utf8),
),
)
})
.collect::<Vec<_>>();
let options = options.with_table_partition_cols(partitions);
Ok(Self {
table_paths: self.table_paths,
file_schema: self.file_schema,
options: Some(options),
})
}
None => config_err!("No `ListingOptions` set for inferring schema"),
}
}
}
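
For comparison, the manual setup that this inference automates: previously a partitioned directory had to be registered with its partition columns spelled out through `with_table_partition_cols`. A hypothetical snippet, mirroring the column name and dictionary types that `infer_partitions_from_path` now derives (`ParquetFormat` is just an illustrative choice):

use std::sync::Arc;

use datafusion::arrow::datatypes::DataType;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingOptions;

fn main() {
    // Each hive-style `name=value` directory level becomes a dictionary-encoded
    // partition column on the listing options.
    let options = ListingOptions::new(Arc::new(ParquetFormat::default()))
        .with_table_partition_cols(vec![(
            "partition_col".to_string(),
            DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
        )]);
    println!("partition columns: {:?}", options.table_partition_cols);
}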

/// Options for creating a [`ListingTable`]
@@ -505,7 +537,7 @@ impl ListingOptions {
/// Infer the partitioning at the given path on the provided object store.
/// For performance reasons, it doesn't read all the files on disk
/// and therefore may fail to detect invalid partitioning.
async fn infer_partitions(
pub(crate) async fn infer_partitions(
&self,
state: &SessionState,
table_path: &ListingTableUrl,
167 changes: 164 additions & 3 deletions datafusion/sqllogictest/test_files/dynamic_file.slt
@@ -25,9 +25,170 @@ SELECT * FROM '../core/tests/data/partitioned_table_arrow/part=123' ORDER BY f0;
1 foo true
2 bar false

# dynamic file query doesn't support partitioned table
statement error DataFusion error: Error during planning: table 'datafusion.public.../core/tests/data/partitioned_table_arrow' not found
SELECT * FROM '../core/tests/data/partitioned_table_arrow' ORDER BY f0;
# Read partitioned file
statement ok
CREATE TABLE src_table_1 (
int_col INT,
string_col TEXT,
bigint_col BIGINT,
partition_col INT
) AS VALUES
(1, 'aaa', 100, 1),
(2, 'bbb', 200, 1),
(3, 'ccc', 300, 1),
(4, 'ddd', 400, 1);

statement ok
CREATE TABLE src_table_2 (
int_col INT,
string_col TEXT,
bigint_col BIGINT,
partition_col INT
) AS VALUES
(5, 'eee', 500, 2),
(6, 'fff', 600, 2),
(7, 'ggg', 700, 2),
(8, 'hhh', 800, 2);

# Read partitioned csv file

query I
COPY src_table_1 TO 'test_files/scratch/dynamic_file/csv_partitions'
STORED AS CSV
PARTITIONED BY (partition_col);
----
4

query I
COPY src_table_2 TO 'test_files/scratch/dynamic_file/csv_partitions'
STORED AS CSV
PARTITIONED BY (partition_col);
----
4

query ITIT rowsort
SELECT int_col, string_col, bigint_col, partition_col FROM 'test_files/scratch/dynamic_file/csv_partitions';
----
1 aaa 100 1
2 bbb 200 1
3 ccc 300 1
4 ddd 400 1
5 eee 500 2
6 fff 600 2
7 ggg 700 2
8 hhh 800 2

# Read partitioned json file

query I
COPY src_table_1 TO 'test_files/scratch/dynamic_file/json_partitions'
STORED AS JSON
PARTITIONED BY (partition_col);
----
4

query I
COPY src_table_2 TO 'test_files/scratch/dynamic_file/json_partitions'
STORED AS JSON
PARTITIONED BY (partition_col);
----
4

query ITIT rowsort
SELECT int_col, string_col, bigint_col, partition_col FROM 'test_files/scratch/dynamic_file/json_partitions';
----
1 aaa 100 1
2 bbb 200 1
3 ccc 300 1
4 ddd 400 1
5 eee 500 2
6 fff 600 2
7 ggg 700 2
8 hhh 800 2

# Read partitioned arrow file

query I
COPY src_table_1 TO 'test_files/scratch/dynamic_file/arrow_partitions'
STORED AS ARROW
PARTITIONED BY (partition_col);
----
4

query I
COPY src_table_2 TO 'test_files/scratch/dynamic_file/arrow_partitions'
STORED AS ARROW
PARTITIONED BY (partition_col);
----
4

query ITIT rowsort
SELECT int_col, string_col, bigint_col, partition_col FROM 'test_files/scratch/dynamic_file/arrow_partitions';
----
1 aaa 100 1
2 bbb 200 1
3 ccc 300 1
4 ddd 400 1
5 eee 500 2
6 fff 600 2
7 ggg 700 2
8 hhh 800 2

# Read partitioned parquet file

query I
COPY src_table_1 TO 'test_files/scratch/dynamic_file/parquet_partitions'
STORED AS PARQUET
PARTITIONED BY (partition_col);
----
4

query I
COPY src_table_2 TO 'test_files/scratch/dynamic_file/parquet_partitions'
STORED AS PARQUET
PARTITIONED BY (partition_col);
----
4

query ITIT rowsort
select * from 'test_files/scratch/dynamic_file/parquet_partitions';
----
1 aaa 100 1
2 bbb 200 1
3 ccc 300 1
4 ddd 400 1
5 eee 500 2
6 fff 600 2
7 ggg 700 2
8 hhh 800 2

# Read partitioned parquet file with multiple partition columns

query I
COPY src_table_1 TO 'test_files/scratch/dynamic_file/nested_partition'
STORED AS PARQUET
PARTITIONED BY (partition_col, string_col);
----
4

query I
COPY src_table_2 TO 'test_files/scratch/dynamic_file/nested_partition'
STORED AS PARQUET
PARTITIONED BY (partition_col, string_col);
----
4

query IITT rowsort
select * from 'test_files/scratch/dynamic_file/nested_partition';
----
1 100 1 aaa
2 200 1 bbb
3 300 1 ccc
4 400 1 ddd
5 500 2 eee
6 600 2 fff
7 700 2 ggg
8 800 2 hhh
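
The sqllogictest cases above can also be reproduced from the Rust API. A minimal hypothetical round trip, again relying on `enable_url_table()` (the table name and output path are illustrative):

use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new().enable_url_table();

    // Write a hive-partitioned dataset, then read it back through a bare path.
    ctx.sql("CREATE TABLE src (int_col INT, partition_col INT) AS VALUES (1, 1), (2, 2)")
        .await?
        .collect()
        .await?;
    ctx.sql("COPY src TO '/tmp/dynamic_file_demo' STORED AS PARQUET PARTITIONED BY (partition_col)")
        .await?
        .collect()
        .await?;
    ctx.sql("SELECT * FROM '/tmp/dynamic_file_demo' ORDER BY int_col")
        .await?
        .show()
        .await?;
    Ok(())
}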

# read avro file
query IT
