Skip to content

Commit

Permalink
Fix join type coercion (#14387) (#14454)
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb authored Feb 3, 2025
1 parent e3ea7d1 commit 8f10fdf
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 8 deletions.
37 changes: 35 additions & 2 deletions datafusion/core/tests/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ use arrow::{
record_batch::RecordBatch,
};
use arrow_array::{
Array, BooleanArray, DictionaryArray, Float32Array, Float64Array, Int8Array,
UnionArray,
record_batch, Array, BooleanArray, DictionaryArray, Float32Array, Float64Array,
Int8Array, UnionArray,
};
use arrow_buffer::ScalarBuffer;
use arrow_schema::{ArrowError, SchemaRef, UnionFields, UnionMode};
Expand Down Expand Up @@ -1121,6 +1121,39 @@ async fn join() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn join_coercion_unnnamed() -> Result<()> {
let ctx = SessionContext::new();

// Test that join will coerce column types when necessary
// even when the relations don't have unique names
let left = ctx.read_batch(record_batch!(
("id", Int32, [1, 2, 3]),
("name", Utf8, ["a", "b", "c"])
)?)?;
let right = ctx.read_batch(record_batch!(
("id", Int32, [10, 3]),
("name", Utf8View, ["d", "c"]) // Utf8View is a different type
)?)?;
let cols = vec!["name", "id"];

let filter = None;
let join = right.join(left, JoinType::LeftAnti, &cols, &cols, filter)?;
let results = join.collect().await?;

assert_batches_sorted_eq!(
[
"+----+------+",
"| id | name |",
"+----+------+",
"| 10 | d |",
"+----+------+",
],
&results
);
Ok(())
}

#[tokio::test]
async fn join_on() -> Result<()> {
let left = test_table_with_name("a")
Expand Down
23 changes: 17 additions & 6 deletions datafusion/optimizer/src/analyzer/type_coercion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,15 @@ impl<'a> TypeCoercionRewriter<'a> {
.map(|(lhs, rhs)| {
// coerce the arguments as though they were a single binary equality
// expression
let (lhs, rhs) = self.coerce_binary_op(lhs, Operator::Eq, rhs)?;
let left_schema = join.left.schema();
let right_schema = join.right.schema();
let (lhs, rhs) = self.coerce_binary_op(
lhs,
left_schema,
Operator::Eq,
rhs,
right_schema,
)?;
Ok((lhs, rhs))
})
.collect::<Result<Vec<_>>>()?;
Expand Down Expand Up @@ -275,17 +283,19 @@ impl<'a> TypeCoercionRewriter<'a> {
fn coerce_binary_op(
&self,
left: Expr,
left_schema: &DFSchema,
op: Operator,
right: Expr,
right_schema: &DFSchema,
) -> Result<(Expr, Expr)> {
let (left_type, right_type) = get_input_types(
&left.get_type(self.schema)?,
&left.get_type(left_schema)?,
&op,
&right.get_type(self.schema)?,
&right.get_type(right_schema)?,
)?;
Ok((
left.cast_to(&left_type, self.schema)?,
right.cast_to(&right_type, self.schema)?,
left.cast_to(&left_type, left_schema)?,
right.cast_to(&right_type, right_schema)?,
))
}
}
Expand Down Expand Up @@ -404,7 +414,8 @@ impl TreeNodeRewriter for TypeCoercionRewriter<'_> {
))))
}
Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
let (left, right) = self.coerce_binary_op(*left, op, *right)?;
let (left, right) =
self.coerce_binary_op(*left, self.schema, op, *right, self.schema)?;
Ok(Transformed::yes(Expr::BinaryExpr(BinaryExpr::new(
Box::new(left),
op,
Expand Down

0 comments on commit 8f10fdf

Please sign in to comment.