From d423c786195adaf7d359fedea8ba263728085494 Mon Sep 17 00:00:00 2001
From: Shiyan Xu <2701446+xushiyan@users.noreply.github.com>
Date: Tue, 7 Jan 2025 17:35:00 -0600
Subject: [PATCH] add ut

---
 crates/core/src/table/mod.rs | 269 +++++++++++++++++++----------------
 1 file changed, 145 insertions(+), 124 deletions(-)

diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index a26c3d7..03dc2c9 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -345,7 +345,6 @@ mod tests {
     use crate::storage::util::join_url_segments;
     use crate::storage::Storage;
     use crate::table::Filter;
-    use arrow_array::{Array, StringArray};
     use hudi_tests::{assert_not, TestTable};
     use std::collections::HashSet;
     use std::fs::canonicalize;
@@ -860,133 +859,155 @@ mod tests {
         assert_eq!(actual, expected);
     }
 
-    #[tokio::test]
-    async fn hudi_table_read_snapshot_for_complex_keygen_hive_style() {
-        let base_url = TestTable::V6ComplexkeygenHivestyle.url();
-        let hudi_table = Table::new(base_url.path()).await.unwrap();
+    mod test_snapshot_queries {
+        use super::super::*;
+        use arrow_array::{Array, StringArray};
+        use hudi_tests::TestTable;
+        use std::collections::HashSet;
 
-        let filter_gte_10 = Filter::try_from(("byteField", ">=", "10")).unwrap();
-        let filter_lt_20 = Filter::try_from(("byteField", "<", "20")).unwrap();
-        let filter_ne_100 = Filter::try_from(("shortField", "!=", "100")).unwrap();
+        #[tokio::test]
+        async fn test_empty() -> Result<()> {
+            let base_url = TestTable::V6Empty.url();
+            let hudi_table = Table::new(base_url.path()).await?;
 
-        let records = hudi_table
-            .read_snapshot(&[filter_gte_10, filter_lt_20, filter_ne_100])
-            .await
-            .unwrap();
-        assert_eq!(records.len(), 1);
-        assert_eq!(records[0].num_rows(), 2);
-        let actual_partition_paths: HashSet<&str> = HashSet::from_iter(
-            records[0]
-                .column_by_name("_hoodie_partition_path")
-                .unwrap()
-                .as_any()
-                .downcast_ref::<StringArray>()
-                .unwrap()
-                .iter()
-                .map(|s| s.unwrap())
-                .collect::<Vec<_>>(),
-        );
-        let expected_partition_paths: HashSet<&str> =
-            HashSet::from_iter(vec!["byteField=10/shortField=300"]);
-        assert_eq!(actual_partition_paths, expected_partition_paths);
+            let records = hudi_table.read_snapshot(&[]).await?;
+            assert!(records.is_empty());
 
-        let actual_file_names: HashSet<&str> = HashSet::from_iter(
-            records[0]
-                .column_by_name("_hoodie_file_name")
-                .unwrap()
-                .as_any()
-                .downcast_ref::<StringArray>()
-                .unwrap()
-                .iter()
-                .map(|s| s.unwrap())
-                .collect::<Vec<_>>(),
-        );
-        let expected_file_names: HashSet<&str> = HashSet::from_iter(vec![
-            "a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet",
-        ]);
-        assert_eq!(actual_file_names, expected_file_names);
+            Ok(())
+        }
+
+        #[tokio::test]
+        async fn test_complex_keygen_hive_style() {
+            let base_url = TestTable::V6ComplexkeygenHivestyle.url();
+            let hudi_table = Table::new(base_url.path()).await.unwrap();
+
+            let filter_gte_10 = Filter::try_from(("byteField", ">=", "10")).unwrap();
+            let filter_lt_20 = Filter::try_from(("byteField", "<", "20")).unwrap();
+            let filter_ne_100 = Filter::try_from(("shortField", "!=", "100")).unwrap();
+
+            let records = hudi_table
+                .read_snapshot(&[filter_gte_10, filter_lt_20, filter_ne_100])
+                .await
+                .unwrap();
+            assert_eq!(records.len(), 1);
+            assert_eq!(records[0].num_rows(), 2);
+
+            let partition_paths = StringArray::from(
+                records[0]
+                    .column_by_name("_hoodie_partition_path")
+                    .unwrap()
+                    .to_data(),
+            );
+            let actual_partition_paths =
+                HashSet::<&str>::from_iter(partition_paths.iter().map(|s| s.unwrap()));
+            let expected_partition_paths = HashSet::from_iter(vec!["byteField=10/shortField=300"]);
+            assert_eq!(actual_partition_paths, expected_partition_paths);
+
+            let file_names = StringArray::from(
+                records[0]
+                    .column_by_name("_hoodie_file_name")
+                    .unwrap()
+                    .to_data(),
+            );
+            let actual_file_names =
+                HashSet::<&str>::from_iter(file_names.iter().map(|s| s.unwrap()));
+            let expected_file_names: HashSet<&str> = HashSet::from_iter(vec![
+                "a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet",
+            ]);
+            assert_eq!(actual_file_names, expected_file_names);
+        }
     }
 
-    #[tokio::test]
-    async fn hudi_table_read_incremental_records_for_simplekeygen_nonhivestyle_overwritetable(
-    ) -> Result<()> {
-        let base_url = TestTable::V6SimplekeygenNonhivestyleOverwritetable.url();
-        let hudi_table = Table::new(base_url.path()).await?;
-
-        // read records changed from the first commit (exclusive) to the second commit (inclusive)
-        let records = hudi_table
-            .read_incremental_records("20240707001301554", Some("20240707001302376"))
-            .await?;
-        assert_eq!(records.len(), 2);
-        assert_eq!(records[0].num_rows(), 1);
-        assert_eq!(records[1].num_rows(), 1);
-
-        // verify the partition paths
-        let partition_paths = arrow::compute::concat(&[
-            records[0].column_by_name("_hoodie_partition_path").unwrap(),
-            records[1].column_by_name("_hoodie_partition_path").unwrap(),
-        ])?
-        .to_data();
-        let partition_paths = StringArray::from(partition_paths);
-        let actual_partition_paths: HashSet<&str> = HashSet::from_iter(
-            partition_paths
-                .iter()
-                .map(|s| s.unwrap())
-                .collect::<Vec<_>>(),
-        );
-        let expected_partition_paths = HashSet::from_iter(vec!["10", "30"]);
-        assert_eq!(actual_partition_paths, expected_partition_paths);
-
-        // verify the file names
-        let file_names = arrow::compute::concat(&[
-            records[0].column_by_name("_hoodie_file_name").unwrap(),
-            records[1].column_by_name("_hoodie_file_name").unwrap(),
-        ])?
-        .to_data();
-        let file_names = StringArray::from(file_names);
-        let actual_file_names: HashSet<&str> =
-            HashSet::from_iter(file_names.iter().map(|s| s.unwrap()).collect::<Vec<_>>());
-        let expected_file_names = HashSet::from_iter(vec![
-            "d398fae1-c0e6-4098-8124-f55f7098bdba-0_1-95-136_20240707001302376.parquet",
-            "4f2685a3-614f-49ca-9b2b-e1cb9fb61f27-0_0-95-135_20240707001302376.parquet",
-        ]);
-        assert_eq!(actual_file_names, expected_file_names);
-
-        // read records changed from the first commit (exclusive) to
-        // the latest (an insert overwrite table's replacecommit)
-        let records = hudi_table
-            .read_incremental_records("20240707001301554", None)
-            .await?;
-        assert_eq!(records.len(), 1);
-        assert_eq!(records[0].num_rows(), 1);
-
-        // verify the partition paths
-        let partition_paths = arrow::compute::concat(&[records[0]
-            .column_by_name("_hoodie_partition_path")
-            .unwrap()])?
-        .to_data();
-        let partition_paths = StringArray::from(partition_paths);
-        let actual_partition_paths: HashSet<&str> = HashSet::from_iter(
-            partition_paths
-                .iter()
-                .map(|s| s.unwrap())
-                .collect::<Vec<_>>(),
-        );
-        let expected_partition_paths = HashSet::from_iter(vec!["30"]);
-        assert_eq!(actual_partition_paths, expected_partition_paths);
-
-        // verify the file names
-        let file_names =
-            arrow::compute::concat(&[records[0].column_by_name("_hoodie_file_name").unwrap()])?
-                .to_data();
-        let file_names = StringArray::from(file_names);
-        let actual_file_names: HashSet<&str> =
-            HashSet::from_iter(file_names.iter().map(|s| s.unwrap()).collect::<Vec<_>>());
-        let expected_file_names = HashSet::from_iter(vec![
-            "ebcb261d-62d3-4895-90ec-5b3c9622dff4-0_0-111-154_20240707001303088.parquet",
-        ]);
-        assert_eq!(actual_file_names, expected_file_names);
-
-        Ok(())
+    mod test_incremental_queries {
+        use super::super::*;
+        use arrow_array::{Array, StringArray};
+        use hudi_tests::TestTable;
+        use std::collections::HashSet;
+
+        #[tokio::test]
+        async fn test_empty() -> Result<()> {
+            let base_url = TestTable::V6Empty.url();
+            let hudi_table = Table::new(base_url.path()).await?;
+
+            let records = hudi_table.read_incremental_records("0", None).await?;
+            assert!(records.is_empty());
+
+            Ok(())
+        }
+
+        #[tokio::test]
+        async fn test_simplekeygen_nonhivestyle_overwritetable() -> Result<()> {
+            let base_url = TestTable::V6SimplekeygenNonhivestyleOverwritetable.url();
+            let hudi_table = Table::new(base_url.path()).await?;
+
+            // read records changed from the first commit (exclusive) to the second commit (inclusive)
+            let records = hudi_table
+                .read_incremental_records("20240707001301554", Some("20240707001302376"))
+                .await?;
+            assert_eq!(records.len(), 2);
+            assert_eq!(records[0].num_rows(), 1);
+            assert_eq!(records[1].num_rows(), 1);
+
+            // verify the partition paths
+            let partition_paths = StringArray::from(
+                arrow::compute::concat(&[
+                    records[0].column_by_name("_hoodie_partition_path").unwrap(),
+                    records[1].column_by_name("_hoodie_partition_path").unwrap(),
+                ])?
+                .to_data(),
+            );
+            let actual_partition_paths =
+                HashSet::<&str>::from_iter(partition_paths.iter().map(|s| s.unwrap()));
+            let expected_partition_paths = HashSet::from_iter(vec!["10", "30"]);
+            assert_eq!(actual_partition_paths, expected_partition_paths);
+
+            // verify the file names
+            let file_names = StringArray::from(
+                arrow::compute::concat(&[
+                    records[0].column_by_name("_hoodie_file_name").unwrap(),
+                    records[1].column_by_name("_hoodie_file_name").unwrap(),
+                ])?
+                .to_data(),
+            );
+            let actual_file_names =
+                HashSet::<&str>::from_iter(file_names.iter().map(|s| s.unwrap()));
+            let expected_file_names = HashSet::from_iter(vec![
+                "d398fae1-c0e6-4098-8124-f55f7098bdba-0_1-95-136_20240707001302376.parquet",
+                "4f2685a3-614f-49ca-9b2b-e1cb9fb61f27-0_0-95-135_20240707001302376.parquet",
+            ]);
+            assert_eq!(actual_file_names, expected_file_names);
+
+            // read records changed from the first commit (exclusive) to
+            // the latest (an insert overwrite table's replacecommit)
+            let records = hudi_table
+                .read_incremental_records("20240707001301554", None)
+                .await?;
+            assert_eq!(records.len(), 1);
+            assert_eq!(records[0].num_rows(), 1);
+
+            // verify the partition paths
+            let actual_partition_paths = StringArray::from(
+                records[0]
+                    .column_by_name("_hoodie_partition_path")
+                    .unwrap()
+                    .to_data(),
+            );
+            let expected_partition_paths = StringArray::from(vec!["30"]);
+            assert_eq!(actual_partition_paths, expected_partition_paths);
+
+            // verify the file names
+            let actual_file_names = StringArray::from(
+                records[0]
+                    .column_by_name("_hoodie_file_name")
+                    .unwrap()
+                    .to_data(),
+            );
+            let expected_file_names = StringArray::from(vec![
+                "ebcb261d-62d3-4895-90ec-5b3c9622dff4-0_0-111-154_20240707001303088.parquet",
+            ]);
+            assert_eq!(actual_file_names, expected_file_names);
+
+            Ok(())
+        }
     }
 }