diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index ebb1c07a47f9c..95214d11fc88e 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -155,6 +155,7 @@ jobs: run: | export TPCH_DATA=`pwd`/benchmarks/data cargo test verify_q --profile release-nonlto --features=ci -- --test-threads=1 + cargo test serde_q --profile release-nonlto --features=ci -- --test-threads=1 - name: Verify Working Directory Clean run: git diff --exit-code diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index dd9253b8d9923..2104a96c6844a 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -49,3 +49,6 @@ snmalloc-rs = { version = "0.3", optional = true } structopt = { version = "0.3", default-features = false } test-utils = { path = "../test-utils/" } tokio = { version = "^1.0", features = ["macros", "rt", "rt-multi-thread", "parking_lot"] } + +[dev-dependencies] +datafusion-proto = { path = "../datafusion/proto" } diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index 44b150f16f639..3269bf4d0fb5c 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -182,7 +182,6 @@ async fn main() -> Result<()> { } } -#[allow(clippy::await_holding_lock)] async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result> { println!("Running benchmarks with the following options: {:?}", opt); let mut benchmark_run = BenchmarkRun::new(opt.query); @@ -193,35 +192,7 @@ async fn benchmark_datafusion(opt: DataFusionBenchmarkOpt) -> Result Result Result<()> { + for table in TPCH_TABLES { + let table_provider = { + let mut session_state = ctx.state.write(); + get_table( + &mut session_state, + opt.path.to_str().unwrap(), + table, + opt.file_format.as_str(), + opt.partitions, + ) + .await? + }; + + if opt.mem_table { + println!("Loading table '{}' into memory", table); + let start = Instant::now(); + let memtable = + MemTable::load(table_provider, Some(opt.partitions), &ctx.state()) + .await?; + println!( + "Loaded table '{}' into memory in {} ms", + table, + start.elapsed().as_millis() + ); + ctx.register_table(*table, Arc::new(memtable))?; + } else { + ctx.register_table(*table, table_provider)?; + } + } + Ok(()) +} + fn write_summary_json(benchmark_run: &mut BenchmarkRun, path: &Path) -> Result<()> { let json = serde_json::to_string_pretty(&benchmark_run).expect("summary is serializable"); @@ -558,17 +566,7 @@ mod tests { } async fn expected_plan(query: usize) -> Result<()> { - let ctx = SessionContext::new(); - for table in TPCH_TABLES { - let table = table.to_string(); - let schema = get_tpch_table_schema(&table); - let mem_table = MemTable::try_new(Arc::new(schema), vec![])?; - ctx.register_table( - TableReference::from(table.as_str()), - Arc::new(mem_table), - )?; - } - + let ctx = create_context()?; let mut actual = String::new(); let sql = get_query_sql(query)?; for sql in &sql { @@ -591,9 +589,9 @@ mod tests { let path = Path::new(&path); if let Ok(expected) = read_text_file(path) { assert_eq!(expected, actual, - // generate output that is easier to copy/paste/update - "\n\nMismatch of expected content in: {:?}\nExpected:\n\n{}\n\nActual:\n\n{}\n\n", - path, expected, actual); + // generate output that is easier to copy/paste/update + "\n\nMismatch of expected content in: {:?}\nExpected:\n\n{}\n\nActual:\n\n{}\n\n", + path, expected, actual); found = true; break; } @@ -603,6 +601,20 @@ mod tests { Ok(()) } + fn create_context() -> Result { + let ctx = SessionContext::new(); + for table in TPCH_TABLES { + let table = table.to_string(); + let schema = get_tpch_table_schema(&table); + let mem_table = MemTable::try_new(Arc::new(schema), vec![])?; + ctx.register_table( + TableReference::from(table.as_str()), + Arc::new(mem_table), + )?; + } + Ok(ctx) + } + /// we need to read line by line and add \n so tests work on Windows fn read_text_file(path: &Path) -> Result { let file = File::open(path)?; @@ -618,270 +630,399 @@ mod tests { Ok(str) } - #[cfg(feature = "ci")] #[tokio::test] - async fn verify_q1() -> Result<()> { - verify_query(1).await + async fn run_q1() -> Result<()> { + run_query(1).await } - #[cfg(feature = "ci")] #[tokio::test] - async fn verify_q2() -> Result<()> { - verify_query(2).await + async fn run_q2() -> Result<()> { + run_query(2).await } - #[cfg(feature = "ci")] #[tokio::test] - async fn verify_q3() -> Result<()> { - verify_query(3).await + async fn run_q3() -> Result<()> { + run_query(3).await } - #[cfg(feature = "ci")] #[tokio::test] - async fn verify_q4() -> Result<()> { - verify_query(4).await + async fn run_q4() -> Result<()> { + run_query(4).await } - #[cfg(feature = "ci")] #[tokio::test] - async fn verify_q5() -> Result<()> { - verify_query(5).await + async fn run_q5() -> Result<()> { + run_query(5).await } - #[cfg(feature = "ci")] - #[ignore] // https://github.com/apache/arrow-datafusion/issues/4024 #[tokio::test] - async fn verify_q6() -> Result<()> { - verify_query(6).await + async fn run_q6() -> Result<()> { + run_query(6).await } - #[cfg(feature = "ci")] #[tokio::test] - async fn verify_q7() -> Result<()> { - verify_query(7).await + async fn run_q7() -> Result<()> { + run_query(7).await } - #[cfg(feature = "ci")] #[tokio::test] - async fn verify_q8() -> Result<()> { - verify_query(8).await + async fn run_q8() -> Result<()> { + run_query(8).await } - #[cfg(feature = "ci")] #[tokio::test] - async fn verify_q9() -> Result<()> { - verify_query(9).await + async fn run_q9() -> Result<()> { + run_query(9).await } - #[cfg(feature = "ci")] #[tokio::test] - async fn verify_q10() -> Result<()> { - verify_query(10).await + async fn run_q10() -> Result<()> { + run_query(10).await } - #[cfg(feature = "ci")] #[tokio::test] - async fn verify_q11() -> Result<()> { - verify_query(11).await + async fn run_q11() -> Result<()> { + run_query(11).await } - #[cfg(feature = "ci")] #[tokio::test] - async fn verify_q12() -> Result<()> { - verify_query(12).await + async fn run_q12() -> Result<()> { + run_query(12).await } - #[cfg(feature = "ci")] #[tokio::test] - async fn verify_q13() -> Result<()> { - verify_query(13).await + async fn run_q13() -> Result<()> { + run_query(13).await } - #[cfg(feature = "ci")] #[tokio::test] - async fn verify_q14() -> Result<()> { - verify_query(14).await + async fn run_q14() -> Result<()> { + run_query(14).await } - #[cfg(feature = "ci")] #[tokio::test] - async fn verify_q15() -> Result<()> { - verify_query(15).await + async fn run_q15() -> Result<()> { + run_query(15).await } - #[cfg(feature = "ci")] #[tokio::test] - async fn verify_q16() -> Result<()> { - verify_query(16).await + async fn run_q16() -> Result<()> { + run_query(16).await } - #[cfg(feature = "ci")] #[tokio::test] - async fn verify_q17() -> Result<()> { - verify_query(17).await + async fn run_q17() -> Result<()> { + run_query(17).await } - #[cfg(feature = "ci")] #[tokio::test] - async fn verify_q18() -> Result<()> { - verify_query(18).await + async fn run_q18() -> Result<()> { + run_query(18).await } - #[cfg(feature = "ci")] #[tokio::test] - async fn verify_q19() -> Result<()> { - verify_query(19).await + async fn run_q19() -> Result<()> { + run_query(19).await } - #[cfg(feature = "ci")] #[tokio::test] - async fn verify_q20() -> Result<()> { - verify_query(20).await + async fn run_q20() -> Result<()> { + run_query(20).await } - #[cfg(feature = "ci")] #[tokio::test] - async fn verify_q21() -> Result<()> { - verify_query(21).await + async fn run_q21() -> Result<()> { + run_query(21).await } - #[cfg(feature = "ci")] #[tokio::test] - async fn verify_q22() -> Result<()> { - verify_query(22).await + async fn run_q22() -> Result<()> { + run_query(22).await + } + + async fn run_query(n: usize) -> Result<()> { + // Tests running query with empty tables, to see whether they run successfully. + + let config = SessionConfig::new() + .with_target_partitions(1) + .with_batch_size(10); + let ctx = SessionContext::with_config(config); + + for &table in TPCH_TABLES { + let schema = get_tpch_table_schema(table); + let batch = RecordBatch::new_empty(Arc::new(schema.to_owned())); + + ctx.register_batch(table, batch)?; + } + + let sql = &get_query_sql(n)?; + for query in sql { + execute_query(&ctx, query, false).await?; + } + + Ok(()) + } +} + +/// CI checks +#[cfg(test)] +#[cfg(feature = "ci")] +mod ci { + use super::*; + use datafusion::sql::TableReference; + use datafusion_proto::bytes::{logical_plan_from_bytes, logical_plan_to_bytes}; + use std::io::{BufRead, BufReader}; + use std::sync::Arc; + + async fn serde_round_trip(query: usize) -> Result<()> { + let ctx = SessionContext::default(); + let path = get_tpch_data_path()?; + let opt = DataFusionBenchmarkOpt { + query, + debug: false, + iterations: 1, + partitions: 2, + batch_size: 8192, + path: PathBuf::from(path.to_string()), + file_format: "tbl".to_string(), + mem_table: false, + output_path: None, + disable_statistics: false, + }; + register_tables(&opt, &ctx).await?; + let queries = get_query_sql(query)?; + for query in queries { + let plan = ctx.sql(&query).await?; + let plan = plan.to_logical_plan()?; + let bytes = logical_plan_to_bytes(&plan)?; + let plan2 = logical_plan_from_bytes(&bytes, &ctx)?; + let plan_formatted = format!("{}", plan.display_indent()); + let plan2_formatted = format!("{}", plan2.display_indent()); + assert_eq!(plan_formatted, plan2_formatted); + } + Ok(()) } #[tokio::test] - async fn run_q1() -> Result<()> { - run_query(1).await + async fn serde_q1() -> Result<()> { + serde_round_trip(1).await } #[tokio::test] - async fn run_q2() -> Result<()> { - run_query(2).await + async fn serde_q2() -> Result<()> { + serde_round_trip(2).await } #[tokio::test] - async fn run_q3() -> Result<()> { - run_query(3).await + async fn serde_q3() -> Result<()> { + serde_round_trip(3).await } #[tokio::test] - async fn run_q4() -> Result<()> { - run_query(4).await + async fn serde_q4() -> Result<()> { + serde_round_trip(4).await } #[tokio::test] - async fn run_q5() -> Result<()> { - run_query(5).await + async fn serde_q5() -> Result<()> { + serde_round_trip(5).await } #[tokio::test] - async fn run_q6() -> Result<()> { - run_query(6).await + async fn serde_q6() -> Result<()> { + serde_round_trip(6).await } #[tokio::test] - async fn run_q7() -> Result<()> { - run_query(7).await + async fn serde_q7() -> Result<()> { + serde_round_trip(7).await } #[tokio::test] - async fn run_q8() -> Result<()> { - run_query(8).await + async fn serde_q8() -> Result<()> { + serde_round_trip(8).await } #[tokio::test] - async fn run_q9() -> Result<()> { - run_query(9).await + async fn serde_q9() -> Result<()> { + serde_round_trip(9).await } #[tokio::test] - async fn run_q10() -> Result<()> { - run_query(10).await + async fn serde_q10() -> Result<()> { + serde_round_trip(10).await } #[tokio::test] - async fn run_q11() -> Result<()> { - run_query(11).await + async fn serde_q11() -> Result<()> { + serde_round_trip(11).await } #[tokio::test] - async fn run_q12() -> Result<()> { - run_query(12).await + async fn serde_q12() -> Result<()> { + serde_round_trip(12).await } #[tokio::test] - async fn run_q13() -> Result<()> { - run_query(13).await + async fn serde_q13() -> Result<()> { + serde_round_trip(13).await } #[tokio::test] - async fn run_q14() -> Result<()> { - run_query(14).await + async fn serde_q14() -> Result<()> { + serde_round_trip(14).await } #[tokio::test] - async fn run_q15() -> Result<()> { - run_query(15).await + async fn serde_q15() -> Result<()> { + serde_round_trip(15).await } + #[ignore] // https://github.com/apache/arrow-datafusion/issues/3820 #[tokio::test] - async fn run_q16() -> Result<()> { - run_query(16).await + async fn serde_q16() -> Result<()> { + serde_round_trip(16).await } #[tokio::test] - async fn run_q17() -> Result<()> { - run_query(17).await + async fn serde_q17() -> Result<()> { + serde_round_trip(17).await } #[tokio::test] - async fn run_q18() -> Result<()> { - run_query(18).await + async fn serde_q18() -> Result<()> { + serde_round_trip(18).await } #[tokio::test] - async fn run_q19() -> Result<()> { - run_query(19).await + async fn serde_q19() -> Result<()> { + serde_round_trip(19).await } #[tokio::test] - async fn run_q20() -> Result<()> { - run_query(20).await + async fn serde_q20() -> Result<()> { + serde_round_trip(20).await } #[tokio::test] - async fn run_q21() -> Result<()> { - run_query(21).await + async fn serde_q21() -> Result<()> { + serde_round_trip(21).await } #[tokio::test] - async fn run_q22() -> Result<()> { - run_query(22).await + async fn serde_q22() -> Result<()> { + serde_round_trip(22).await } - async fn run_query(n: usize) -> Result<()> { - // Tests running query with empty tables, to see whether they run successfully. + #[tokio::test] + async fn verify_q1() -> Result<()> { + verify_query(1).await + } - let config = SessionConfig::new() - .with_target_partitions(1) - .with_batch_size(10); - let ctx = SessionContext::with_config(config); + #[tokio::test] + async fn verify_q2() -> Result<()> { + verify_query(2).await + } - for &table in TPCH_TABLES { - let schema = get_tpch_table_schema(table); - let batch = RecordBatch::new_empty(Arc::new(schema.to_owned())); + #[tokio::test] + async fn verify_q3() -> Result<()> { + verify_query(3).await + } - ctx.register_batch(table, batch)?; - } + #[tokio::test] + async fn verify_q4() -> Result<()> { + verify_query(4).await + } - let sql = &get_query_sql(n)?; - for query in sql { - execute_query(&ctx, query, false).await?; - } + #[tokio::test] + async fn verify_q5() -> Result<()> { + verify_query(5).await + } - Ok(()) + #[ignore] // https://github.com/apache/arrow-datafusion/issues/4024 + #[tokio::test] + async fn verify_q6() -> Result<()> { + verify_query(6).await + } + + #[tokio::test] + async fn verify_q7() -> Result<()> { + verify_query(7).await + } + + #[tokio::test] + async fn verify_q8() -> Result<()> { + verify_query(8).await + } + + #[tokio::test] + async fn verify_q9() -> Result<()> { + verify_query(9).await + } + + #[tokio::test] + async fn verify_q10() -> Result<()> { + verify_query(10).await + } + + #[tokio::test] + async fn verify_q11() -> Result<()> { + verify_query(11).await + } + + #[tokio::test] + async fn verify_q12() -> Result<()> { + verify_query(12).await + } + + #[tokio::test] + async fn verify_q13() -> Result<()> { + verify_query(13).await + } + + #[tokio::test] + async fn verify_q14() -> Result<()> { + verify_query(14).await + } + + #[tokio::test] + async fn verify_q15() -> Result<()> { + verify_query(15).await + } + + #[tokio::test] + async fn verify_q16() -> Result<()> { + verify_query(16).await + } + + #[tokio::test] + async fn verify_q17() -> Result<()> { + verify_query(17).await + } + + #[tokio::test] + async fn verify_q18() -> Result<()> { + verify_query(18).await + } + + #[tokio::test] + async fn verify_q19() -> Result<()> { + verify_query(19).await + } + + #[tokio::test] + async fn verify_q20() -> Result<()> { + verify_query(20).await + } + + #[tokio::test] + async fn verify_q21() -> Result<()> { + verify_query(21).await + } + + #[tokio::test] + async fn verify_q22() -> Result<()> { + verify_query(22).await } /// compares query results against stored answers from the git repo @@ -889,20 +1030,12 @@ mod tests { /// * datatypes returned in columns is correct /// * the correct number of rows are returned /// * the content of the rows is correct - #[cfg(feature = "ci")] async fn verify_query(n: usize) -> Result<()> { use datafusion::arrow::datatypes::{DataType, Field}; use datafusion::common::ScalarValue; use datafusion::logical_expr::expr::Cast; - use std::env; - let path = env::var("TPCH_DATA").unwrap_or("benchmarks/data".to_string()); - if !Path::new(&path).exists() { - return Err(DataFusionError::Execution(format!( - "Benchmark data not found (set TPCH_DATA env var to override): {}", - path - ))); - } + let path = get_tpch_data_path()?; let answer_file = format!("{}/answers/q{}.out", path, n); if !Path::new(&answer_file).exists() { @@ -1025,4 +1158,15 @@ mod tests { Ok(()) } + + fn get_tpch_data_path() -> Result { + let path = std::env::var("TPCH_DATA").unwrap_or("benchmarks/data".to_string()); + if !Path::new(&path).exists() { + return Err(DataFusionError::Execution(format!( + "Benchmark data not found (set TPCH_DATA env var to override): {}", + path + ))); + } + Ok(path) + } }