Merge remote-tracking branch 'apache/main' into alamb/move_map_test
alamb committed Dec 20, 2023
2 parents b0c9427 + b925b78 commit 788736a
Showing 78 changed files with 4,780 additions and 1,986 deletions.
1 change: 0 additions & 1 deletion datafusion-examples/examples/csv_opener.rs
@@ -67,7 +67,6 @@ async fn main() -> Result<()> {
limit: Some(5),
table_partition_cols: vec![],
output_ordering: vec![],
infinite_source: false,
};

let result =
1 change: 0 additions & 1 deletion datafusion-examples/examples/json_opener.rs
@@ -70,7 +70,6 @@ async fn main() -> Result<()> {
limit: Some(5),
table_partition_cols: vec![],
output_ordering: vec![],
infinite_source: false,
};

let result =
30 changes: 15 additions & 15 deletions datafusion-examples/examples/simple_udf.rs
@@ -29,23 +29,23 @@ use datafusion::{error::Result, physical_plan::functions::make_scalar_function};
use datafusion_common::cast::as_float64_array;
use std::sync::Arc;

// create local execution context with an in-memory table
/// create local execution context with an in-memory table:
///
/// ```text
/// +-----+-----+
/// | a | b |
/// +-----+-----+
/// | 2.1 | 1.0 |
/// | 3.1 | 2.0 |
/// | 4.1 | 3.0 |
/// | 5.1 | 4.0 |
/// +-----+-----+
/// ```
fn create_context() -> Result<SessionContext> {
use datafusion::arrow::datatypes::{Field, Schema};
// define a schema.
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Float32, false),
Field::new("b", DataType::Float64, false),
]));

// define data.
let batch = RecordBatch::try_new(
schema,
vec![
Arc::new(Float32Array::from(vec![2.1, 3.1, 4.1, 5.1])),
Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])),
],
)?;
let a: ArrayRef = Arc::new(Float32Array::from(vec![2.1, 3.1, 4.1, 5.1]));
let b: ArrayRef = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0]));
let batch = RecordBatch::try_from_iter(vec![("a", a), ("b", b)])?;

// declare a new context. In spark API, this corresponds to a new spark SQLsession
let ctx = SessionContext::new();
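For context on the `simple_udf.rs` hunk above, here is a minimal standalone sketch of the `RecordBatch::try_from_iter` pattern the example now uses; the column names and values mirror the example, but the snippet itself is illustrative and not part of the commit:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Float32Array, Float64Array};
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), arrow::error::ArrowError> {
    let a: ArrayRef = Arc::new(Float32Array::from(vec![2.1, 3.1, 4.1, 5.1]));
    let b: ArrayRef = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0]));

    // try_from_iter derives the schema (column names, types, nullability) from
    // the arrays themselves, so the explicit Schema/Field construction used by
    // the old code is no longer needed.
    let batch = RecordBatch::try_from_iter(vec![("a", a), ("b", b)])?;
    assert_eq!(batch.num_columns(), 2);
    assert_eq!(batch.num_rows(), 4);
    Ok(())
}
```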
1 change: 1 addition & 0 deletions datafusion-examples/examples/simple_udtf.rs
@@ -125,6 +125,7 @@ impl TableProvider for LocalCsvTable {
)?))
}
}

struct LocalCsvTableFunc {}

impl TableFunctionImpl for LocalCsvTableFunc {
16 changes: 16 additions & 0 deletions datafusion/common/src/dfschema.rs
@@ -347,6 +347,22 @@ impl DFSchema {
.collect()
}

/// Find all fields indices having the given qualifier
pub fn fields_indices_with_qualified(
&self,
qualifier: &TableReference,
) -> Vec<usize> {
self.fields
.iter()
.enumerate()
.filter_map(|(idx, field)| {
field
.qualifier()
.and_then(|q| q.eq(qualifier).then_some(idx))
})
.collect()
}

/// Find all fields match the given name
pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&DFField> {
self.fields
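The new `fields_indices_with_qualified` helper above follows the `enumerate` + `filter_map` + `bool::then_some` idiom. A standalone sketch of the same pattern outside DataFusion's types (the qualifier data below is invented for illustration):

```rust
fn indices_matching(qualifiers: &[Option<&str>], wanted: &str) -> Vec<usize> {
    qualifiers
        .iter()
        .enumerate()
        .filter_map(|(idx, qualifier)| {
            // Keep the index only when a qualifier is present and equals `wanted`.
            qualifier.and_then(|q| (q == wanted).then_some(idx))
        })
        .collect()
}

fn main() {
    let qualifiers = [Some("t1"), None, Some("t2"), Some("t1")];
    assert_eq!(indices_matching(&qualifiers, "t1"), vec![0, 3]);
}
```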
2 changes: 1 addition & 1 deletion datafusion/common/src/file_options/file_type.rs
@@ -103,13 +103,13 @@ impl FromStr for FileType {
}

#[cfg(test)]
#[cfg(feature = "parquet")]
mod tests {
use crate::error::DataFusionError;
use crate::file_options::FileType;
use std::str::FromStr;

#[test]
#[cfg(feature = "parquet")]
fn from_str() {
for (ext, file_type) in [
("csv", FileType::CSV),
7 changes: 1 addition & 6 deletions datafusion/common/src/file_options/mod.rs
@@ -296,10 +296,10 @@ impl Display for FileTypeWriterOptions {
}

#[cfg(test)]
#[cfg(feature = "parquet")]
mod tests {
use std::collections::HashMap;

#[cfg(feature = "parquet")]
use parquet::{
basic::{Compression, Encoding, ZstdLevel},
file::properties::{EnabledStatistics, WriterVersion},
@@ -314,11 +314,9 @@ mod tests {

use crate::Result;

#[cfg(feature = "parquet")]
use super::{parquet_writer::ParquetWriterOptions, StatementOptions};

#[test]
#[cfg(feature = "parquet")]
fn test_writeroptions_parquet_from_statement_options() -> Result<()> {
let mut option_map: HashMap<String, String> = HashMap::new();
option_map.insert("max_row_group_size".to_owned(), "123".to_owned());
@@ -389,7 +387,6 @@
}

#[test]
#[cfg(feature = "parquet")]
fn test_writeroptions_parquet_column_specific() -> Result<()> {
let mut option_map: HashMap<String, String> = HashMap::new();

@@ -511,7 +508,6 @@

#[test]
// for StatementOptions
#[cfg(feature = "parquet")]
fn test_writeroptions_csv_from_statement_options() -> Result<()> {
let mut option_map: HashMap<String, String> = HashMap::new();
option_map.insert("header".to_owned(), "true".to_owned());
@@ -540,7 +536,6 @@

#[test]
// for StatementOptions
#[cfg(feature = "parquet")]
fn test_writeroptions_json_from_statement_options() -> Result<()> {
let mut option_map: HashMap<String, String> = HashMap::new();
option_map.insert("compression".to_owned(), "gzip".to_owned());
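Both test modules above now carry the `parquet` feature gate once, at the module level, instead of repeating it on each individual `#[test]`. A minimal sketch of that pattern (the test name below is illustrative):

```rust
// Before this change, each test repeated the feature gate:
//
//     #[test]
//     #[cfg(feature = "parquet")]
//     fn test_writeroptions_parquet_from_statement_options() { /* ... */ }
//
// After, the gate is applied once to the whole test module:
#[cfg(test)]
#[cfg(feature = "parquet")]
mod tests {
    #[test]
    fn only_built_when_the_parquet_feature_is_enabled() {
        // Test body elided; the point is the module-level gating above.
    }
}
```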
38 changes: 38 additions & 0 deletions datafusion/common/src/utils.rs
@@ -342,6 +342,8 @@ pub fn longest_consecutive_prefix<T: Borrow<usize>>(
count
}

/// Array Utils
/// Wrap an array into a single element `ListArray`.
/// For example `[1, 2, 3]` would be converted into `[[1, 2, 3]]`
pub fn array_into_list_array(arr: ArrayRef) -> ListArray {
@@ -429,6 +431,42 @@ pub fn base_type(data_type: &DataType) -> DataType {
}
}

/// A helper function to coerce base type in List.
///
/// Example
/// ```
/// use arrow::datatypes::{DataType, Field};
/// use datafusion_common::utils::coerced_type_with_base_type_only;
/// use std::sync::Arc;
///
/// let data_type = DataType::List(Arc::new(Field::new("item", DataType::Int32, true)));
/// let base_type = DataType::Float64;
/// let coerced_type = coerced_type_with_base_type_only(&data_type, &base_type);
/// assert_eq!(coerced_type, DataType::List(Arc::new(Field::new("item", DataType::Float64, true))));
/// ```
pub fn coerced_type_with_base_type_only(
data_type: &DataType,
base_type: &DataType,
) -> DataType {
match data_type {
DataType::List(field) => {
let data_type = match field.data_type() {
DataType::List(_) => {
coerced_type_with_base_type_only(field.data_type(), base_type)
}
_ => base_type.to_owned(),
};

DataType::List(Arc::new(Field::new(
field.name(),
data_type,
field.is_nullable(),
)))
}

_ => base_type.clone(),
}
}

/// Compute the number of dimensions in a list data type.
pub fn list_ndims(data_type: &DataType) -> u64 {
if let DataType::List(field) = data_type {
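A small usage sketch for the new `coerced_type_with_base_type_only` helper, extending its doc example above to the nested case; it assumes the `arrow` and `datafusion_common` crates as dependencies and is not part of the commit:

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field};
use datafusion_common::utils::coerced_type_with_base_type_only;

fn main() {
    // A two-level nested list: List<List<Int32>>.
    let nested = DataType::List(Arc::new(Field::new(
        "item",
        DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
        true,
    )));

    // Only the innermost (base) type is swapped; the nesting, field names,
    // and nullability are preserved.
    let coerced = coerced_type_with_base_type_only(&nested, &DataType::Float64);
    let expected = DataType::List(Arc::new(Field::new(
        "item",
        DataType::List(Arc::new(Field::new("item", DataType::Float64, true))),
        true,
    )));
    assert_eq!(coerced, expected);
}
```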
1 change: 0 additions & 1 deletion datafusion/core/src/datasource/file_format/mod.rs
@@ -165,7 +165,6 @@ pub(crate) mod test_util {
limit,
table_partition_cols: vec![],
output_ordering: vec![],
infinite_source: false,
},
None,
)
48 changes: 11 additions & 37 deletions datafusion/core/src/datasource/file_format/options.rs
@@ -21,7 +21,6 @@ use std::sync::Arc;

use arrow::datatypes::{DataType, Schema, SchemaRef};
use async_trait::async_trait;
use datafusion_common::{plan_err, DataFusionError};

use crate::datasource::file_format::arrow::ArrowFormat;
use crate::datasource::file_format::file_compression_type::FileCompressionType;
@@ -72,8 +71,6 @@ pub struct CsvReadOptions<'a> {
pub table_partition_cols: Vec<(String, DataType)>,
/// File compression type
pub file_compression_type: FileCompressionType,
/// Flag indicating whether this file may be unbounded (as in a FIFO file).
pub infinite: bool,
/// Indicates how the file is sorted
pub file_sort_order: Vec<Vec<Expr>>,
}
@@ -97,7 +94,6 @@ impl<'a> CsvReadOptions<'a> {
file_extension: DEFAULT_CSV_EXTENSION,
table_partition_cols: vec![],
file_compression_type: FileCompressionType::UNCOMPRESSED,
infinite: false,
file_sort_order: vec![],
}
}
@@ -108,12 +104,6 @@ impl<'a> CsvReadOptions<'a> {
self
}

/// Configure mark_infinite setting
pub fn mark_infinite(mut self, infinite: bool) -> Self {
self.infinite = infinite;
self
}

/// Specify delimiter to use for CSV read
pub fn delimiter(mut self, delimiter: u8) -> Self {
self.delimiter = delimiter;
@@ -324,8 +314,6 @@ pub struct AvroReadOptions<'a> {
pub file_extension: &'a str,
/// Partition Columns
pub table_partition_cols: Vec<(String, DataType)>,
/// Flag indicating whether this file may be unbounded (as in a FIFO file).
pub infinite: bool,
}

impl<'a> Default for AvroReadOptions<'a> {
@@ -334,7 +322,6 @@ impl<'a> Default for AvroReadOptions<'a> {
schema: None,
file_extension: DEFAULT_AVRO_EXTENSION,
table_partition_cols: vec![],
infinite: false,
}
}
}
@@ -349,12 +336,6 @@ impl<'a> AvroReadOptions<'a> {
self
}

/// Configure mark_infinite setting
pub fn mark_infinite(mut self, infinite: bool) -> Self {
self.infinite = infinite;
self
}

/// Specify schema to use for AVRO read
pub fn schema(mut self, schema: &'a Schema) -> Self {
self.schema = Some(schema);
@@ -466,21 +447,17 @@ pub trait ReadOptions<'a> {
state: SessionState,
table_path: ListingTableUrl,
schema: Option<&'a Schema>,
infinite: bool,
) -> Result<SchemaRef>
where
'a: 'async_trait,
{
match (schema, infinite) {
(Some(s), _) => Ok(Arc::new(s.to_owned())),
(None, false) => Ok(self
.to_listing_options(config)
.infer_schema(&state, &table_path)
.await?),
(None, true) => {
plan_err!("Schema inference for infinite data sources is not supported.")
}
if let Some(s) = schema {
return Ok(Arc::new(s.to_owned()));
}

self.to_listing_options(config)
.infer_schema(&state, &table_path)
.await
}
}

@@ -500,7 +477,6 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
.with_target_partitions(config.target_partitions())
.with_table_partition_cols(self.table_partition_cols.clone())
.with_file_sort_order(self.file_sort_order.clone())
.with_infinite_source(self.infinite)
}

async fn get_resolved_schema(
Expand All @@ -509,7 +485,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
state: SessionState,
table_path: ListingTableUrl,
) -> Result<SchemaRef> {
self._get_resolved_schema(config, state, table_path, self.schema, self.infinite)
self._get_resolved_schema(config, state, table_path, self.schema)
.await
}
}
@@ -535,7 +511,7 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> {
state: SessionState,
table_path: ListingTableUrl,
) -> Result<SchemaRef> {
self._get_resolved_schema(config, state, table_path, self.schema, false)
self._get_resolved_schema(config, state, table_path, self.schema)
.await
}
}
@@ -551,7 +527,6 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> {
.with_file_extension(self.file_extension)
.with_target_partitions(config.target_partitions())
.with_table_partition_cols(self.table_partition_cols.clone())
.with_infinite_source(self.infinite)
.with_file_sort_order(self.file_sort_order.clone())
}

@@ -561,7 +536,7 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> {
state: SessionState,
table_path: ListingTableUrl,
) -> Result<SchemaRef> {
self._get_resolved_schema(config, state, table_path, self.schema, self.infinite)
self._get_resolved_schema(config, state, table_path, self.schema)
.await
}
}
@@ -575,7 +550,6 @@ impl ReadOptions<'_> for AvroReadOptions<'_> {
.with_file_extension(self.file_extension)
.with_target_partitions(config.target_partitions())
.with_table_partition_cols(self.table_partition_cols.clone())
.with_infinite_source(self.infinite)
}

async fn get_resolved_schema(
Expand All @@ -584,7 +558,7 @@ impl ReadOptions<'_> for AvroReadOptions<'_> {
state: SessionState,
table_path: ListingTableUrl,
) -> Result<SchemaRef> {
self._get_resolved_schema(config, state, table_path, self.schema, self.infinite)
self._get_resolved_schema(config, state, table_path, self.schema)
.await
}
}
@@ -606,7 +580,7 @@ impl ReadOptions<'_> for ArrowReadOptions<'_> {
state: SessionState,
table_path: ListingTableUrl,
) -> Result<SchemaRef> {
self._get_resolved_schema(config, state, table_path, self.schema, false)
self._get_resolved_schema(config, state, table_path, self.schema)
.await
}
}
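With the `infinite` flag removed, schema resolution in `get_resolved_schema` depends only on whether an explicit schema was supplied. A hedged sketch of how a caller exercises both paths; the `example.csv` path, the single-column schema, and the `tokio` runtime setup are assumptions for illustration, not part of the commit:

```rust
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::error::Result;
use datafusion::prelude::{CsvReadOptions, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // No explicit schema: resolution falls through to `infer_schema`
    // on the listing options built from CsvReadOptions.
    let inferred = ctx.read_csv("example.csv", CsvReadOptions::new()).await?;
    assert!(!inferred.schema().fields().is_empty());

    // Explicit schema: it is returned directly and no inference runs.
    let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
    let explicit = ctx
        .read_csv("example.csv", CsvReadOptions::new().schema(&schema))
        .await?;
    assert_eq!(explicit.schema().fields().len(), 1);

    Ok(())
}
```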