Merge pull request #611 from splitgraph/datafusion-41-upgrade
DataFusion 41 upgrade
gruuya authored Aug 15, 2024
2 parents 3a547f0 + 0fa7d3e commit d41edd6
Showing 17 changed files with 382 additions and 386 deletions.
618 changes: 300 additions & 318 deletions Cargo.lock

Large diffs are not rendered by default.

42 changes: 14 additions & 28 deletions Cargo.toml
@@ -2,21 +2,21 @@
members = ["clade", "object_store_factory"]

[workspace.dependencies]
arrow = { version = "52.1.0", features = ["test_utils"] }
arrow-buffer = "52.1.0"
arrow-csv = "52.1.0"
arrow-flight = "52.1.0"
arrow = { version = "52.2.0", features = ["test_utils"] }
arrow-buffer = "52.2.0"
arrow-csv = "52.2.0"
arrow-flight = "52.2.0"
# For the JSON format support
# https://github.com/apache/arrow-rs/pull/2868
# https://github.com/apache/arrow-rs/pull/2724
arrow-integration-test = "52.1.0"
arrow-row = "52.1.0"
arrow-schema = "52.1.0"
arrow-integration-test = "52.2.0"
arrow-row = "52.2.0"
arrow-schema = "52.2.0"
async-trait = "0.1.64"

datafusion = "40.0.0"
datafusion-common = "40.0.0"
datafusion-expr = "40.0.0"
datafusion = "41.0.0"
datafusion-common = "41.0.0"
datafusion-expr = "41.0.0"

futures = "0.3"

@@ -33,20 +33,6 @@ tracing-log = "0.2"
tracing-subscriber = { version = "0.3.18", features = ["json", "env-filter"] }
url = "2.5"



-[patch.crates-io]
-# Pick up fix for https://github.com/apache/arrow-datafusion/pull/11386 and backport for https://github.com/apache/datafusion/pull/11765
-datafusion = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "backport-pr11765" }
-datafusion-common = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "backport-pr11765" }
-datafusion-execution = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "backport-pr11765" }
-datafusion-expr = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "backport-pr11765" }
-datafusion-optimizer = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "backport-pr11765" }
-datafusion-physical-expr = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "backport-pr11765" }
-datafusion-physical-plan = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "backport-pr11765" }
-datafusion-proto = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "backport-pr11765" }
-datafusion-sql = { git = "https://github.com/splitgraph/arrow-datafusion", branch = "backport-pr11765" }

[package]
name = "seafowl"
build = "build.rs"
@@ -95,8 +81,8 @@ clap = { version = "3.2.19", features = [ "derive" ] }
config = "0.14.0"

# PG wire protocol support
-convergence = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-40-upgrade", optional = true }
-convergence-arrow = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-40-upgrade", optional = true }
+convergence = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-41-upgrade", optional = true }
+convergence-arrow = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-41-upgrade", optional = true }

dashmap = "6.0.1"

@@ -106,7 +92,7 @@ datafusion-expr = { workspace = true }

datafusion-remote-tables = { path = "./datafusion_remote_tables", optional = true }

-deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "b8162c0fb967b3f3d9409d84f08605440dbad13b", features = ["datafusion"] }
+deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "d3a796709a4bc9ee8a0fdc4ea16f8c607c0daf19", features = ["datafusion"] }

futures = "0.3"
hex = ">=0.4.0"
@@ -133,7 +119,7 @@ rustyline = "14.0"
serde = { workspace = true }
serde_json = { workspace = true }
sha2 = ">=0.10.1"
sqlparser = { version = "0.47", features = ["visitor"] }
sqlparser = { version = "0.49", features = ["visitor"] }
sqlx = { version = "0.7.1", features = [ "runtime-tokio-rustls", "sqlite", "any", "uuid" ] }
strum = ">=0.24"
strum_macros = ">=0.24"
2 changes: 1 addition & 1 deletion datafusion_remote_tables/Cargo.toml
@@ -19,7 +19,7 @@ arrow-schema = { workspace = true }
async-trait = { workspace = true }

# Remote query execution for a variety of DBs
-connectorx = { git = "https://github.com/splitgraph/connector-x", branch = "datafusion-40-upgrade", features = [ "dst_arrow", "src_postgres", "src_mysql", "src_sqlite" ] }
+connectorx = { git = "https://github.com/splitgraph/connector-x", branch = "datafusion-41-upgrade", features = [ "dst_arrow", "src_postgres", "src_mysql", "src_sqlite" ] }

datafusion = { workspace = true }
datafusion-common = { workspace = true }
5 changes: 2 additions & 3 deletions datafusion_remote_tables/src/factory.rs
@@ -1,10 +1,9 @@
use crate::provider::RemoteTable;
use arrow_schema::SchemaRef;
use async_trait::async_trait;
-use datafusion::datasource::provider::TableProviderFactory;
+use datafusion::catalog::{Session, TableProviderFactory};
use datafusion::datasource::TableProvider;
use datafusion::error::{DataFusionError, Result};
-use datafusion::execution::context::SessionState;
use datafusion_expr::CreateExternalTable;
use std::ops::Deref;
use std::sync::Arc;
@@ -16,7 +15,7 @@ pub struct RemoteTableFactory {}
impl TableProviderFactory for RemoteTableFactory {
async fn create(
&self,
-_ctx: &SessionState,
+_ctx: &dyn Session,
cmd: &CreateExternalTable,
) -> Result<Arc<dyn TableProvider>> {
let table = RemoteTable::new(
4 changes: 2 additions & 2 deletions datafusion_remote_tables/src/provider.rs
@@ -5,10 +5,10 @@ use crate::filter_pushdown::{
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use connectorx::prelude::{get_arrow, ArrowDestination, CXQuery, SourceConn, SourceType};
+use datafusion::catalog::Session;
use datafusion::common::DataFusionError;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
-use datafusion::execution::context::SessionState;
use datafusion::physical_expr::expressions::{cast, col};
use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_plan::memory::MemoryExec;
@@ -119,7 +119,7 @@ impl TableProvider for RemoteTable {

async fn scan(
&self,
-_ctx: &SessionState,
+_ctx: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
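Note (not part of this diff): the signature changes in this file and in factory.rs both come from DataFusion 41 replacing the concrete `&SessionState` argument of `TableProviderFactory::create` and `TableProvider::scan` with the object-safe `&dyn Session` trait. A minimal sketch of a downstream `TableProvider` after the upgrade; the `MyTable` type is hypothetical and plan construction is elided:

```rust
use std::any::Any;
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::catalog::Session; // new home of the session abstraction
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::Result;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_expr::Expr;

struct MyTable {
    schema: SchemaRef,
}

#[async_trait]
impl TableProvider for MyTable {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    async fn scan(
        &self,
        _state: &dyn Session, // was `&SessionState` against DataFusion 40
        _projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        unimplemented!("plan construction elided in this sketch")
    }
}
```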
2 changes: 1 addition & 1 deletion src/catalog/metastore.rs
@@ -15,7 +15,7 @@ use crate::wasm_udf::data_types::{
};
use clade::schema::{SchemaObject, TableObject};
use dashmap::DashMap;
-use datafusion::catalog::schema::MemorySchemaProvider;
+use datafusion::catalog_common::memory::MemorySchemaProvider;
use datafusion::datasource::TableProvider;

use crate::catalog::memory::MemoryStore;
7 changes: 6 additions & 1 deletion src/config/context.rs
@@ -11,6 +11,7 @@ use crate::{
use datafusion::execution::{
context::SessionState,
memory_pool::{GreedyMemoryPool, MemoryPool, UnboundedMemoryPool},
+session_state::SessionStateBuilder,
};
use datafusion::{
common::Result,
@@ -96,7 +97,11 @@ pub fn build_state_with_table_factories(
config: SessionConfig,
runtime: Arc<RuntimeEnv>,
) -> SessionState {
-let mut state = SessionState::new_with_config_rt(config, runtime);
+let mut state = SessionStateBuilder::new()
+.with_config(config)
+.with_runtime_env(runtime)
+.with_default_features()
+.build();

state
.table_factories_mut()
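Note (not part of this diff): `SessionState::new_with_config_rt` is deprecated in DataFusion 41 in favor of `SessionStateBuilder`, as above. A hedged sketch of the mapping; `with_default_features()` opts back into the default catalogs, functions and rules that the old constructor registered implicitly:

```rust
use std::sync::Arc;

use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::session_state::{SessionState, SessionStateBuilder};
use datafusion::prelude::SessionConfig;

fn build_state(config: SessionConfig, runtime: Arc<RuntimeEnv>) -> SessionState {
    // DataFusion 40 equivalent: SessionState::new_with_config_rt(config, runtime)
    SessionStateBuilder::new()
        .with_config(config)
        .with_runtime_env(runtime)
        .with_default_features() // omit this and the state starts out empty
        .build()
}
```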
20 changes: 11 additions & 9 deletions src/context/logical.rs
@@ -25,8 +25,9 @@ use datafusion_expr::logical_plan::{Extension, LogicalPlan};
use deltalake::DeltaTable;
use itertools::Itertools;
use sqlparser::ast::{
-AlterTableOperation, CreateFunctionBody, Expr as SqlExpr, Expr, Insert, ObjectType,
-Query, Statement, TableFactor, TableWithJoins, Value, VisitMut,
+AlterTableOperation, CreateFunctionBody, CreateTable as CreateTableSql,
+Expr as SqlExpr, Expr, Insert, ObjectType, Query, Statement, TableFactor,
+TableWithJoins, Value, VisitMut,
};
use std::sync::Arc;
use tracing::debug;
@@ -159,7 +160,7 @@ impl SeafowlContext {
}
Statement::Drop { object_type: ObjectType::Table | ObjectType::Schema, .. } => self.inner.state().statement_to_plan(stmt).await,
// CREATE TABLE (create empty table with columns)
-Statement::CreateTable {
+Statement::CreateTable(CreateTableSql {
query: None,
name,
columns,
Expand All @@ -169,7 +170,7 @@ impl SeafowlContext {
if_not_exists,
or_replace: _,
..
-} if constraints.is_empty()
+}) if constraints.is_empty()
&& table_properties.is_empty()
&& with_options.is_empty() =>
{
@@ -215,7 +216,8 @@

// Other CREATE TABLE: SqlToRel only allows CreateTableAs statements and makes
// a CreateMemoryTable node. We're fine with that, but we'll execute it differently.
-Statement::CreateTable { query: Some(ref mut input), .. } => {
+Statement::CreateTable(CreateTableSql { query: Some(ref mut input), .. })
+=> {
let state = self.rewrite_time_travel_query(input).await?;
state.statement_to_plan(stmt).await
},
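Aside on the pattern change above (illustration, not repo code): with the sqlparser bump, `Statement::CreateTable` is a tuple variant wrapping a dedicated `CreateTable` struct, hence the extra level of nesting in the match arms. A standalone sketch:

```rust
use sqlparser::ast::{CreateTable, Statement};
use sqlparser::dialect::GenericDialect;
use sqlparser::parser::Parser;

fn main() {
    let stmt = Parser::parse_sql(&GenericDialect {}, "CREATE TABLE t (a INT)")
        .unwrap()
        .remove(0);
    match stmt {
        // Older sqlparser (0.47 here): Statement::CreateTable { name, query, .. }
        Statement::CreateTable(CreateTable { name, query, .. }) => {
            assert_eq!(name.to_string(), "t");
            assert!(query.is_none()); // plain CREATE TABLE, not CTAS
        }
        _ => unreachable!(),
    }
}
```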
@@ -392,7 +394,7 @@ mod tests {
.unwrap();

assert_eq!(
format!("{plan:?}"),
format!("{plan}"),
"Dml: op=[Insert Into] table=[testcol.some_table]\
\n Projection: CAST(column1 AS Date32) AS date, CAST(column2 AS Float64) AS value\
\n Values: (Utf8(\"2022-01-01T12:00:00\"), Int64(42))"
@@ -411,7 +413,7 @@
.await
.unwrap();

assert_eq!(format!("{plan:?}"), "Dml: op=[Insert Into] table=[testcol.some_table]\
assert_eq!(format!("{plan}"), "Dml: op=[Insert Into] table=[testcol.some_table]\
\n Projection: testdb.testcol.some_table.date AS date, testdb.testcol.some_table.value AS value\
\n TableScan: testdb.testcol.some_table projection=[date, value]");
}
@@ -420,7 +422,7 @@
let ctx = in_memory_context_with_test_db().await;

let plan = ctx.create_logical_plan(query).await.unwrap();
format!("{plan:?}")
format!("{plan}")
}

#[tokio::test]
@@ -463,7 +465,7 @@
.unwrap();

assert_eq!(
format!("{plan:?}"),
format!("{plan}"),
"Dml: op=[Insert Into] table=[testcol.some_table]\
\n Projection: CAST(column1 AS Date32) AS date, CAST(column2 AS Float64) AS value\
\n Values: (Utf8(\"2022-01-01T12:00:00\"), Int64(42))"
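Note on the `{plan:?}` to `{plan}` switches in the tests (not part of the diff): the expected strings are unchanged, so in DataFusion 41 the indented plan rendering the tests rely on comes from `Display` rather than `Debug`. A one-line sketch:

```rust
use datafusion_expr::LogicalPlan;

// Renders the indented tree, e.g.
//   Projection: t.a
//     TableScan: t
fn render(plan: &LogicalPlan) -> String {
    format!("{plan}") // `{plan:?}` no longer produces this form in DataFusion 41
}
```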
1 change: 1 addition & 0 deletions src/datafusion/parser.rs
@@ -442,6 +442,7 @@ impl<'a> DFParser<'a> {
expr,
asc,
nulls_first,
+with_fill: None,
})
}

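Note (not in the diff): sqlparser 0.49 adds ClickHouse-style `ORDER BY ... WITH FILL`, so `OrderByExpr` gained a `with_fill` field that hand-constructed values must now populate, as above. A minimal sketch:

```rust
use sqlparser::ast::{Expr, Ident, OrderByExpr};

fn order_by_column(column: &str) -> OrderByExpr {
    OrderByExpr {
        expr: Expr::Identifier(Ident::new(column)),
        asc: Some(true),
        nulls_first: None,
        with_fill: None, // new in sqlparser 0.49; Some(..) expresses WITH FILL
    }
}
```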
23 changes: 22 additions & 1 deletion src/datafusion/utils.rs
@@ -106,7 +106,7 @@ pub(crate) fn convert_simple_data_type(sql_type: &SQLDataType) -> Result<DataTyp
SQLDataType::Interval => Ok(DataType::Interval(IntervalUnit::MonthDayNano)),
// Explicitly list all other types so that if sqlparser
// adds/changes the `SQLDataType` the compiler will tell us on upgrade
-// and avoid bugs like https://github.com/apache/arrow-datafusion/issues/3059
+// and avoid bugs like https://github.com/apache/datafusion/issues/3059
SQLDataType::Nvarchar(_)
| SQLDataType::JSON
| SQLDataType::Uuid
@@ -140,6 +140,27 @@
| SQLDataType::Struct(_)
| SQLDataType::JSONB
| SQLDataType::Unspecified
+// Clickhouse datatypes
+| SQLDataType::Int16
+| SQLDataType::Int32
+| SQLDataType::Int128
+| SQLDataType::Int256
+| SQLDataType::UInt8
+| SQLDataType::UInt16
+| SQLDataType::UInt32
+| SQLDataType::UInt64
+| SQLDataType::UInt128
+| SQLDataType::UInt256
+| SQLDataType::Float32
+| SQLDataType::Date32
+| SQLDataType::Datetime64(_, _)
+| SQLDataType::FixedString(_)
+| SQLDataType::Map(_, _)
+| SQLDataType::Tuple(_)
+| SQLDataType::Nested(_)
+| SQLDataType::Union(_)
+| SQLDataType::Nullable(_)
+| SQLDataType::LowCardinality(_)
=> not_impl_err!(
"Unsupported SQL type {sql_type:?}"
),
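Context for the new match arms (sketch, not repo code): sqlparser 0.49 introduces ClickHouse types into `SQLDataType`, and the match above stays exhaustive on purpose so the compiler flags the next additions too. By contrast, a `matches!`-style check compiles silently when variants are added later:

```rust
use sqlparser::ast::DataType as SQLDataType;

// Unlike the exhaustive match above, this helper ignores future variants.
fn is_clickhouse_specific(t: &SQLDataType) -> bool {
    matches!(
        t,
        SQLDataType::Int256
            | SQLDataType::UInt256
            | SQLDataType::FixedString(_)
            | SQLDataType::Nested(_)
            | SQLDataType::LowCardinality(_)
    )
}
```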
2 changes: 1 addition & 1 deletion src/frontend/flight/sync/utils.rs
@@ -4,7 +4,7 @@ use arrow::array::{new_null_array, Array, ArrayRef, RecordBatch, UInt64Array};
use arrow::compute::{concat_batches, take};
use arrow_row::{Row, RowConverter, SortField};
use clade::sync::ColumnRole;
-use datafusion::physical_expr::expressions::{MaxAccumulator, MinAccumulator};
+use datafusion::functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::{col, lit, Accumulator, Expr};
use std::collections::{HashMap, HashSet, VecDeque};
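Background on the moved import (not in the diff): DataFusion 41 relocates `MinAccumulator` and `MaxAccumulator` from `physical_expr::expressions` to `functions_aggregate::min_max`; the accumulator API itself is unchanged. A hedged usage sketch:

```rust
use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, Int64Array};
use datafusion::arrow::datatypes::DataType;
use datafusion::functions_aggregate::min_max::MaxAccumulator;
use datafusion_common::Result;
use datafusion_expr::Accumulator;

fn max_of_batch() -> Result<()> {
    let mut acc = MaxAccumulator::try_new(&DataType::Int64)?;
    let values: ArrayRef = Arc::new(Int64Array::from(vec![1, 5, 3]));
    acc.update_batch(&[values])?;
    assert_eq!(acc.evaluate()?.to_string(), "5"); // running max as a ScalarValue
    Ok(())
}
```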
13 changes: 6 additions & 7 deletions src/frontend/flight/sync/writer.rs
@@ -2,6 +2,7 @@ use arrow::array::RecordBatch;
use arrow_schema::{SchemaBuilder, SchemaRef};
use clade::sync::ColumnRole;
use datafusion::datasource::{provider_as_source, TableProvider};
+use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_optimizer::pruning::PruningPredicate;
use datafusion::prelude::DataFrame;
@@ -476,12 +477,10 @@ impl SeafowlDataSyncWriter {

// Construct a state for physical planning; we omit all analyzer/optimizer rules to increase
// the stack overflow threshold that occurs during recursive plan tree traversal in DF.
-let state = self
-.context
-.inner
-.state()
+let state = SessionStateBuilder::new_from_existing(self.context.inner.state())
.with_analyzer_rules(vec![])
-.with_optimizer_rules(vec![]);
+.with_optimizer_rules(vec![])
+.build();
let mut sync_df = DataFrame::new(state, base_plan);

// Iterate through all syncs for this table and construct a full plan by applying each
@@ -1265,7 +1264,7 @@ mod tests {
.unwrap();
let expected = [
"+----------+--------------------+--------------------+",
"| count(*) | MIN(test_table.c1) | MAX(test_table.c1) |",
"| count(*) | min(test_table.c1) | max(test_table.c1) |",
"+----------+--------------------+--------------------+",
"| 1000 | 0 | 999 |",
"+----------+--------------------+--------------------+",
@@ -1319,7 +1318,7 @@
.unwrap();
let expected = [
"+----------+--------------------+--------------------+",
"| count(*) | MIN(test_table.c1) | MAX(test_table.c1) |",
"| count(*) | min(test_table.c1) | max(test_table.c1) |",
"+----------+--------------------+--------------------+",
"| 1000 | 1000 | 1999 |",
"+----------+--------------------+--------------------+",
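The builder change above in isolation (sketch, not the repo's code): DataFusion 41 drops the fluent `with_*` methods from `SessionState`, so deriving a modified copy of an existing state goes through `SessionStateBuilder::new_from_existing`:

```rust
use datafusion::execution::session_state::{SessionState, SessionStateBuilder};

// Copy of `existing` with all analyzer/optimizer rules stripped, e.g. to keep
// deeply recursive plan rewrites from overflowing the stack, as in this diff.
fn without_rules(existing: SessionState) -> SessionState {
    SessionStateBuilder::new_from_existing(existing)
        .with_analyzer_rules(vec![])
        .with_optimizer_rules(vec![])
        .build()
}
```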
10 changes: 7 additions & 3 deletions src/frontend/http.rs
@@ -1002,7 +1002,9 @@ pub mod tests {
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
assert_eq!(
resp.body(),
"SQL error: ParserError(\"Expected an SQL statement, found: 7\")"
&Bytes::from(
"SQL error: ParserError(\"Expected: an SQL statement, found: 7\")"
),
);
}

@@ -1127,7 +1129,9 @@
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
assert_eq!(
resp.body(),
"SQL error: ParserError(\"Expected an SQL statement, found: SLEECT\")"
&Bytes::from(
"SQL error: ParserError(\"Expected: an SQL statement, found: SLEECT\")"
),
);
}

@@ -1149,7 +1153,7 @@
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
assert_eq!(
resp.body(),
"SQL error: ParserError(\"Expected AS, found: EOF\")"
&Bytes::from("SQL error: ParserError(\"Expected: AS, found: EOF\")"),
);
}

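The assertion updates reflect the sqlparser 0.49 message format, which writes "Expected:" with a colon; the expected value is now also spelled as `&Bytes::from(...)`. A sketch of the message change alone, assuming only sqlparser as a dependency:

```rust
use sqlparser::dialect::GenericDialect;
use sqlparser::parser::Parser;

fn main() {
    let err = Parser::parse_sql(&GenericDialect {}, "7").unwrap_err();
    // sqlparser 0.47 said "Expected an SQL statement, found: 7"
    assert_eq!(
        err.to_string(),
        "sql parser error: Expected: an SQL statement, found: 7"
    );
}
```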
6 changes: 2 additions & 4 deletions src/provider.rs
@@ -8,10 +8,8 @@ use datafusion::physical_expr::expressions::{case, cast, col};
use datafusion::physical_expr::{create_physical_expr, PhysicalExpr};
use datafusion::{
arrow::datatypes::Schema as ArrowSchema,
-catalog::{
-schema::{MemorySchemaProvider, SchemaProvider},
-CatalogProvider,
-},
+catalog::{CatalogProvider, SchemaProvider},
+catalog_common::memory::MemorySchemaProvider,
common::{DataFusionError, Result},
datasource::TableProvider,
};
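Orientation for this import move and the matching one in src/catalog/metastore.rs (sketch, not from the diff): DataFusion 41 keeps the provider traits under `datafusion::catalog` but hosts the in-memory implementations in `datafusion::catalog_common::memory`:

```rust
use std::sync::Arc;

use datafusion::catalog::SchemaProvider; // trait: still under `catalog`
use datafusion::catalog_common::memory::MemorySchemaProvider; // impl: moved

fn empty_schema() -> Arc<dyn SchemaProvider> {
    Arc::new(MemorySchemaProvider::new())
}
```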