diff --git a/benchmarks/run.sh b/benchmarks/run.sh
index 51fe6e46b..11299b00e 100755
--- a/benchmarks/run.sh
+++ b/benchmarks/run.sh
@@ -15,13 +15,25 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
+
+set -e
 set -x

 # This bash script is meant to be run inside the docker-compose environment. Check the README for instructions

-#TODO: add queries 15, 16, 19, 20, and 22 once we support them
+# regression checks for queries that return the correct results
+# TODO add all queries once https://github.com/apache/arrow-datafusion/issues/3478 is implemented and once
+# queries return decimal results with the correct precision
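+# (--expected points at the data directory; the answer file for query N is read
+# from <expected>/answers/qN.out)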
+for query in 4 12 13
+do
+  /root/tpch benchmark ballista --host ballista-scheduler --port 50050 --query $query --path /data --format tbl --iterations 1 --debug --expected /data
+done

-for query in 1 2 3 4 5 6 7 8 9 10 11 12 13 14 17 18 21
+# at least make sure these queries run, even though we do not verify their results yet
+
+#TODO: add query 16 once we support it
+for query in 1 2 3 5 6 7 8 9 10 11 14 15 17 18 19 20 21 22
 do
   /root/tpch benchmark ballista --host ballista-scheduler --port 50050 --query $query --path /data --format tbl --iterations 1 --debug
 done
+
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index c262c3b6b..f42d222ff 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -22,6 +22,8 @@ use ballista::prelude::{
     BallistaConfig, BALLISTA_DEFAULT_BATCH_SIZE, BALLISTA_DEFAULT_SHUFFLE_PARTITIONS,
     BALLISTA_JOB_NAME,
 };
+use datafusion::arrow::array::*;
+use datafusion::arrow::util::display::array_value_to_string;
 use datafusion::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
 use datafusion::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
 use datafusion::datasource::listing::ListingTableUrl;
@@ -29,6 +31,7 @@ use datafusion::datasource::{MemTable, TableProvider};
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::context::SessionState;
 use datafusion::logical_expr::LogicalPlan;
+use datafusion::logical_expr::{expr::Cast, Expr};
 use datafusion::parquet::basic::Compression;
 use datafusion::parquet::file::properties::WriterProperties;
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
@@ -78,6 +81,10 @@ struct BallistaBenchmarkOpt {
     #[structopt(short, long)]
     debug: bool,

+    /// Path to expected results
+    #[structopt(short = "e", long = "expected")]
+    expected_results: Option<String>,
+
     /// Number of iterations of each test run
     #[structopt(short = "i", long = "iterations", default_value = "3")]
     iterations: usize,
@@ -408,6 +415,11 @@ async fn benchmark_ballista(opt: BallistaBenchmarkOpt) -> Result<()> {
         if opt.debug {
             pretty::print_batches(&batches)?;
         }
+
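+        // when --expected is set, verify this iteration's results against the
+        // bundled TPC-H answers (scale factor 1)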
+        if let Some(expected_results_path) = opt.expected_results.as_ref() {
+            let expected = get_expected_results(opt.query, expected_results_path).await?;
+            assert_expected_results(&expected, &batches)
+        }
     }

     let avg = millis.iter().sum::<f64>() / millis.len() as f64;
@@ -524,7 +536,7 @@ async fn loadtest_ballista(opt: BallistaLoadtestOpt) -> Result<()> {
                         "Client {} Round {} Query {} took {:.1} ms ",
                         &client_id, &i, query_id, elapsed
                     );
-                    if opt.debug {
+                    if opt.debug && !batches.is_empty() {
                         pretty::print_batches(&batches).unwrap();
                     }
                 }
@@ -697,7 +709,9 @@ async fn execute_query(
             "=== Physical plan with metrics ===\n{}\n",
             DisplayableExecutionPlan::with_metrics(physical_plan.as_ref()).indent()
         );
-        pretty::print_batches(&result)?;
+        if !result.is_empty() {
+            pretty::print_batches(&result)?;
+        }
     }
     Ok(result)
 }
@@ -846,7 +860,7 @@ fn get_schema(table: &str) -> Schema {
             Field::new("p_type", DataType::Utf8, false),
             Field::new("p_size", DataType::Int32, false),
             Field::new("p_container", DataType::Utf8, false),
-            Field::new("p_retailprice", DataType::Float64, false),
+            Field::new("p_retailprice", DataType::Decimal128(15, 2), false),
             Field::new("p_comment", DataType::Utf8, false),
         ]),

@@ -856,7 +870,7 @@
             Field::new("s_name", DataType::Utf8, false),
             Field::new("s_address", DataType::Utf8, false),
             Field::new("s_nationkey", DataType::Int64, false),
             Field::new("s_phone", DataType::Utf8, false),
-            Field::new("s_acctbal", DataType::Float64, false),
+            Field::new("s_acctbal", DataType::Decimal128(15, 2), false),
             Field::new("s_comment", DataType::Utf8, false),
         ]),

@@ -864,7 +878,7 @@
             Field::new("ps_partkey", DataType::Int64, false),
             Field::new("ps_suppkey", DataType::Int64, false),
             Field::new("ps_availqty", DataType::Int32, false),
-            Field::new("ps_supplycost", DataType::Float64, false),
+            Field::new("ps_supplycost", DataType::Decimal128(15, 2), false),
             Field::new("ps_comment", DataType::Utf8, false),
         ]),

@@ -874,7 +888,7 @@
             Field::new("c_name", DataType::Utf8, false),
             Field::new("c_address", DataType::Utf8, false),
             Field::new("c_nationkey", DataType::Int64, false),
             Field::new("c_phone", DataType::Utf8, false),
-            Field::new("c_acctbal", DataType::Float64, false),
+            Field::new("c_acctbal", DataType::Decimal128(15, 2), false),
             Field::new("c_mktsegment", DataType::Utf8, false),
             Field::new("c_comment", DataType::Utf8, false),
         ]),

@@ -883,7 +897,7 @@
             Field::new("o_orderkey", DataType::Int64, false),
             Field::new("o_custkey", DataType::Int64, false),
             Field::new("o_orderstatus", DataType::Utf8, false),
-            Field::new("o_totalprice", DataType::Float64, false),
+            Field::new("o_totalprice", DataType::Decimal128(15, 2), false),
             Field::new("o_orderdate", DataType::Date32, false),
             Field::new("o_orderpriority", DataType::Utf8, false),
             Field::new("o_clerk", DataType::Utf8, false),
@@ -896,10 +910,10 @@
             Field::new("l_partkey", DataType::Int64, false),
             Field::new("l_suppkey", DataType::Int64, false),
             Field::new("l_linenumber", DataType::Int32, false),
-            Field::new("l_quantity", DataType::Float64, false),
-            Field::new("l_extendedprice", DataType::Float64, false),
-            Field::new("l_discount", DataType::Float64, false),
-            Field::new("l_tax", DataType::Float64, false),
+            Field::new("l_quantity", DataType::Decimal128(15, 2), false),
+            Field::new("l_extendedprice", DataType::Decimal128(15, 2), false),
+            Field::new("l_discount", DataType::Decimal128(15, 2), false),
+            Field::new("l_tax", DataType::Decimal128(15, 2), false),
             Field::new("l_returnflag", DataType::Utf8, false),
             Field::new("l_linestatus", DataType::Utf8, false),
             Field::new("l_shipdate", DataType::Date32, false),
@@ -969,6 +983,273 @@ impl BenchmarkRun {
     }
 }

+/// Compare actual results against expected results at scale factor 1
+fn assert_expected_results(expected: &[RecordBatch], actual: &[RecordBatch]) {
+    // assert schema equality, ignoring nullability
+    assert_eq!(
+        nullable_schema(expected[0].schema()),
+        nullable_schema(actual[0].schema())
+    );
+
+    // convert both datasets to Vec<Vec<String>> for simple comparison
+    let expected_vec = result_vec(expected);
+    let actual_vec = result_vec(actual);
+
+    // basic result comparison
+    assert_eq!(expected_vec.len(), actual_vec.len());
+
+    // compare each row. this works as all TPC-H queries have deterministically ordered results
+    for i in 0..actual_vec.len() {
+        assert_eq!(expected_vec[i], actual_vec[i]);
+    }
+}
+
+/// Get the expected answer for a specific query at scale factor 1
+async fn get_expected_results(n: usize, path: &str) -> Result<Vec<RecordBatch>> {
+    let ctx = SessionContext::new();
+    let schema = string_schema(get_answer_schema(n));
+    let options = CsvReadOptions::new()
+        .schema(&schema)
+        .delimiter(b'|')
+        .file_extension(".out");
+    let answer_path = format!("{}/answers/q{}.out", path, n);
+    println!("Looking for expected results at {}", answer_path);
+    let df = ctx.read_csv(&answer_path, options).await?;
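+    // build one `CAST(trim(col) AS <expected type>) AS col` projection per column,
+    // so the string values parse into the answer schema's types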
+    let df = df.select(
+        get_answer_schema(n)
+            .fields()
+            .iter()
+            .map(|field| {
+                Expr::Alias(
+                    Box::new(Expr::Cast(Cast {
+                        expr: Box::new(trim(col(Field::name(field)))),
+                        data_type: Field::data_type(field).to_owned(),
+                    })),
+                    Field::name(field).to_string(),
+                )
+            })
+            .collect::<Vec<Expr>>(),
+    )?;
+    df.collect().await
+}
+
+// convert the schema to the same schema, but with all columns set to nullable=true.
+// this allows direct schema comparison ignoring nullable.
+fn nullable_schema(schema: Arc<Schema>) -> Schema {
+    Schema::new(
+        schema
+            .fields()
+            .iter()
+            .map(|field| {
+                Field::new(Field::name(field), Field::data_type(field).to_owned(), true)
+            })
+            .collect::<Vec<Field>>(),
+    )
+}
+
+/// Converts the results into a 2d array of strings, `result[row][column]`
+/// Special cases nulls to NULL for testing
+fn result_vec(results: &[RecordBatch]) -> Vec<Vec<String>> {
+    let mut result = vec![];
+    for batch in results {
+        for row_index in 0..batch.num_rows() {
+            let row_vec = batch
+                .columns()
+                .iter()
+                .map(|column| col_str(column, row_index))
+                .collect();
+            result.push(row_vec);
+        }
+    }
+    result
+}
+
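+// NOTE: several of the Decimal128 widths below reflect what the engine currently
+// produces rather than the TPC-H spec; see the `//TODO should be precision 2` markers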
+fn get_answer_schema(n: usize) -> Schema {
+    match n {
+        1 => Schema::new(vec![
+            Field::new("l_returnflag", DataType::Utf8, true),
+            Field::new("l_linestatus", DataType::Utf8, true),
+            Field::new("sum_qty", DataType::Float64, true),
+            Field::new("sum_base_price", DataType::Float64, true),
+            Field::new("sum_disc_price", DataType::Float64, true),
+            Field::new("sum_charge", DataType::Float64, true),
+            Field::new("avg_qty", DataType::Float64, true),
+            Field::new("avg_price", DataType::Decimal128(19, 6), false), //TODO should be precision 2
+            Field::new("avg_disc", DataType::Float64, true),
+            Field::new("count_order", DataType::Int64, true),
+        ]),
+
+        2 => Schema::new(vec![
+            Field::new("s_acctbal", DataType::Decimal128(15, 2), true),
+            Field::new("s_name", DataType::Utf8, true),
+            Field::new("n_name", DataType::Utf8, true),
+            Field::new("p_partkey", DataType::Int32, true),
+            Field::new("p_mfgr", DataType::Utf8, true),
+            Field::new("s_address", DataType::Utf8, true),
+            Field::new("s_phone", DataType::Utf8, true),
+            Field::new("s_comment", DataType::Utf8, true),
+        ]),
+
+        3 => Schema::new(vec![
+            Field::new("l_orderkey", DataType::Int32, true),
+            Field::new("revenue", DataType::Decimal128(19, 6), true), //TODO should be precision 2
+            Field::new("o_orderdate", DataType::Date32, true),
+            Field::new("o_shippriority", DataType::Int32, true),
+        ]),
+
+        4 => Schema::new(vec![
+            Field::new("o_orderpriority", DataType::Utf8, true),
+            Field::new("order_count", DataType::Int64, true),
+        ]),
+
+        5 => Schema::new(vec![
+            Field::new("n_name", DataType::Utf8, true),
+            Field::new("revenue", DataType::Decimal128(38, 4), true), //TODO should be precision 2
+        ]),
+
+        6 => Schema::new(vec![Field::new(
+            "revenue",
+            DataType::Decimal128(25, 2),
+            true,
+        )]),
+
+        7 => Schema::new(vec![
+            Field::new("supp_nation", DataType::Utf8, true),
+            Field::new("cust_nation", DataType::Utf8, true),
+            Field::new("l_year", DataType::Int32, true),
+            Field::new("revenue", DataType::Decimal128(38, 4), true), //TODO should be precision 2
+        ]),
+
+        8 => Schema::new(vec![
+            Field::new("o_year", DataType::Int32, true),
+            Field::new("mkt_share", DataType::Decimal128(38, 4), true), //TODO should be precision 2
+        ]),
+
+        9 => Schema::new(vec![
+            Field::new("nation", DataType::Utf8, true),
+            Field::new("o_year", DataType::Int32, true),
+            Field::new("sum_profit", DataType::Decimal128(38, 4), true), //TODO should be precision 2
+        ]),
+
+        10 => Schema::new(vec![
+            Field::new("c_custkey", DataType::Int32, true),
+            Field::new("c_name", DataType::Utf8, true),
+            Field::new("revenue", DataType::Decimal128(38, 4), true), //TODO should be precision 2
+            Field::new("c_acctbal", DataType::Decimal128(15, 2), true),
+            Field::new("n_name", DataType::Utf8, true),
+            Field::new("c_address", DataType::Utf8, true),
+            Field::new("c_phone", DataType::Utf8, true),
+            Field::new("c_comment", DataType::Utf8, true),
+        ]),
+
+        11 => Schema::new(vec![
+            Field::new("ps_partkey", DataType::Int32, true),
+            Field::new("value", DataType::Decimal128(36, 2), true),
+        ]),
+
+        12 => Schema::new(vec![
+            Field::new("l_shipmode", DataType::Utf8, true),
+            Field::new("high_line_count", DataType::Int64, true),
+            Field::new("low_line_count", DataType::Int64, true),
+        ]),
+
+        13 => Schema::new(vec![
+            Field::new("c_count", DataType::Int64, true),
+            Field::new("custdist", DataType::Int64, true),
+        ]),
+
+        14 => Schema::new(vec![
+            Field::new("promo_revenue", DataType::Decimal128(38, 38), true), //TODO should be precision 2
+        ]),
+
+        15 => Schema::new(vec![Field::new("promo_revenue", DataType::Float64, true)]),
+
+        16 => Schema::new(vec![
+            Field::new("p_brand", DataType::Utf8, true),
+            Field::new("p_type", DataType::Utf8, true),
+            Field::new("c_phone", DataType::Int32, true),
+            Field::new("c_comment", DataType::Int32, true),
+        ]),
+
+        17 => Schema::new(vec![
+            Field::new("avg_yearly", DataType::Decimal128(38, 3), true), //TODO should be precision 2
+        ]),
+
+        18 => Schema::new(vec![
+            Field::new("c_name", DataType::Utf8, true),
+            Field::new("c_custkey", DataType::Int64, true),
+            Field::new("o_orderkey", DataType::Int64, true),
+            Field::new("o_orderdate", DataType::Date32, true),
+            Field::new("o_totalprice", DataType::Decimal128(15, 2), true),
+            Field::new("sum_l_quantity", DataType::Decimal128(25, 2), true),
+        ]),
+
+        19 => Schema::new(vec![
+            Field::new("revenue", DataType::Decimal128(38, 4), true), //TODO should be precision 2
+        ]),
+
+        20 => Schema::new(vec![
+            Field::new("s_name", DataType::Utf8, true),
+            Field::new("s_address", DataType::Utf8, true),
+        ]),
+
+        21 => Schema::new(vec![
+            Field::new("s_name", DataType::Utf8, true),
+            Field::new("numwait", DataType::Int64, true),
+        ]),
+
+        22 => Schema::new(vec![
+            Field::new("cntrycode", DataType::Utf8, true),
+            Field::new("numcust", DataType::Int64, true),
+            Field::new("totacctbal", DataType::Decimal128(25, 2), true),
+        ]),
+
+        _ => unimplemented!(),
+    }
+}
+
+/// convert expected schema to all utf8 so columns can be read as strings to be parsed separately
+/// this is due to the fact that the csv parser cannot handle leading/trailing spaces
+fn string_schema(schema: Schema) -> Schema {
+    Schema::new(
+        schema
+            .fields()
+            .iter()
+            .map(|field| {
+                Field::new(
+                    Field::name(field),
+                    DataType::Utf8,
+                    Field::is_nullable(field),
+                )
+            })
+            .collect::<Vec<Field>>(),
+    )
+}
+
+/// Specialised String representation
+fn col_str(column: &ArrayRef, row_index: usize) -> String {
+    if column.is_null(row_index) {
+        return "NULL".to_string();
+    }
+
+    // Special case ListArray as there is no pretty print support for it yet
+    if let DataType::FixedSizeList(_, n) = column.data_type() {
+        let array = column
+            .as_any()
+            .downcast_ref::<FixedSizeListArray>()
+            .unwrap()
+            .value(row_index);
+
+        let mut r = Vec::with_capacity(*n as usize);
+        for i in 0..*n {
+            r.push(col_str(&array, i as usize));
+        }
+        return format!("[{}]", r.join(","));
+    }
+
+    array_value_to_string(column, row_index).unwrap()
+}
+
 #[derive(Debug, Serialize)]
 struct QueryResult {
     elapsed: f64,
@@ -978,9 +1259,6 @@ struct QueryResult {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use datafusion::arrow::array::*;
-    use datafusion::arrow::util::display::array_value_to_string;
-    use datafusion::logical_expr::{expr::Cast, Expr};
     use std::env;
     use std::sync::Arc;

@@ -1212,217 +1490,6 @@ mod tests {
         run_query(22).await
     }

-    /// Specialised String representation
-    fn col_str(column: &ArrayRef, row_index: usize) -> String {
-        if column.is_null(row_index) {
-            return "NULL".to_string();
-        }
-
-        // Special case ListArray as there is no pretty print support for it yet
-        if let DataType::FixedSizeList(_, n) = column.data_type() {
-            let array = column
-                .as_any()
-                .downcast_ref::<FixedSizeListArray>()
-                .unwrap()
-                .value(row_index);
-
-            let mut r = Vec::with_capacity(*n as usize);
-            for i in 0..*n {
-                r.push(col_str(&array, i as usize));
-            }
-            return format!("[{}]", r.join(","));
-        }
-
-        array_value_to_string(column, row_index).unwrap()
-    }
-
-    /// Converts the results into a 2d array of strings, `result[row][column]`
-    /// Special cases nulls to NULL for testing
-    fn result_vec(results: &[RecordBatch]) -> Vec<Vec<String>> {
-        let mut result = vec![];
-        for batch in results {
-            for row_index in 0..batch.num_rows() {
-                let row_vec = batch
-                    .columns()
-                    .iter()
-                    .map(|column| col_str(column, row_index))
-                    .collect();
-                result.push(row_vec);
-            }
-        }
-        result
-    }
-
-    fn get_answer_schema(n: usize) -> Schema {
-        match n {
-            1 => Schema::new(vec![
-                Field::new("l_returnflag", DataType::Utf8, true),
-                Field::new("l_linestatus", DataType::Utf8, true),
-                Field::new("sum_qty", DataType::Float64, true),
-                Field::new("sum_base_price", DataType::Float64, true),
-                Field::new("sum_disc_price", DataType::Float64, true),
-                Field::new("sum_charge", DataType::Float64, true),
-                Field::new("avg_qty", DataType::Float64, true),
-                Field::new("avg_price", DataType::Float64, true),
-                Field::new("avg_disc", DataType::Float64, true),
-                Field::new("count_order", DataType::UInt64, true),
-            ]),
-
-            2 => Schema::new(vec![
-                Field::new("s_acctbal", DataType::Float64, true),
-                Field::new("s_name", DataType::Utf8, true),
-                Field::new("n_name", DataType::Utf8, true),
-                Field::new("p_partkey", DataType::Int32, true),
-                Field::new("p_mfgr", DataType::Utf8, true),
-                Field::new("s_address", DataType::Utf8, true),
-                Field::new("s_phone", DataType::Utf8, true),
-                Field::new("s_comment", DataType::Utf8, true),
-            ]),
-
-            3 => Schema::new(vec![
-                Field::new("l_orderkey", DataType::Int32, true),
-                Field::new("revenue", DataType::Float64, true),
-                Field::new("o_orderdate", DataType::Date32, true),
-                Field::new("o_shippriority", DataType::Int32, true),
-            ]),
-
-            4 => Schema::new(vec![
-                Field::new("o_orderpriority", DataType::Utf8, true),
-                Field::new("order_count", DataType::Int32, true),
-            ]),
-
-            5 => Schema::new(vec![
-                Field::new("n_name", DataType::Utf8, true),
-                Field::new("revenue", DataType::Float64, true),
-            ]),
-
-            6 => Schema::new(vec![Field::new("revenue", DataType::Float64, true)]),
-
-            7 => Schema::new(vec![
-                Field::new("supp_nation", DataType::Utf8, true),
-                Field::new("cust_nation", DataType::Utf8, true),
-                Field::new("l_year", DataType::Int32, true),
-                Field::new("revenue", DataType::Float64, true),
-            ]),
-
-            8 => Schema::new(vec![
-                Field::new("o_year", DataType::Int32, true),
-                Field::new("mkt_share", DataType::Float64, true),
-            ]),
-
-            9 => Schema::new(vec![
-                Field::new("nation", DataType::Utf8, true),
-                Field::new("o_year", DataType::Int32, true),
-                Field::new("sum_profit", DataType::Float64, true),
-            ]),
-
-            10 => Schema::new(vec![
-                Field::new("c_custkey", DataType::Int32, true),
-                Field::new("c_name", DataType::Utf8, true),
-                Field::new("revenue", DataType::Float64, true),
-                Field::new("c_acctbal", DataType::Float64, true),
-                Field::new("n_name", DataType::Utf8, true),
-                Field::new("c_address", DataType::Utf8, true),
-                Field::new("c_phone", DataType::Utf8, true),
-                Field::new("c_comment", DataType::Utf8, true),
-            ]),
-
-            11 => Schema::new(vec![
-                Field::new("ps_partkey", DataType::Int32, true),
-                Field::new("value", DataType::Float64, true),
-            ]),
-
-            12 => Schema::new(vec![
-                Field::new("l_shipmode", DataType::Utf8, true),
-                Field::new("high_line_count", DataType::Int64, true),
-                Field::new("low_line_count", DataType::Int64, true),
-            ]),
-
-            13 => Schema::new(vec![
-                Field::new("c_count", DataType::Int64, true),
-                Field::new("custdist", DataType::Int64, true),
-            ]),
-
-            14 => Schema::new(vec![Field::new("promo_revenue", DataType::Float64, true)]),
-
-            15 => Schema::new(vec![Field::new("promo_revenue", DataType::Float64, true)]),
-
-            16 => Schema::new(vec![
-                Field::new("p_brand", DataType::Utf8, true),
-                Field::new("p_type", DataType::Utf8, true),
-                Field::new("c_phone", DataType::Int32, true),
-                Field::new("c_comment", DataType::Int32, true),
-            ]),
-
-            17 => Schema::new(vec![Field::new("avg_yearly", DataType::Float64, true)]),
-
-            18 => Schema::new(vec![
-                Field::new("c_name", DataType::Utf8, true),
-                Field::new("c_custkey", DataType::Int32, true),
-                Field::new("o_orderkey", DataType::Int32, true),
-                Field::new("o_orderdate", DataType::Date32, true),
-                Field::new("o_totalprice", DataType::Float64, true),
-                Field::new("sum_l_quantity", DataType::Float64, true),
-            ]),
-
-            19 => Schema::new(vec![Field::new("revenue", DataType::Float64, true)]),
-
-            20 => Schema::new(vec![
-                Field::new("s_name", DataType::Utf8, true),
-                Field::new("s_address", DataType::Utf8, true),
-            ]),
-
-            21 => Schema::new(vec![
-                Field::new("s_name", DataType::Utf8, true),
-                Field::new("numwait", DataType::Int32, true),
-            ]),
-
-            22 => Schema::new(vec![
-                Field::new("cntrycode", DataType::Int32, true),
-                Field::new("numcust", DataType::Int32, true),
-                Field::new("totacctbal", DataType::Float64, true),
-            ]),
-
-            _ => unimplemented!(),
-        }
-    }
-
-    // convert expected schema to all utf8 so columns can be read as strings to be parsed separately
-    // this is due to the fact that the csv parser cannot handle leading/trailing spaces
-    fn string_schema(schema: Schema) -> Schema {
-        Schema::new(
-            schema
-                .fields()
-                .iter()
-                .map(|field| {
-                    Field::new(
-                        Field::name(field),
-                        DataType::Utf8,
-                        Field::is_nullable(field),
-                    )
-                })
-                .collect::<Vec<Field>>(),
-        )
-    }
-
-    // convert the schema to the same but with all columns set to nullable=true.
-    // this allows direct schema comparison ignoring nullable.
-    fn nullable_schema(schema: Arc<Schema>) -> Schema {
-        Schema::new(
-            schema
-                .fields()
-                .iter()
-                .map(|field| {
-                    Field::new(
-                        Field::name(field),
-                        Field::data_type(field).to_owned(),
-                        true,
-                    )
-                })
-                .collect::<Vec<Field>>(),
-        )
-    }
-
     async fn run_query(n: usize) -> Result<()> {
         // Tests running query with empty tables, to see whether they run successfully.
@@ -1453,31 +1520,7 @@ mod tests {
         // load expected answers from tpch-dbgen
         // read csv as all strings, trim and cast to expected type as the csv string
         // to value parser does not handle data with leading/trailing spaces
-        let ctx = SessionContext::new();
-        let schema = string_schema(get_answer_schema(n));
-        let options = CsvReadOptions::new()
-            .schema(&schema)
-            .delimiter(b'|')
-            .file_extension(".out");
-        let df = ctx
-            .read_csv(&format!("{}/answers/q{}.out", path, n), options)
-            .await?;
-        let df = df.select(
-            get_answer_schema(n)
-                .fields()
-                .iter()
-                .map(|field| {
-                    Expr::Alias(
-                        Box::new(Expr::Cast(Cast {
-                            expr: Box::new(trim(col(Field::name(field)))),
-                            data_type: Field::data_type(field).to_owned(),
-                        })),
-                        Field::name(field).to_string(),
-                    )
-                })
-                .collect::<Vec<Expr>>(),
-        )?;
-        let expected = df.collect().await?;
+        let expected = get_expected_results(n, &path).await?;

         // run the query to compute actual results of the query
         let opt = DataFusionBenchmarkOpt {
         };
         let actual = benchmark_datafusion(opt).await?;

-        // assert schema equality without comparing nullable values
-        assert_eq!(
-            nullable_schema(expected[0].schema()),
-            nullable_schema(actual[0].schema())
-        );
-
-        // convert both datasets to Vec<Vec<String>> for simple comparison
-        let expected_vec = result_vec(&expected);
-        let actual_vec = result_vec(&actual);
-
-        // basic result comparison
-        assert_eq!(expected_vec.len(), actual_vec.len());
-
-        // compare each row. this works as all TPC-H queries have determinisically ordered results
-        for i in 0..actual_vec.len() {
-            assert_eq!(expected_vec[i], actual_vec[i]);
-        }
+        assert_expected_results(&expected, &actual)
     } else {
         println!("TPCH_DATA environment variable not set, skipping test");
     }
diff --git a/benchmarks/tpch-gen.sh b/benchmarks/tpch-gen.sh
index 28c46a812..f28bdc747 100755
--- a/benchmarks/tpch-gen.sh
+++ b/benchmarks/tpch-gen.sh
@@ -16,7 +16,9 @@
 # specific language governing permissions and limitations
 # under the License.

-#set -e
+mkdir -p data/answers 2>/dev/null
+
+set -e

 pushd ..
 . ./dev/build-set-env.sh
@@ -27,7 +29,13 @@ FILE=./data/supplier.tbl
 if test -f "$FILE"; then
     echo "$FILE exists."
 else
-    mkdir data 2>/dev/null
     docker run -v `pwd`/data:/data -it --rm ghcr.io/databloom-ai/tpch-docker:main -vf -s 1
-    ls -l data
+fi
+
+# Copy expected answers into the ./data/answers directory if it does not already exist
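+# (note: the copy path below is tied to the TPC-H 2.18.0_rc2 layout inside the tpch-docker image)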
+FILE=./data/answers/q1.out
+if test -f "$FILE"; then
+    echo "$FILE exists."
+else
+    docker run -v `pwd`/data:/data -it --entrypoint /bin/bash --rm ghcr.io/databloom-ai/tpch-docker:main -c "cp /opt/tpch/2.18.0_rc2/dbgen/answers/* /data/answers/"
 fi
\ No newline at end of file
diff --git a/dev/integration-tests.sh b/dev/integration-tests.sh
index 9a6016e91..c08245f91 100755
--- a/dev/integration-tests.sh
+++ b/dev/integration-tests.sh
@@ -17,16 +17,28 @@
 # specific language governing permissions and limitations
 # under the License.
 set -e
-./dev/build-ballista-docker.sh
+
+echo "Generating benchmark data ..."
 pushd benchmarks
 ./tpch-gen.sh
+popd

+echo "Building Docker images ..."
+./dev/build-ballista-docker.sh
+
+echo "Starting docker-compose in background ..."
 docker-compose up -d

 # give the scheduler a chance to start up
+echo "Sleeping (wait for scheduler to start)..."
 sleep 10

+echo "Running benchmarks ..."
 docker-compose run ballista-client /root/run.sh
+
+# TODO: call docker-compose down even if the benchmarks fail
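+# (one possible fix, untested sketch: register `trap 'docker-compose down' EXIT`
+# right after start-up so the stack is torn down even when a benchmark fails)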
+
+echo "Stopping docker-compose ..."
 docker-compose down

 popd
diff --git a/dev/release/verify-release-candidate.sh b/dev/release/verify-release-candidate.sh
index 00e8691e4..3e490a44f 100755
--- a/dev/release/verify-release-candidate.sh
+++ b/dev/release/verify-release-candidate.sh
@@ -127,6 +127,12 @@ test_source_distribution() {
   export ARROW_TEST_DATA=$PWD/arrow-testing-data/data
   export PARQUET_TEST_DATA=$PWD/parquet-testing-data/data

+  # TODO: enable this eventually so that cargo test will check benchmark query results
+#  pushd benchmarks
+#  ./tpch-gen.sh
+#  popd
+#  export TPCH_DATA=`pwd`/benchmarks/data
+
   cargo build
   cargo test --all

@@ -135,9 +141,6 @@ test_source_distribution() {
     exit 1
   fi

-  # run integration tests that will run end-to-end tests in docker-compose
-  ./dev/integration-tests.sh
-
   # Note can't verify other ballista crates as they depend
   # on ballista-core which isn't published yet
   pushd ballista/core