Merge pull request #498 from splitgraph/datafusion-35-upgrade
Datafusion post-35 upgrade
gruuya authored Feb 5, 2024
2 parents 5701cc0 + 64f61da commit 27438dc
Showing 21 changed files with 764 additions and 843 deletions.
918 changes: 372 additions & 546 deletions Cargo.lock


75 changes: 42 additions & 33 deletions Cargo.toml
@@ -2,20 +2,20 @@
members = ["clade"]

[workspace.dependencies]
arrow = "49.0.0"
arrow-buffer = "49.0.0"
arrow-csv = "49.0.0"
arrow-flight = "49.0.0"
arrow = "50.0.0"
arrow-buffer = "50.0.0"
arrow-csv = "50.0.0"
arrow-flight = "50.0.0"
# For the JSON format support
# https://github.com/apache/arrow-rs/pull/2868
# https://github.com/apache/arrow-rs/pull/2724
arrow-integration-test = "49.0.0"
arrow-schema = "49.0.0"
arrow-integration-test = "50.0.0"
arrow-schema = "50.0.0"
async-trait = "0.1.64"

datafusion = "34.0.0"
datafusion-common = "34.0.0"
datafusion-expr = "34.0.0"
datafusion = "35.0.0"
datafusion-common = "35.0.0"
datafusion-expr = "35.0.0"

itertools = ">=0.10.0"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "signal", "process"] }
@@ -51,22 +51,31 @@ object-store-s3 = ["object_store/aws"]
remote-tables = ["dep:datafusion-remote-tables"]

[patch.crates-io]
-# Patch to pick up https://github.com/apache/arrow-rs/pull/5282 from
-# https://github.com/splitgraph/arrow-rs/tree/49-with-date-fix
-
-arrow-arith = { git = "https://github.com/splitgraph/arrow-rs", branch = "49-with-date-fix" }
-arrow-array = { git = "https://github.com/splitgraph/arrow-rs", branch = "49-with-date-fix" }
-arrow-buffer = { git = "https://github.com/splitgraph/arrow-rs", branch = "49-with-date-fix" }
-arrow-cast = { git = "https://github.com/splitgraph/arrow-rs", branch = "49-with-date-fix" }
-arrow-csv = { git = "https://github.com/splitgraph/arrow-rs", branch = "49-with-date-fix" }
-arrow-data = { git = "https://github.com/splitgraph/arrow-rs", branch = "49-with-date-fix" }
-arrow-ipc = { git = "https://github.com/splitgraph/arrow-rs", branch = "49-with-date-fix" }
-arrow-json = { git = "https://github.com/splitgraph/arrow-rs", branch = "49-with-date-fix" }
-arrow-ord = { git = "https://github.com/splitgraph/arrow-rs", branch = "49-with-date-fix" }
-arrow-row = { git = "https://github.com/splitgraph/arrow-rs", branch = "49-with-date-fix" }
-arrow-schema = { git = "https://github.com/splitgraph/arrow-rs", branch = "49-with-date-fix" }
-arrow-select = { git = "https://github.com/splitgraph/arrow-rs", branch = "49-with-date-fix" }
-arrow-string = { git = "https://github.com/splitgraph/arrow-rs", branch = "49-with-date-fix" }
+# Pick up https://github.com/apache/arrow-rs/pull/5282
+arrow-arith = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
+arrow-array = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
+arrow-buffer = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
+arrow-cast = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
+arrow-csv = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
+arrow-data = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
+arrow-ipc = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
+arrow-json = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
+arrow-ord = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
+arrow-row = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
+arrow-schema = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
+arrow-select = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
+arrow-string = { git = "https://github.com/apache/arrow-rs", rev = "72d8a783176219f0864022daba70e84ceab7e221" }
+
+# Pick up https://github.com/apache/arrow-datafusion/pull/8894 and https://github.com/apache/arrow-datafusion/pull/9007
+datafusion = { git = "https://github.com/apache/arrow-datafusion", rev = "a7a74fa522aaef07e6605f414308f3c99bd1ea06" }
+datafusion-common = { git = "https://github.com/apache/arrow-datafusion", rev = "a7a74fa522aaef07e6605f414308f3c99bd1ea06" }
+datafusion-execution = { git = "https://github.com/apache/arrow-datafusion", rev = "a7a74fa522aaef07e6605f414308f3c99bd1ea06" }
+datafusion-expr = { git = "https://github.com/apache/arrow-datafusion", rev = "a7a74fa522aaef07e6605f414308f3c99bd1ea06" }
+datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion", rev = "a7a74fa522aaef07e6605f414308f3c99bd1ea06" }
+datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion", rev = "a7a74fa522aaef07e6605f414308f3c99bd1ea06" }
+datafusion-physical-plan = { git = "https://github.com/apache/arrow-datafusion", rev = "a7a74fa522aaef07e6605f414308f3c99bd1ea06" }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion", rev = "a7a74fa522aaef07e6605f414308f3c99bd1ea06" }
+datafusion-sql = { git = "https://github.com/apache/arrow-datafusion", rev = "a7a74fa522aaef07e6605f414308f3c99bd1ea06" }

[dependencies]
arrow = { workspace = true }
@@ -88,8 +97,8 @@ clap = { version = "3.2.19", features = [ "derive" ] }
config = "0.13.3"

# PG wire protocol support
-convergence = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-34-upgrade", optional = true }
-convergence-arrow = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-34-upgrade", optional = true }
+convergence = { git = "https://github.com/returnString/convergence", rev = "c58ba5c9903e96fd73b65fda8c7b19192fee5cd3", optional = true }
+convergence-arrow = { git = "https://github.com/returnString/convergence", rev = "c58ba5c9903e96fd73b65fda8c7b19192fee5cd3", optional = true }

dashmap = "5.4.0"

@@ -99,14 +108,14 @@ datafusion-expr = { workspace = true }

datafusion-remote-tables = { path = "./datafusion_remote_tables", optional = true }

-deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "9264edea89a2fc1c35f4a6b9faab125748ff3651", features = ["s3-native-tls", "datafusion-ext"] }
+deltalake = { git = "https://github.com/splitgraph/delta-rs", branch = "sqlparser-0.43", features = ["datafusion"] }

futures = "0.3"
hex = ">=0.4.0"
itertools = { workspace = true }
lazy_static = ">=1.4.0"
moka = { version = "0.12.2", default_features = false, features = ["future", "atomic64", "quanta"] }
object_store = "0.8"
object_store = "0.9"
parking_lot = "0.12.1"
percent-encoding = "2.2.0"
prost = "0.12.1"
@@ -123,7 +132,7 @@ rustyline = "13.0"
serde = "1.0.156"
serde_json = "1.0.93"
sha2 = ">=0.10.1"
sqlparser = "0.40"
sqlparser = { version = "0.43", features = ["visitor"] }
sqlx = { version = "0.7.1", features = [ "runtime-tokio-rustls", "sqlite", "any", "uuid" ] }
strum = ">=0.24"
strum_macros = ">=0.24"
@@ -139,9 +148,9 @@ uuid = "1.2.1"
warp = "0.3.6"

# For WASM user-defined functions
wasi-common = "16.0.0"
wasmtime = "16.0.0"
wasmtime-wasi = "16.0.0"
wasi-common = "17.0.0"
wasmtime = "17.0.0"
wasmtime-wasi = "17.0.0"

[dev-dependencies]
assert_cmd = "2"
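Note on the [patch.crates-io] block above: all thirteen arrow-* subcrates are pinned to the same apache/arrow-rs revision, because patching only some of them would link two versions of the crate into one binary, and Rust treats same-named types from different crate versions as unrelated. A minimal standalone sketch of that failure mode, with two modules standing in for two crate versions (all names here are illustrative):

// Two modules standing in for two versions of the same crate linked into one
// binary (e.g. arrow-schema 49 and 50); names are illustrative, not real crates.
mod arrow_schema_v49 {
    #[derive(Debug)]
    pub struct DataType(pub String);
}

mod arrow_schema_v50 {
    #[derive(Debug)]
    pub struct DataType(pub String);
}

// An API expecting the "new" type, as the upgraded planner would.
fn plan_with(dt: arrow_schema_v50::DataType) {
    println!("planner accepted {dt:?}");
}

fn main() {
    let old = arrow_schema_v49::DataType("Date32".to_string());
    // plan_with(old); // mismatched types: the two DataTypes are unrelated,
    // which is why every arrow-* crate must be patched to the same revision.
    plan_with(arrow_schema_v50::DataType(old.0));
}

The same reasoning applies to the nine datafusion-* crates, which are all pinned to a single arrow-datafusion revision.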
2 changes: 1 addition & 1 deletion datafusion_remote_tables/Cargo.toml
@@ -19,7 +19,7 @@ arrow-schema = { workspace = true }
async-trait = { workspace = true }

# Remote query execution for a variety of DBs
-connectorx = { git = "https://github.com/splitgraph/connector-x", branch = "datafusion-34-upgrade", features = [ "dst_arrow", "src_postgres", "src_mysql", "src_sqlite" ] }
+connectorx = { git = "https://github.com/splitgraph/connector-x", branch = "datafusion-35-upgrade", features = [ "dst_arrow", "src_postgres", "src_mysql", "src_sqlite" ] }

datafusion = { workspace = true }
datafusion-common = { workspace = true }
18 changes: 5 additions & 13 deletions src/context/delta.rs
@@ -25,7 +25,6 @@ use deltalake::writer::create_add;
use deltalake::DeltaTable;
use futures::{StreamExt, TryStreamExt};
use object_store::path::Path;
-use std::collections::HashMap;
use std::fs::File;
use std::sync::Arc;
use tempfile::{NamedTempFile, TempPath};
@@ -258,7 +257,7 @@ pub async fn plan_to_object_store(

// Create the corresponding Add action; currently we don't support partition columns
// which simplifies things.
let add = create_add(&HashMap::default(), file_name, size, &metadata)?;
let add = create_add(&Default::default(), file_name, size, &metadata)?;

Ok(add)
});
@@ -400,7 +399,7 @@ impl SeafowlContext {
table_log_store.as_ref(),
&actions,
op,
-table.get_state(),
+table.state.as_ref(),
None,
)
.await?;
@@ -522,7 +521,6 @@ mod tests {
adds[0].path.clone(),
adds[0].size,
adds[0].partition_values.is_empty(),
-adds[0].partition_values_parsed.is_none(),
adds[0].data_change,
serde_json::from_str::<Value>(
adds[0].stats.clone().unwrap().as_str()
@@ -533,7 +531,6 @@
adds[1].path.clone(),
adds[1].size,
adds[1].partition_values.is_empty(),
-adds[1].partition_values_parsed.is_none(),
adds[1].data_change,
serde_json::from_str::<Value>(
adds[1].stats.clone().unwrap().as_str()
@@ -544,8 +541,7 @@
vec![
(
PART_0_FILE_NAME.to_string(),
-1269,
-true,
+1298,
true,
true,
json!({
@@ -569,8 +565,7 @@
),
(
PART_1_FILE_NAME.to_string(),
-1284,
-true,
+1313,
true,
true,
json!({
@@ -599,10 +594,7 @@
object_store
.get_log_store(&table_uuid.to_string())
.object_store(),
-vec![
-Path::from(PART_0_FILE_NAME.to_string()),
-Path::from(PART_1_FILE_NAME.to_string()),
-],
+vec![PART_0_FILE_NAME.to_string(), PART_1_FILE_NAME.to_string()],
)
.await;
}
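The create_add change in this file swaps an explicit &HashMap::default() argument for &Default::default(), letting the compiler infer the map type from the callee's signature; that inference is what makes the use std::collections::HashMap; import removable. A minimal sketch of the idiom; this create_add is a stand-in with an invented signature, not deltalake's actual function:

use std::collections::HashMap;

// Stand-in with an invented signature; the real deltalake::writer::create_add
// takes more arguments (file name, size, metadata).
fn create_add(
    partition_values: &HashMap<String, Option<String>>,
    file_name: &str,
) -> String {
    format!("Add {{ path: {file_name}, partitions: {} }}", partition_values.len())
}

fn main() {
    // `&Default::default()` is inferred as `&HashMap<String, Option<String>>`
    // from the parameter type, so the caller never needs to name (or import) it.
    let add = create_add(&Default::default(), "part-0.parquet");
    println!("{add}");
}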
16 changes: 2 additions & 14 deletions src/context/logical.rs
@@ -107,6 +107,7 @@ impl SeafowlContext {
// Delegate generic queries to the basic DataFusion logical planner
// (though note EXPLAIN [our custom query] will mean we have to implement EXPLAIN ourselves)
Statement::Explain { .. }
+| Statement::ExplainTable { .. }
| Statement::ShowVariable { .. }
| Statement::ShowTables { .. }
| Statement::ShowColumns { .. }
@@ -309,7 +310,7 @@
})),
}))
}
-DFStatement::DescribeTableStmt(_) | DFStatement::CreateExternalTable(_) => {
+DFStatement::CreateExternalTable(_) => {
self.inner.state().statement_to_plan(stmt).await
}
DFStatement::CopyTo(_) | DFStatement::Explain(_) => {
@@ -483,19 +484,6 @@ mod tests {
);
}

-#[tokio::test]
-async fn test_plan_insert_type_mismatch() {
-let ctx = in_memory_context_with_test_db().await;
-
-// Try inserting a timestamp into a number (note this will work fine for inserting
-// e.g. Utf-8 into numbers at plan time but should fail at execution time if the value
-// doesn't convert)
-let err = ctx
-.create_logical_plan("INSERT INTO testcol.some_table SELECT '2022-01-01', to_timestamp('2022-01-01T12:00:00')")
-.await.unwrap_err();
-assert_eq!(err.to_string(), "Error during planning: Cannot automatically convert Timestamp(Nanosecond, None) to Float64");
-}
-
#[tokio::test]
async fn test_plan_insert_values_wrong_number() {
let ctx = in_memory_context_with_test_db().await;
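The two match-arm changes in this file track upstream parser changes: sqlparser 0.43 gives EXPLAIN <table> its own Statement::ExplainTable variant, and DFStatement::DescribeTableStmt no longer appears in the DataFusion 35 planner interface. A minimal sketch of the exhaustive-match delegation pattern, using a hypothetical miniature Statement enum rather than sqlparser's real definition:

// Hypothetical miniature of sqlparser's Statement enum, not its real definition.
enum Statement {
    Explain { query: String },
    ExplainTable { table: String },
    ShowVariable { name: String },
    Insert { table: String },
}

fn create_logical_plan(stmt: &Statement) -> String {
    match stmt {
        // Generic statements all fall through to the stock DataFusion planner.
        // Because the match is exhaustive, a new upstream variant such as
        // ExplainTable breaks the build until an arm routes it somewhere.
        Statement::Explain { .. }
        | Statement::ExplainTable { .. }
        | Statement::ShowVariable { .. } => "delegate to DataFusion".to_string(),
        // Statements with custom semantics get their own planning path.
        Statement::Insert { table } => format!("custom INSERT plan for {table}"),
    }
}

fn main() {
    let stmt = Statement::ExplainTable { table: "some_table".to_string() };
    println!("{}", create_logical_plan(&stmt));
}

The exhaustive match is what keeps parser upgrades mechanical: each new variant is a compile error until it is explicitly routed.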
30 changes: 22 additions & 8 deletions src/context/mod.rs
@@ -133,20 +133,15 @@ impl SeafowlContext {
&self,
table_name: impl Into<TableReference<'a>>,
) -> Result<DeltaTable> {
-let table_log_store = self
-.inner
+self.inner
.table_provider(table_name)
.await?
.as_any()
.downcast_ref::<DeltaTable>()
.ok_or_else(|| {
DataFusionError::Execution("Table {table_name} not found".to_string())
-})?
-.log_store();
-
-// We can't just keep hold of the downcasted ref from above because of
-// `temporary value dropped while borrowed`
-Ok(DeltaTable::new(table_log_store, Default::default()))
+})
+.cloned()
}

// Parse the uuid from the Delta table uri if available
@@ -264,6 +259,25 @@ mod tests {
use super::test_utils::in_memory_context;
use super::*;

+#[tokio::test]
+async fn test_timestamp_to_date_casting() -> Result<()> {
+let ctx = in_memory_context().await;
+
+let plan = ctx.plan_query("SELECT '1998-11-30 00:00:00'::date").await?;
+
+let results = ctx.collect(plan).await?;
+let expected = [
+"+-----------------------------+",
+"| Utf8(\"1998-11-30 00:00:00\") |",
+"+-----------------------------+",
+"| 1998-11-30 |",
+"+-----------------------------+",
+];
+assert_batches_eq!(expected, &results);
+
+Ok(())
+}
+
#[rstest]
#[case::regular_type_names("float", "float")]
#[case::legacy_type_names("f32", "f32")]
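The delta_table refactor in this file relies on DeltaTable being cloneable in the patched delta-rs (an assumption consistent with the .cloned() call above): Result::cloned converts the borrowed downcast result into an owned table, so the old workaround of rebuilding the table from its log store to satisfy the borrow checker is gone. A minimal standalone sketch of the downcast-and-clone pattern; the trait and struct below are stand-ins, not the actual DataFusion or delta-rs APIs:

use std::any::Any;
use std::sync::Arc;

// Stand-in for DataFusion's TableProvider trait.
trait TableProvider {
    fn as_any(&self) -> &dyn Any;
}

// Stand-in for delta-rs's DeltaTable.
#[derive(Clone, Debug)]
struct DeltaTable {
    uri: String,
}

impl TableProvider for DeltaTable {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

// Downcast the trait object and clone the concrete table out. Returning an
// owned value means no reference into the temporary provider escapes the
// function, which is the `temporary value dropped while borrowed` problem
// the deleted comment described.
fn delta_table_from_provider(
    provider: Arc<dyn TableProvider>,
) -> Result<DeltaTable, String> {
    provider
        .as_any()
        .downcast_ref::<DeltaTable>()
        .ok_or_else(|| "table is not a Delta table".to_string())
        .cloned()
}

fn main() {
    let provider: Arc<dyn TableProvider> = Arc::new(DeltaTable {
        uri: "s3://bucket/table".to_string(),
    });
    println!("{:?}", delta_table_from_provider(provider));
}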
