Commit fe2f73c

Merge remote-tracking branch 'apache/main' into dev/xinli/value-normal

alamb committed Jul 25, 2024
2 parents cab7a1d + 7db4213
Showing 215 changed files with 7,204 additions and 3,405 deletions.
1 change: 1 addition & 0 deletions .gitattributes
@@ -1,3 +1,4 @@
 .github/ export-ignore
+datafusion/core/tests/data/newlines_in_values.csv text eol=lf
 datafusion/proto/src/generated/prost.rs linguist-generated
 datafusion/proto/src/generated/pbjson.rs linguist-generated
4 changes: 2 additions & 2 deletions .github/workflows/rust.yml
@@ -90,8 +90,8 @@ jobs:
       # Ensure that the datafusion crate can be built with only a subset of the function
       # packages enabled.
-      - name: Check datafusion (array_expressions)
-        run: cargo check --no-default-features --features=array_expressions -p datafusion
+      - name: Check datafusion (nested_expressions)
+        run: cargo check --no-default-features --features=nested_expressions -p datafusion

       - name: Check datafusion (crypto)
         run: cargo check --no-default-features --features=crypto_expressions -p datafusion
4 changes: 2 additions & 2 deletions Cargo.toml
@@ -25,7 +25,7 @@ members = [
     "datafusion/execution",
     "datafusion/functions-aggregate",
     "datafusion/functions",
-    "datafusion/functions-array",
+    "datafusion/functions-nested",
    "datafusion/optimizer",
     "datafusion/physical-expr-common",
     "datafusion/physical-expr",
@@ -94,7 +94,7 @@ datafusion-execution = { path = "datafusion/execution", version = "40.0.0" }
 datafusion-expr = { path = "datafusion/expr", version = "40.0.0" }
 datafusion-functions = { path = "datafusion/functions", version = "40.0.0" }
 datafusion-functions-aggregate = { path = "datafusion/functions-aggregate", version = "40.0.0" }
-datafusion-functions-array = { path = "datafusion/functions-array", version = "40.0.0" }
+datafusion-functions-nested = { path = "datafusion/functions-nested", version = "40.0.0" }
 datafusion-optimizer = { path = "datafusion/optimizer", version = "40.0.0", default-features = false }
 datafusion-physical-expr = { path = "datafusion/physical-expr", version = "40.0.0", default-features = false }
 datafusion-physical-expr-common = { path = "datafusion/physical-expr-common", version = "40.0.0", default-features = false }
2 changes: 1 addition & 1 deletion README.md
@@ -75,7 +75,7 @@ This crate has several [features] which can be specified in your `Cargo.toml`.

 Default features:

-- `array_expressions`: functions for working with arrays such as `array_to_string`
+- `nested_expressions`: functions for working with nested types, such as `array_to_string`
 - `compression`: reading files compressed with `xz2`, `bzip2`, `flate2`, and `zstd`
 - `crypto_expressions`: cryptographic functions such as `md5` and `sha256`
 - `datetime_expressions`: date and time functions such as `to_timestamp`
14 changes: 8 additions & 6 deletions datafusion-cli/Cargo.lock

Generated file; diff not rendered.

12 changes: 6 additions & 6 deletions datafusion-examples/examples/advanced_udwf.rs
@@ -216,12 +216,12 @@ async fn main() -> Result<()> {
     df.show().await?;

     // Now, run the function using the DataFrame API:
-    let window_expr = smooth_it.call(
-        vec![col("speed")],                 // smooth_it(speed)
-        vec![col("car")],                   // PARTITION BY car
-        vec![col("time").sort(true, true)], // ORDER BY time ASC
-        WindowFrame::new(None),
-    );
+    let window_expr = smooth_it
+        .call(vec![col("speed")])                     // smooth_it(speed)
+        .partition_by(vec![col("car")])               // PARTITION BY car
+        .order_by(vec![col("time").sort(true, true)]) // ORDER BY time ASC
+        .window_frame(WindowFrame::new(None))
+        .build()?;
     let df = ctx.table("cars").await?.window(vec![window_expr])?;

     // print the results
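For context, a minimal sketch of the `ExprFunctionExt` builder pattern this diff adopts, pulled out of the example (the standalone `build_window_expr` helper is illustrative, not part of the commit; it accepts any registered window UDF in place of `smooth_it`):

    use datafusion_common::Result;
    use datafusion_expr::{col, Expr, ExprFunctionExt, WindowFrame, WindowUDF};

    // Build `udwf(speed) OVER (PARTITION BY car ORDER BY time ASC)` with the
    // fluent builder that replaces the old positional `call` arguments.
    fn build_window_expr(udwf: &WindowUDF) -> Result<Expr> {
        udwf.call(vec![col("speed")])
            .partition_by(vec![col("car")])
            .order_by(vec![col("time").sort(true, true)])
            .window_frame(WindowFrame::new(None))
            .build()
    }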
6 changes: 5 additions & 1 deletion datafusion-examples/examples/custom_file_format.rs
@@ -131,7 +131,7 @@ impl FileFormat for TSVFileFormat {
     }
 }

-#[derive(Default)]
+#[derive(Default, Debug)]
 /// Factory for creating TSV file formats
 ///
 /// This factory is a wrapper around the CSV file format factory
@@ -166,6 +166,10 @@ impl FileFormatFactory for TSVFileFactory {
     fn default(&self) -> std::sync::Arc<dyn FileFormat> {
         todo!()
     }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
 }

 impl GetExt for TSVFileFactory {
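The newly required `as_any` method exposes the concrete factory behind the trait object. A hedged sketch of how a caller might use it (the `is_tsv` helper is hypothetical; it assumes the `TSVFileFactory` type defined in this example and the `FileFormatFactory` trait from `datafusion::datasource::file_format`):

    use datafusion::datasource::file_format::FileFormatFactory;

    // Downcast through the new `as_any` hook to recover the concrete factory.
    fn is_tsv(factory: &dyn FileFormatFactory) -> bool {
        factory.as_any().downcast_ref::<TSVFileFactory>().is_some()
    }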
14 changes: 5 additions & 9 deletions datafusion-examples/examples/expr_api.rs
@@ -33,7 +33,7 @@ use datafusion_expr::execution_props::ExecutionProps;
 use datafusion_expr::expr::BinaryExpr;
 use datafusion_expr::interval_arithmetic::Interval;
 use datafusion_expr::simplify::SimplifyContext;
-use datafusion_expr::{AggregateExt, ColumnarValue, ExprSchemable, Operator};
+use datafusion_expr::{ColumnarValue, ExprFunctionExt, ExprSchemable, Operator};

 /// This example demonstrates the DataFusion [`Expr`] API.
 ///
@@ -95,7 +95,7 @@ fn expr_fn_demo() -> Result<()> {
     let agg = first_value.call(vec![col("price")]);
     assert_eq!(agg.to_string(), "first_value(price)");

-    // You can use the AggregateExt trait to create more complex aggregates
+    // You can use the ExprFunctionExt trait to create more complex aggregates
     // such as `FIRST_VALUE(price FILTER quantity > 100 ORDER BY ts)`
     let agg = first_value
         .call(vec![col("price")])
@@ -177,16 +177,12 @@ fn simplify_demo() -> Result<()> {
     );

     // here are some other examples of what DataFusion is capable of
-    let schema = Schema::new(vec![
-        make_field("i", DataType::Int64),
-        make_field("b", DataType::Boolean),
-    ])
-    .to_dfschema_ref()?;
+    let schema = Schema::new(vec![make_field("i", DataType::Int64)]).to_dfschema_ref()?;
     let context = SimplifyContext::new(&props).with_schema(schema.clone());
     let simplifier = ExprSimplifier::new(context);

     // basic arithmetic simplification
-    // i + 1 + 2 => a + 3
+    // i + 1 + 2 => i + 3
     // (note this is not done if the expr is (col("i") + (lit(1) + lit(2))))
     assert_eq!(
         simplifier.simplify(col("i") + (lit(1) + lit(2)))?,
@@ -209,7 +205,7 @@ fn simplify_demo() -> Result<()> {
     );

     // String --> Date simplification
-    // `cast('2020-09-01' as date)` --> 18500
+    // `cast('2020-09-01' as date)` --> 18506 # number of days since epoch 1970-01-01
     assert_eq!(
         simplifier.simplify(lit("2020-09-01").cast_to(&DataType::Date32, &schema)?)?,
         lit(ScalarValue::Date32(Some(18506)))
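The corrected constant is easy to verify: 18506 is the day count from the Unix epoch (1970-01-01) to 2020-09-01. A quick standalone check using the `chrono` crate (an assumption; the example itself does not depend on chrono):

    use chrono::NaiveDate;

    // 50 * 365 = 18250 days, plus 12 leap days (1972..=2016), gives 18262
    // days to 2020-01-01; the 244 days to Sep 1 (including Feb 29, 2020)
    // bring the total to 18506.
    fn main() {
        let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
        let date = NaiveDate::from_ymd_opt(2020, 9, 1).unwrap();
        assert_eq!((date - epoch).num_days(), 18506);
    }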
12 changes: 6 additions & 6 deletions datafusion-examples/examples/simple_udwf.rs
@@ -118,12 +118,12 @@ async fn main() -> Result<()> {
     df.show().await?;

     // Now, run the function using the DataFrame API:
-    let window_expr = smooth_it.call(
-        vec![col("speed")],                 // smooth_it(speed)
-        vec![col("car")],                   // PARTITION BY car
-        vec![col("time").sort(true, true)], // ORDER BY time ASC
-        WindowFrame::new(None),
-    );
+    let window_expr = smooth_it
+        .call(vec![col("speed")])                     // smooth_it(speed)
+        .partition_by(vec![col("car")])               // PARTITION BY car
+        .order_by(vec![col("time").sort(true, true)]) // ORDER BY time ASC
+        .window_frame(WindowFrame::new(None))
+        .build()?;
     let df = ctx.table("cars").await?.window(vec![window_expr])?;

     // print the results
45 changes: 39 additions & 6 deletions datafusion/common/src/config.rs
@@ -184,6 +184,16 @@ config_namespace! {
         /// Default value for `format.has_header` for `CREATE EXTERNAL TABLE`
         /// if not specified explicitly in the statement.
         pub has_header: bool, default = false
+
+        /// Specifies whether newlines in (quoted) CSV values are supported.
+        ///
+        /// This is the default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE`
+        /// if not specified explicitly in the statement.
+        ///
+        /// Parsing newlines in quoted values may be affected by execution behaviour such as
+        /// parallel file scanning. Setting this to `true` ensures that newlines in values are
+        /// parsed successfully, which may reduce performance.
+        pub newlines_in_values: bool, default = false
     }
 }

@@ -367,18 +377,21 @@

         /// (writing) Sets parquet writer version
         /// valid values are "1.0" and "2.0"
-        pub writer_version: String, default = "1.0".into()
+        pub writer_version: String, default = "1.0".to_string()

         /// (writing) Sets default parquet compression codec.
         /// Valid values are: uncompressed, snappy, gzip(level),
         /// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
         /// These values are not case sensitive. If NULL, uses
         /// default parquet writer setting
+        ///
+        /// Note that this default setting is not the same as
+        /// the default parquet writer setting.
         pub compression: Option<String>, default = Some("zstd(3)".into())

         /// (writing) Sets if dictionary encoding is enabled. If NULL, uses
         /// default parquet writer setting
-        pub dictionary_enabled: Option<bool>, default = None
+        pub dictionary_enabled: Option<bool>, default = Some(true)

         /// (writing) Sets best effort maximum dictionary page size, in bytes
         pub dictionary_page_size_limit: usize, default = 1024 * 1024
@@ -391,21 +404,21 @@

         /// (writing) Sets max statistics size for any column. If NULL, uses
         /// default parquet writer setting
-        pub max_statistics_size: Option<usize>, default = None
+        pub max_statistics_size: Option<usize>, default = Some(4096)

         /// (writing) Target maximum number of rows in each row group (defaults to 1M
         /// rows). Writing larger row groups requires more memory to write, but
         /// can get better compression and be faster to read.
-        pub max_row_group_size: usize, default = 1024 * 1024
+        pub max_row_group_size: usize, default = 1024 * 1024

         /// (writing) Sets "created by" property
         pub created_by: String, default = concat!("datafusion version ", env!("CARGO_PKG_VERSION")).into()

         /// (writing) Sets column index truncate length
-        pub column_index_truncate_length: Option<usize>, default = None
+        pub column_index_truncate_length: Option<usize>, default = Some(64)

         /// (writing) Sets best effort maximum number of rows in data page
-        pub data_page_row_count_limit: usize, default = usize::MAX
+        pub data_page_row_count_limit: usize, default = 20_000

         /// (writing) Sets default encoding for any column.
         /// Valid values are: plain, plain_dictionary, rle,
@@ -1596,6 +1609,14 @@ config_namespace! {
         pub quote: u8, default = b'"'
         pub escape: Option<u8>, default = None
         pub double_quote: Option<bool>, default = None
+        /// Specifies whether newlines in (quoted) values are supported.
+        ///
+        /// Parsing newlines in quoted values may be affected by execution behaviour such as
+        /// parallel file scanning. Setting this to `true` ensures that newlines in values are
+        /// parsed successfully, which may reduce performance.
+        ///
+        /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
+        pub newlines_in_values: Option<bool>, default = None
         pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
         pub schema_infer_max_rec: usize, default = 100
         pub date_format: Option<String>, default = None
@@ -1668,6 +1689,18 @@ impl CsvOptions {
         self
     }

+    /// Specifies whether newlines in (quoted) values are supported.
+    ///
+    /// Parsing newlines in quoted values may be affected by execution behaviour such as
+    /// parallel file scanning. Setting this to `true` ensures that newlines in values are
+    /// parsed successfully, which may reduce performance.
+    ///
+    /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
+    pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self {
+        self.newlines_in_values = Some(newlines_in_values);
+        self
+    }
+
     /// Set a `CompressionTypeVariant` of CSV
     /// - defaults to `CompressionTypeVariant::UNCOMPRESSED`
     pub fn with_file_compression_type(
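A minimal sketch of the new option in use, relying only on the `CsvOptions` API added above (the `Default` impl is assumed to come from the `config_namespace!` macro; the helper name is illustrative):

    use datafusion_common::config::CsvOptions;

    // Opt in to embedded newlines inside quoted fields. Per the docs above,
    // this guarantees correct parsing but may limit parallel file scanning.
    fn csv_options_for_multiline_values() -> CsvOptions {
        CsvOptions::default().with_newlines_in_values(true)
    }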
30 changes: 2 additions & 28 deletions datafusion/common/src/dfschema.rs
@@ -521,34 +521,8 @@ impl DFSchema {

     /// Find the field with the given name
     pub fn field_with_unqualified_name(&self, name: &str) -> Result<&Field> {
-        let matches = self.qualified_fields_with_unqualified_name(name);
-        match matches.len() {
-            0 => Err(unqualified_field_not_found(name, self)),
-            1 => Ok(matches[0].1),
-            _ => {
-                // When `matches` size > 1, it doesn't necessarily mean an `ambiguous name` problem.
-                // Because name may generate from Alias/... . It means that it don't own qualifier.
-                // For example:
-                //             Join on id = b.id
-                //             Project a.id as id   TableScan b id
-                // In this case, there isn't `ambiguous name` problem. When `matches` just contains
-                // one field without qualifier, we should return it.
-                let fields_without_qualifier = matches
-                    .iter()
-                    .filter(|(q, _)| q.is_none())
-                    .collect::<Vec<_>>();
-                if fields_without_qualifier.len() == 1 {
-                    Ok(fields_without_qualifier[0].1)
-                } else {
-                    _schema_err!(SchemaError::AmbiguousReference {
-                        field: Column {
-                            relation: None,
-                            name: name.to_string(),
-                        },
-                    })
-                }
-            }
-        }
+        self.qualified_field_with_unqualified_name(name)
+            .map(|(_, field)| field)
     }

     /// Find the field with the given qualified name
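A short sketch of the simplified lookup path (the `demo` function and the `"t"` qualifier are illustrative only; `try_from_qualified_schema` is an existing `DFSchema` constructor):

    use arrow_schema::{DataType, Field, Schema};
    use datafusion_common::{DFSchema, Result};

    // Unqualified lookup now delegates to the qualified variant and
    // discards whatever qualifier it resolves.
    fn demo() -> Result<()> {
        let schema = DFSchema::try_from_qualified_schema(
            "t",
            &Schema::new(vec![Field::new("id", DataType::Int64, false)]),
        )?;
        let field = schema.field_with_unqualified_name("id")?;
        assert_eq!(field.name(), "id");
        Ok(())
    }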