Merge pull request #511 from splitgraph/datafusion-36-upgrade
Bump to DataFusion 36
gruuya authored Apr 2, 2024
2 parents aa24118 + a1aa708 commit cdaaf96
Showing 9 changed files with 164 additions and 106 deletions.
180 changes: 119 additions & 61 deletions Cargo.lock

Large diffs are not rendered by default.

23 changes: 6 additions & 17 deletions Cargo.toml
@@ -13,9 +13,9 @@ arrow-integration-test = "50.0.0"
arrow-schema = "50.0.0"
async-trait = "0.1.64"

datafusion = "35.0.0"
datafusion-common = "35.0.0"
datafusion-expr = "35.0.0"
datafusion = "36.0.0"
datafusion-common = "36.0.0"
datafusion-expr = "36.0.0"

itertools = ">=0.10.0"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "signal", "process"] }
@@ -67,17 +67,6 @@ arrow-schema = { git = "https://github.com/apache/arrow-rs", rev = "72d8a7831762
arrow-select = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
arrow-string = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }

# Pick up https://github.com/apache/arrow-datafusion/pull/8894 and https://github.com/apache/arrow-datafusion/pull/9007
datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "a7a74fa522aaef07e6605f414308f3c99bd1ea06" }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion", rev = "a7a74fa522aaef07e6605f414308f3c99bd1ea06" }
datafusion-execution = { git = "https://github.com/apache/arrow-datafusion", rev = "a7a74fa522aaef07e6605f414308f3c99bd1ea06" }
datafusion-expr = { git = "https://github.com/apache/arrow-datafusion", rev = "a7a74fa522aaef07e6605f414308f3c99bd1ea06" }
datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion", rev = "a7a74fa522aaef07e6605f414308f3c99bd1ea06" }
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion", rev = "a7a74fa522aaef07e6605f414308f3c99bd1ea06" }
datafusion-physical-plan = { git = "https://github.com/apache/arrow-datafusion", rev = "a7a74fa522aaef07e6605f414308f3c99bd1ea06" }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "a7a74fa522aaef07e6605f414308f3c99bd1ea06" }
datafusion-sql = { git = "https://github.com/apache/arrow-datafusion", rev = "a7a74fa522aaef07e6605f414308f3c99bd1ea06" }

[dependencies]
arrow = { workspace = true }
arrow-buffer = { workspace = true }
@@ -98,8 +87,8 @@ clap = { version = "3.2.19", features = [ "derive" ] }
config = "0.13.3"

# PG wire protocol support
convergence = { git = "https://github.com/returnString/convergence", rev = "c58ba5c9903e96fd73b65fda8c7b19192fee5cd3", optional = true }
convergence-arrow = { git = "https://github.com/returnString/convergence", rev = "c58ba5c9903e96fd73b65fda8c7b19192fee5cd3", optional = true }
convergence = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-36-upgrade", optional = true }
convergence-arrow = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-36-upgrade", optional = true }

dashmap = "5.4.0"

@@ -109,7 +98,7 @@ datafusion-expr = { workspace = true }

datafusion-remote-tables = { path = "./datafusion_remote_tables", optional = true }

deltalake = { git = "https://github.com/splitgraph/delta-rs", branch = "sqlparser-0.43", features = ["datafusion"] }
deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "c223bb66dd518fe2f7a6d5ba29e67267aaf95876", features = ["datafusion"] }

futures = "0.3"
hex = ">=0.4.0"
2 changes: 1 addition & 1 deletion datafusion_remote_tables/Cargo.toml
@@ -19,7 +19,7 @@ arrow-schema = { workspace = true }
async-trait = { workspace = true }

# Remote query execution for a variety of DBs
connectorx = { git = "https://github.com/splitgraph/connector-x", branch = "datafusion-35-upgrade", features = [ "dst_arrow", "src_postgres", "src_mysql", "src_sqlite" ] }
connectorx = { git = "https://github.com/splitgraph/connector-x", branch = "datafusion-36-upgrade", features = [ "dst_arrow", "src_postgres", "src_mysql", "src_sqlite" ] }

datafusion = { workspace = true }
datafusion-common = { workspace = true }
31 changes: 21 additions & 10 deletions src/context/delta.rs
@@ -18,7 +18,8 @@ use datafusion::{
};
use deltalake::kernel::{Action, Add, Schema as DeltaSchema};
use deltalake::operations::{
convert_to_delta::ConvertToDeltaBuilder, create::CreateBuilder, transaction::commit,
convert_to_delta::ConvertToDeltaBuilder, create::CreateBuilder,
transaction::CommitBuilder,
};
use deltalake::protocol::{DeltaOperation, SaveMode};
use deltalake::writer::create_add;
@@ -395,14 +396,8 @@ impl SeafowlContext {
partition_by: None,
predicate: None,
};
let version = commit(
table_log_store.as_ref(),
&actions,
op,
table.state.as_ref(),
None,
)
.await?;

let version = self.commit(actions, &table, op).await?;

// TODO: if `DeltaTable::get_version_timestamp` was globally public we could also pass the
// exact version timestamp, instead of creating one automatically in our own catalog (which
@@ -412,10 +407,26 @@ impl SeafowlContext {
.create_new_version(table_uuid, version)
.await?;

debug!("Written table version {version} for {table}");
debug!("Written table version {} for {table}", version);
Ok(table)
}

pub(super) async fn commit(
&self,
actions: Vec<Action>,
table: &DeltaTable,
op: DeltaOperation,
) -> Result<i64> {
Ok(CommitBuilder::default()
.with_actions(actions)
.build(Some(table.snapshot()?), table.log_store(), op)
.map_err(|e| {
DataFusionError::Execution(format!("Transaction commit failed: {e}"))
})?
.await?
.version)
}

// Cleanup the table objects in the storage
pub async fn delete_delta_table<'a>(
&self,
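For context, the delta-rs revision pinned above drops the free-standing `transaction::commit` function in favour of a `CommitBuilder`, which the new `commit` helper wraps so that all Seafowl call sites share one code path. Below is a minimal standalone sketch of that builder flow, mirroring the helper in the diff; it assumes Seafowl's `Result` alias is DataFusion's and that delta-rs's error types convert into `DataFusionError`, as the committed helper relies on.

use datafusion::error::{DataFusionError, Result};
use deltalake::kernel::Action;
use deltalake::operations::transaction::CommitBuilder;
use deltalake::protocol::DeltaOperation;
use deltalake::DeltaTable;

// Sketch of the new commit path: stage the actions, build the transaction
// against the table's current snapshot and log store, then await the
// prepared commit to obtain the newly written table version.
async fn commit_actions(
    table: &DeltaTable,
    actions: Vec<Action>,
    op: DeltaOperation,
) -> Result<i64> {
    Ok(CommitBuilder::default()
        .with_actions(actions)
        .build(Some(table.snapshot()?), table.log_store(), op)
        .map_err(|e| {
            DataFusionError::Execution(format!("Transaction commit failed: {e}"))
        })?
        .await?
        .version)
}

With the helper in place, call sites reduce to `self.commit(actions, &table, op).await?`, as seen both in this file and in src/context/physical.rs below.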
23 changes: 6 additions & 17 deletions src/context/physical.rs
@@ -48,7 +48,6 @@ use datafusion_expr::{
DdlStatement, DmlStatement, DropCatalogSchema, Expr, Filter, WriteOp,
};
use deltalake::kernel::{Action, Add, Remove};
use deltalake::operations::transaction::commit;
use deltalake::operations::vacuum::VacuumBuilder;
use deltalake::protocol::{DeltaOperation, SaveMode};
use deltalake::DeltaTable;
@@ -381,14 +380,9 @@ impl SeafowlContext {
partition_by: None,
predicate: None,
};
let version = commit(
table.log_store().as_ref(),
&actions,
op,
table.state.as_ref(),
None,
)
.await?;

let version = self.commit(actions, &table, op).await?;

self.metastore
.tables
.create_new_version(uuid, version)
@@ -498,14 +492,9 @@ impl SeafowlContext {
}

let op = DeltaOperation::Delete { predicate: None };
let version = commit(
table.log_store().as_ref(),
&actions,
op,
table.state.as_ref(),
None,
)
.await?;

let version = self.commit(actions, &table, op).await?;

self.metastore
.tables
.create_new_version(uuid, version)
1 change: 1 addition & 0 deletions tests/cli/basic.rs
@@ -93,6 +93,7 @@ fn test_cli_basic() -> std::io::Result<()> {
"| default | information_schema | views | VIEW |",
"| default | information_schema | columns | VIEW |",
"| default | information_schema | df_settings | VIEW |",
"| default | information_schema | schemata | VIEW |",
"+---------------+--------------------+----------------+------------+",
]);

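DataFusion 36 registers a new `schemata` view in `information_schema`, which is why this expected listing, and the remaining test fixtures below, each gain an `information_schema | schemata` row. A small standalone sketch, outside of Seafowl, showing where the view comes from; it assumes only the `datafusion` 36 crate and a tokio runtime:

use datafusion::error::Result;
use datafusion::prelude::{SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    // information_schema has to be enabled explicitly; with DataFusion 36 the
    // catalog then exposes a `schemata` view alongside tables, views, columns
    // and df_settings.
    let config = SessionConfig::new().with_information_schema(true);
    let ctx = SessionContext::new_with_config(config);

    ctx.sql("SELECT * FROM information_schema.schemata")
        .await?
        .show()
        .await?;
    Ok(())
}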
8 changes: 8 additions & 0 deletions tests/statements/ddl.rs
@@ -195,6 +195,7 @@ async fn test_create_table_move_and_drop(
"+--------------------+--------------+",
"| information_schema | columns |",
"| information_schema | df_settings |",
"| information_schema | schemata |",
"| information_schema | tables |",
"| information_schema | views |",
"| public | test_table_2 |",
@@ -230,6 +231,7 @@ async fn test_create_table_move_and_drop(
"+--------------------+--------------+",
"| information_schema | columns |",
"| information_schema | df_settings |",
"| information_schema | schemata |",
"| information_schema | tables |",
"| information_schema | views |",
"| new_./-~:schema | test_table_3 |",
@@ -314,6 +316,7 @@ async fn test_create_table_drop_schema(
"+--------------------+-------------+",
"| information_schema | columns |",
"| information_schema | df_settings |",
"| information_schema | schemata |",
"| information_schema | tables |",
"| information_schema | views |",
"| new_schema | table_1 |",
@@ -344,6 +347,7 @@ async fn test_create_table_drop_schema(
"+--------------------+-------------+",
"| information_schema | columns |",
"| information_schema | df_settings |",
"| information_schema | schemata |",
"| information_schema | tables |",
"| information_schema | views |",
"| new_schema | table_1 |",
@@ -402,6 +406,7 @@ async fn test_create_table_drop_schema(
"+--------------------+-------------+",
"| information_schema | columns |",
"| information_schema | df_settings |",
"| information_schema | schemata |",
"| information_schema | tables |",
"| information_schema | views |",
"+--------------------+-------------+",
@@ -422,6 +427,7 @@ async fn test_create_table_drop_schema(
"+--------------------+-------------+",
"| information_schema | columns |",
"| information_schema | df_settings |",
"| information_schema | schemata |",
"| information_schema | tables |",
"| information_schema | views |",
"| public | table_1 |",
@@ -589,6 +595,7 @@ async fn test_create_external_table(
"+--------------------+-------------+",
"| information_schema | columns |",
"| information_schema | df_settings |",
"| information_schema | schemata |",
"| information_schema | tables |",
"| information_schema | views |",
"| staging | file |",
@@ -636,6 +643,7 @@ async fn test_create_external_table(
"+--------------------+-------------+",
"| information_schema | columns |",
"| information_schema | df_settings |",
"| information_schema | schemata |",
"| information_schema | tables |",
"| information_schema | views |",
"+--------------------+-------------+",
1 change: 1 addition & 0 deletions tests/statements/query.rs
@@ -19,6 +19,7 @@ async fn test_information_schema() {
"| default | information_schema | columns | VIEW |",
"| default | information_schema | df_settings | VIEW |",
"| default | system | dropped_tables | VIEW |",
"| default | information_schema | schemata | VIEW |",
"| default | system | table_versions | VIEW |",
"| default | information_schema | tables | VIEW |",
"| default | information_schema | views | VIEW |",
1 change: 1 addition & 0 deletions tests/statements/time_travel.rs
@@ -195,6 +195,7 @@ async fn test_read_time_travel() {
"+--------------------+-------------+",
"| information_schema | columns |",
"| information_schema | df_settings |",
"| information_schema | schemata |",
"| information_schema | tables |",
"| information_schema | views |",
"| public | test_table |",
