Skip to content

Commit

Permalink
Support struct_expr generate struct in sql (#2389)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ted-Jiang authored May 8, 2022
1 parent f943b6a commit cb44eb1
Show file tree
Hide file tree
Showing 11 changed files with 149 additions and 2 deletions.
2 changes: 2 additions & 0 deletions datafusion/core/src/physical_plan/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use datafusion_physical_expr::conditional_expressions;
use datafusion_physical_expr::datetime_expressions;
use datafusion_physical_expr::math_expressions;
use datafusion_physical_expr::string_expressions;
use datafusion_physical_expr::struct_expressions;
use std::sync::Arc;

/// Create a physical (function) expression.
Expand Down Expand Up @@ -299,6 +300,7 @@ pub fn create_physical_fun(

// string functions
BuiltinScalarFunction::Array => Arc::new(array_expressions::array),
BuiltinScalarFunction::Struct => Arc::new(struct_expressions::struct_expr),
BuiltinScalarFunction::Ascii => Arc::new(|args| match args[0].data_type() {
DataType::Utf8 => {
make_scalar_function(string_expressions::ascii::<i32>)(args)
Expand Down
18 changes: 18 additions & 0 deletions datafusion/core/tests/sql/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,24 @@ async fn test_array_literals() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn test_struct_literals() -> Result<()> {
test_expression!(
"STRUCT(1,2,3,4,5)",
"{\"c0\": 1, \"c1\": 2, \"c2\": 3, \"c3\": 4, \"c4\": 5}"
);
test_expression!("STRUCT(Null)", "{\"c0\": null}");
test_expression!("STRUCT(2)", "{\"c0\": 2}");
test_expression!("STRUCT('1',Null)", "{\"c0\": \"1\", \"c1\": null}");
test_expression!("STRUCT(true, false)", "{\"c0\": true, \"c1\": false}");
test_expression!(
"STRUCT('str1', 'str2')",
"{\"c0\": \"str1\", \"c1\": \"str2\"}"
);

Ok(())
}

#[tokio::test]
async fn test_interval_expressions() -> Result<()> {
// day nano intervals
Expand Down
4 changes: 4 additions & 0 deletions datafusion/expr/src/built_in_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ pub enum BuiltinScalarFunction {
Upper,
/// regexp_match
RegexpMatch,
///struct
Struct,
}

impl BuiltinScalarFunction {
Expand Down Expand Up @@ -236,6 +238,7 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::Trim => Volatility::Immutable,
BuiltinScalarFunction::Upper => Volatility::Immutable,
BuiltinScalarFunction::RegexpMatch => Volatility::Immutable,
BuiltinScalarFunction::Struct => Volatility::Immutable,

// Stable builtin functions
BuiltinScalarFunction::Now => Volatility::Stable,
Expand Down Expand Up @@ -329,6 +332,7 @@ impl FromStr for BuiltinScalarFunction {
"trim" => BuiltinScalarFunction::Trim,
"upper" => BuiltinScalarFunction::Upper,
"regexp_match" => BuiltinScalarFunction::RegexpMatch,
"struct" => BuiltinScalarFunction::Struct,
_ => {
return Err(DataFusionError::Plan(format!(
"There is no built-in function named {}",
Expand Down
10 changes: 8 additions & 2 deletions datafusion/expr/src/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use crate::nullif::SUPPORTED_NULLIF_TYPES;
use crate::type_coercion::data_types;
use crate::ColumnarValue;
use crate::{
array_expressions, conditional_expressions, Accumulator, BuiltinScalarFunction,
Signature, TypeSignature,
array_expressions, conditional_expressions, struct_expressions, Accumulator,
BuiltinScalarFunction, Signature, TypeSignature,
};
use arrow::datatypes::{DataType, Field, TimeUnit};
use datafusion_common::{DataFusionError, Result};
Expand Down Expand Up @@ -224,6 +224,8 @@ pub fn return_type(
_ => Ok(DataType::Float64),
},

BuiltinScalarFunction::Struct => Ok(DataType::Struct(vec![])),

BuiltinScalarFunction::Abs
| BuiltinScalarFunction::Acos
| BuiltinScalarFunction::Asin
Expand Down Expand Up @@ -258,6 +260,10 @@ pub fn signature(fun: &BuiltinScalarFunction) -> Signature {
array_expressions::SUPPORTED_ARRAY_TYPES.to_vec(),
fun.volatility(),
),
BuiltinScalarFunction::Struct => Signature::variadic(
struct_expressions::SUPPORTED_STRUCT_TYPES.to_vec(),
fun.volatility(),
),
BuiltinScalarFunction::Concat | BuiltinScalarFunction::ConcatWithSeparator => {
Signature::variadic(vec![DataType::Utf8], fun.volatility())
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub mod logical_plan;
mod nullif;
mod operator;
mod signature;
pub mod struct_expressions;
mod table_source;
pub mod type_coercion;
mod udaf;
Expand Down
35 changes: 35 additions & 0 deletions datafusion/expr/src/struct_expressions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::datatypes::DataType;

/// Currently supported types by the struct function.
pub static SUPPORTED_STRUCT_TYPES: &[DataType] = &[
DataType::Boolean,
DataType::UInt8,
DataType::UInt16,
DataType::UInt32,
DataType::UInt64,
DataType::Int8,
DataType::Int16,
DataType::Int32,
DataType::Int64,
DataType::Float32,
DataType::Float64,
DataType::Utf8,
DataType::LargeUtf8,
];
1 change: 1 addition & 0 deletions datafusion/physical-expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ mod physical_expr;
pub mod regex_expressions;
mod sort_expr;
pub mod string_expressions;
pub mod struct_expressions;
#[cfg(feature = "unicode_expressions")]
pub mod unicode_expressions;
pub mod window;
Expand Down
77 changes: 77 additions & 0 deletions datafusion/physical-expr/src/struct_expressions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Struct expressions
use arrow::array::*;
use arrow::datatypes::{DataType, Field};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::ColumnarValue;
use std::sync::Arc;

fn array_struct(args: &[ArrayRef]) -> Result<ArrayRef> {
// do not accept 0 arguments.
if args.is_empty() {
return Err(DataFusionError::Internal(
"struct requires at least one argument".to_string(),
));
}

let vec: Vec<_> = args
.iter()
.enumerate()
.map(|(i, arg)| -> Result<(Field, ArrayRef)> {
let field_name = format!("c{}", i);
match arg.data_type() {
DataType::Utf8
| DataType::LargeUtf8
| DataType::Boolean
| DataType::Float32
| DataType::Float64
| DataType::Int8
| DataType::Int16
| DataType::Int32
| DataType::Int64
| DataType::UInt8
| DataType::UInt16
| DataType::UInt32
| DataType::UInt64 => Ok((
Field::new(field_name.as_str(), arg.data_type().clone(), true),
arg.clone(),
)),
data_type => Err(DataFusionError::NotImplemented(format!(
"Struct is not implemented for type '{:?}'.",
data_type
))),
}
})
.collect::<Result<Vec<_>>>()?;

Ok(Arc::new(StructArray::from(vec)))
}

/// put values in a struct array.
pub fn struct_expr(values: &[ColumnarValue]) -> Result<ColumnarValue> {
let arrays: Vec<ArrayRef> = values
.iter()
.map(|x| match x {
ColumnarValue::Array(array) => array.clone(),
ColumnarValue::Scalar(scalar) => scalar.to_array().clone(),
})
.collect();
Ok(ColumnarValue::Array(array_struct(arrays.as_slice())?))
}
1 change: 1 addition & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ enum ScalarFunction {
Upper=62;
Coalesce=63;
Power=64;
StructFun=65;
}

message ScalarFunctionNode {
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/src/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction {
ScalarFunction::RegexpMatch => Self::RegexpMatch,
ScalarFunction::Coalesce => Self::Coalesce,
ScalarFunction::Power => Self::Power,
ScalarFunction::StructFun => Self::Struct,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/src/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1070,6 +1070,7 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction {
BuiltinScalarFunction::RegexpMatch => Self::RegexpMatch,
BuiltinScalarFunction::Coalesce => Self::Coalesce,
BuiltinScalarFunction::Power => Self::Power,
BuiltinScalarFunction::Struct => Self::StructFun,
};

Ok(scalar_function)
Expand Down

0 comments on commit cb44eb1

Please sign in to comment.