Skip to content

Commit

Permalink
Remove ListingTable and FileScanConfig Unbounded (apache#8540) (apach…
Browse files Browse the repository at this point in the history
…e#8573)

* Remove ListingTable and FileScanConfig Unbounded (apache#8540)

* Fix substrait

* Fix logical conflicts

* Add deleted tests as ignored

---------

Co-authored-by: Mustafa Akur <[email protected]>
  • Loading branch information
tustvold and mustafasrepo authored Dec 18, 2023
1 parent a71a76a commit a1e959d
Show file tree
Hide file tree
Showing 32 changed files with 102 additions and 381 deletions.
1 change: 0 additions & 1 deletion datafusion-examples/examples/csv_opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ async fn main() -> Result<()> {
limit: Some(5),
table_partition_cols: vec![],
output_ordering: vec![],
infinite_source: false,
};

let result =
Expand Down
1 change: 0 additions & 1 deletion datafusion-examples/examples/json_opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ async fn main() -> Result<()> {
limit: Some(5),
table_partition_cols: vec![],
output_ordering: vec![],
infinite_source: false,
};

let result =
Expand Down
1 change: 0 additions & 1 deletion datafusion/core/src/datasource/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ pub(crate) mod test_util {
limit,
table_partition_cols: vec![],
output_ordering: vec![],
infinite_source: false,
},
None,
)
Expand Down
48 changes: 11 additions & 37 deletions datafusion/core/src/datasource/file_format/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use std::sync::Arc;

use arrow::datatypes::{DataType, Schema, SchemaRef};
use async_trait::async_trait;
use datafusion_common::{plan_err, DataFusionError};

use crate::datasource::file_format::arrow::ArrowFormat;
use crate::datasource::file_format::file_compression_type::FileCompressionType;
Expand Down Expand Up @@ -72,8 +71,6 @@ pub struct CsvReadOptions<'a> {
pub table_partition_cols: Vec<(String, DataType)>,
/// File compression type
pub file_compression_type: FileCompressionType,
/// Flag indicating whether this file may be unbounded (as in a FIFO file).
pub infinite: bool,
/// Indicates how the file is sorted
pub file_sort_order: Vec<Vec<Expr>>,
}
Expand All @@ -97,7 +94,6 @@ impl<'a> CsvReadOptions<'a> {
file_extension: DEFAULT_CSV_EXTENSION,
table_partition_cols: vec![],
file_compression_type: FileCompressionType::UNCOMPRESSED,
infinite: false,
file_sort_order: vec![],
}
}
Expand All @@ -108,12 +104,6 @@ impl<'a> CsvReadOptions<'a> {
self
}

/// Configure mark_infinite setting
pub fn mark_infinite(mut self, infinite: bool) -> Self {
self.infinite = infinite;
self
}

/// Specify delimiter to use for CSV read
pub fn delimiter(mut self, delimiter: u8) -> Self {
self.delimiter = delimiter;
Expand Down Expand Up @@ -324,8 +314,6 @@ pub struct AvroReadOptions<'a> {
pub file_extension: &'a str,
/// Partition Columns
pub table_partition_cols: Vec<(String, DataType)>,
/// Flag indicating whether this file may be unbounded (as in a FIFO file).
pub infinite: bool,
}

impl<'a> Default for AvroReadOptions<'a> {
Expand All @@ -334,7 +322,6 @@ impl<'a> Default for AvroReadOptions<'a> {
schema: None,
file_extension: DEFAULT_AVRO_EXTENSION,
table_partition_cols: vec![],
infinite: false,
}
}
}
Expand All @@ -349,12 +336,6 @@ impl<'a> AvroReadOptions<'a> {
self
}

/// Configure mark_infinite setting
pub fn mark_infinite(mut self, infinite: bool) -> Self {
self.infinite = infinite;
self
}

/// Specify schema to use for AVRO read
pub fn schema(mut self, schema: &'a Schema) -> Self {
self.schema = Some(schema);
Expand Down Expand Up @@ -466,21 +447,17 @@ pub trait ReadOptions<'a> {
state: SessionState,
table_path: ListingTableUrl,
schema: Option<&'a Schema>,
infinite: bool,
) -> Result<SchemaRef>
where
'a: 'async_trait,
{
match (schema, infinite) {
(Some(s), _) => Ok(Arc::new(s.to_owned())),
(None, false) => Ok(self
.to_listing_options(config)
.infer_schema(&state, &table_path)
.await?),
(None, true) => {
plan_err!("Schema inference for infinite data sources is not supported.")
}
if let Some(s) = schema {
return Ok(Arc::new(s.to_owned()));
}

self.to_listing_options(config)
.infer_schema(&state, &table_path)
.await
}
}

Expand All @@ -500,7 +477,6 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
.with_target_partitions(config.target_partitions())
.with_table_partition_cols(self.table_partition_cols.clone())
.with_file_sort_order(self.file_sort_order.clone())
.with_infinite_source(self.infinite)
}

async fn get_resolved_schema(
Expand All @@ -509,7 +485,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
state: SessionState,
table_path: ListingTableUrl,
) -> Result<SchemaRef> {
self._get_resolved_schema(config, state, table_path, self.schema, self.infinite)
self._get_resolved_schema(config, state, table_path, self.schema)
.await
}
}
Expand All @@ -535,7 +511,7 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> {
state: SessionState,
table_path: ListingTableUrl,
) -> Result<SchemaRef> {
self._get_resolved_schema(config, state, table_path, self.schema, false)
self._get_resolved_schema(config, state, table_path, self.schema)
.await
}
}
Expand All @@ -551,7 +527,6 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> {
.with_file_extension(self.file_extension)
.with_target_partitions(config.target_partitions())
.with_table_partition_cols(self.table_partition_cols.clone())
.with_infinite_source(self.infinite)
.with_file_sort_order(self.file_sort_order.clone())
}

Expand All @@ -561,7 +536,7 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> {
state: SessionState,
table_path: ListingTableUrl,
) -> Result<SchemaRef> {
self._get_resolved_schema(config, state, table_path, self.schema, self.infinite)
self._get_resolved_schema(config, state, table_path, self.schema)
.await
}
}
Expand All @@ -575,7 +550,6 @@ impl ReadOptions<'_> for AvroReadOptions<'_> {
.with_file_extension(self.file_extension)
.with_target_partitions(config.target_partitions())
.with_table_partition_cols(self.table_partition_cols.clone())
.with_infinite_source(self.infinite)
}

async fn get_resolved_schema(
Expand All @@ -584,7 +558,7 @@ impl ReadOptions<'_> for AvroReadOptions<'_> {
state: SessionState,
table_path: ListingTableUrl,
) -> Result<SchemaRef> {
self._get_resolved_schema(config, state, table_path, self.schema, self.infinite)
self._get_resolved_schema(config, state, table_path, self.schema)
.await
}
}
Expand All @@ -606,7 +580,7 @@ impl ReadOptions<'_> for ArrowReadOptions<'_> {
state: SessionState,
table_path: ListingTableUrl,
) -> Result<SchemaRef> {
self._get_resolved_schema(config, state, table_path, self.schema, false)
self._get_resolved_schema(config, state, table_path, self.schema)
.await
}
}
Loading

0 comments on commit a1e959d

Please sign in to comment.