
Commit

add ut
xushiyan committed Jan 7, 2025
1 parent f8b6d2d commit d423c78
Showing 1 changed file with 145 additions and 124 deletions.
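The commit regroups the former top-level snapshot and incremental read tests into nested test modules and adds an empty-table case to each. As a condensed outline of the layout introduced in the diff below (test bodies elided; see the diff for the full assertions), assuming the same surrounding crate as the changed file:

    mod tests {
        // ... existing tests ...

        mod test_snapshot_queries {
            use super::super::*;

            #[tokio::test]
            async fn test_empty() -> Result<()> {
                // V6Empty table: read_snapshot(&[]) yields no record batches.
                Ok(())
            }

            #[tokio::test]
            async fn test_complex_keygen_hive_style() {
                // Filtered read_snapshot on V6ComplexkeygenHivestyle;
                // asserts partition paths and file names.
            }
        }

        mod test_incremental_queries {
            use super::super::*;

            #[tokio::test]
            async fn test_empty() -> Result<()> {
                // V6Empty table: read_incremental_records("0", None) yields no batches.
                Ok(())
            }

            #[tokio::test]
            async fn test_simplekeygen_nonhivestyle_overwritetable() -> Result<()> {
                // Incremental reads over two commit ranges; asserts partition
                // paths and file names after an insert-overwrite replacecommit.
                Ok(())
            }
        }
    }

Each submodule carries its own use block, which is why the arrow_array import can be dropped from the parent module in the first hunk.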
269 changes: 145 additions & 124 deletions crates/core/src/table/mod.rs
@@ -345,7 +345,6 @@ mod tests {
use crate::storage::util::join_url_segments;
use crate::storage::Storage;
use crate::table::Filter;
use arrow_array::{Array, StringArray};
use hudi_tests::{assert_not, TestTable};
use std::collections::HashSet;
use std::fs::canonicalize;
@@ -860,133 +859,155 @@ mod tests {
assert_eq!(actual, expected);
}

#[tokio::test]
async fn hudi_table_read_snapshot_for_complex_keygen_hive_style() {
let base_url = TestTable::V6ComplexkeygenHivestyle.url();
let hudi_table = Table::new(base_url.path()).await.unwrap();
mod test_snapshot_queries {
use super::super::*;
use arrow_array::{Array, StringArray};
use hudi_tests::TestTable;
use std::collections::HashSet;

let filter_gte_10 = Filter::try_from(("byteField", ">=", "10")).unwrap();
let filter_lt_20 = Filter::try_from(("byteField", "<", "20")).unwrap();
let filter_ne_100 = Filter::try_from(("shortField", "!=", "100")).unwrap();
#[tokio::test]
async fn test_empty() -> Result<()> {
let base_url = TestTable::V6Empty.url();
let hudi_table = Table::new(base_url.path()).await?;

let records = hudi_table
.read_snapshot(&[filter_gte_10, filter_lt_20, filter_ne_100])
.await
.unwrap();
assert_eq!(records.len(), 1);
assert_eq!(records[0].num_rows(), 2);
let actual_partition_paths: HashSet<&str> = HashSet::from_iter(
records[0]
.column_by_name("_hoodie_partition_path")
.unwrap()
.as_any()
.downcast_ref::<StringArray>()
.unwrap()
.iter()
.map(|s| s.unwrap())
.collect::<Vec<_>>(),
);
let expected_partition_paths: HashSet<&str> =
HashSet::from_iter(vec!["byteField=10/shortField=300"]);
assert_eq!(actual_partition_paths, expected_partition_paths);
let records = hudi_table.read_snapshot(&[]).await?;
assert!(records.is_empty());

let actual_file_names: HashSet<&str> = HashSet::from_iter(
records[0]
.column_by_name("_hoodie_file_name")
.unwrap()
.as_any()
.downcast_ref::<StringArray>()
.unwrap()
.iter()
.map(|s| s.unwrap())
.collect::<Vec<_>>(),
);
let expected_file_names: HashSet<&str> = HashSet::from_iter(vec![
"a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet",
]);
assert_eq!(actual_file_names, expected_file_names);
Ok(())
}

#[tokio::test]
async fn test_complex_keygen_hive_style() {
let base_url = TestTable::V6ComplexkeygenHivestyle.url();
let hudi_table = Table::new(base_url.path()).await.unwrap();

let filter_gte_10 = Filter::try_from(("byteField", ">=", "10")).unwrap();
let filter_lt_20 = Filter::try_from(("byteField", "<", "20")).unwrap();
let filter_ne_100 = Filter::try_from(("shortField", "!=", "100")).unwrap();

let records = hudi_table
.read_snapshot(&[filter_gte_10, filter_lt_20, filter_ne_100])
.await
.unwrap();
assert_eq!(records.len(), 1);
assert_eq!(records[0].num_rows(), 2);

let partition_paths = StringArray::from(
records[0]
.column_by_name("_hoodie_partition_path")
.unwrap()
.to_data(),
);
let actual_partition_paths =
HashSet::<&str>::from_iter(partition_paths.iter().map(|s| s.unwrap()));
let expected_partition_paths = HashSet::from_iter(vec!["byteField=10/shortField=300"]);
assert_eq!(actual_partition_paths, expected_partition_paths);

let file_names = StringArray::from(
records[0]
.column_by_name("_hoodie_file_name")
.unwrap()
.to_data(),
);
let actual_file_names =
HashSet::<&str>::from_iter(file_names.iter().map(|s| s.unwrap()));
let expected_file_names: HashSet<&str> = HashSet::from_iter(vec![
"a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet",
]);
assert_eq!(actual_file_names, expected_file_names);
}
}

#[tokio::test]
async fn hudi_table_read_incremental_records_for_simplekeygen_nonhivestyle_overwritetable(
) -> Result<()> {
let base_url = TestTable::V6SimplekeygenNonhivestyleOverwritetable.url();
let hudi_table = Table::new(base_url.path()).await?;

// read records changed from the first commit (exclusive) to the second commit (inclusive)
let records = hudi_table
.read_incremental_records("20240707001301554", Some("20240707001302376"))
.await?;
assert_eq!(records.len(), 2);
assert_eq!(records[0].num_rows(), 1);
assert_eq!(records[1].num_rows(), 1);

// verify the partition paths
let partition_paths = arrow::compute::concat(&[
records[0].column_by_name("_hoodie_partition_path").unwrap(),
records[1].column_by_name("_hoodie_partition_path").unwrap(),
])?
.to_data();
let partition_paths = StringArray::from(partition_paths);
let actual_partition_paths: HashSet<&str> = HashSet::from_iter(
partition_paths
.iter()
.map(|s| s.unwrap())
.collect::<Vec<_>>(),
);
let expected_partition_paths = HashSet::from_iter(vec!["10", "30"]);
assert_eq!(actual_partition_paths, expected_partition_paths);

// verify the file names
let file_names = arrow::compute::concat(&[
records[0].column_by_name("_hoodie_file_name").unwrap(),
records[1].column_by_name("_hoodie_file_name").unwrap(),
])?
.to_data();
let file_names = StringArray::from(file_names);
let actual_file_names: HashSet<&str> =
HashSet::from_iter(file_names.iter().map(|s| s.unwrap()).collect::<Vec<_>>());
let expected_file_names = HashSet::from_iter(vec![
"d398fae1-c0e6-4098-8124-f55f7098bdba-0_1-95-136_20240707001302376.parquet",
"4f2685a3-614f-49ca-9b2b-e1cb9fb61f27-0_0-95-135_20240707001302376.parquet",
]);
assert_eq!(actual_file_names, expected_file_names);

// read records changed from the first commit (exclusive) to
// the latest (an insert overwrite table's replacecommit)
let records = hudi_table
.read_incremental_records("20240707001301554", None)
.await?;
assert_eq!(records.len(), 1);
assert_eq!(records[0].num_rows(), 1);

// verify the partition paths
let partition_paths = arrow::compute::concat(&[records[0]
.column_by_name("_hoodie_partition_path")
.unwrap()])?
.to_data();
let partition_paths = StringArray::from(partition_paths);
let actual_partition_paths: HashSet<&str> = HashSet::from_iter(
partition_paths
.iter()
.map(|s| s.unwrap())
.collect::<Vec<_>>(),
);
let expected_partition_paths = HashSet::from_iter(vec!["30"]);
assert_eq!(actual_partition_paths, expected_partition_paths);

// verify the file names
let file_names =
arrow::compute::concat(&[records[0].column_by_name("_hoodie_file_name").unwrap()])?
.to_data();
let file_names = StringArray::from(file_names);
let actual_file_names: HashSet<&str> =
HashSet::from_iter(file_names.iter().map(|s| s.unwrap()).collect::<Vec<_>>());
let expected_file_names = HashSet::from_iter(vec![
"ebcb261d-62d3-4895-90ec-5b3c9622dff4-0_0-111-154_20240707001303088.parquet",
]);
assert_eq!(actual_file_names, expected_file_names);

Ok(())
mod test_incremental_queries {
use super::super::*;
use arrow_array::{Array, StringArray};
use hudi_tests::TestTable;
use std::collections::HashSet;

#[tokio::test]
async fn test_empty() -> Result<()> {
let base_url = TestTable::V6Empty.url();
let hudi_table = Table::new(base_url.path()).await?;

let records = hudi_table.read_incremental_records("0", None).await?;
assert!(records.is_empty());

Ok(())
}

#[tokio::test]
async fn test_simplekeygen_nonhivestyle_overwritetable() -> Result<()> {
let base_url = TestTable::V6SimplekeygenNonhivestyleOverwritetable.url();
let hudi_table = Table::new(base_url.path()).await?;

// read records changed from the first commit (exclusive) to the second commit (inclusive)
let records = hudi_table
.read_incremental_records("20240707001301554", Some("20240707001302376"))
.await?;
assert_eq!(records.len(), 2);
assert_eq!(records[0].num_rows(), 1);
assert_eq!(records[1].num_rows(), 1);

// verify the partition paths
let partition_paths = StringArray::from(
arrow::compute::concat(&[
records[0].column_by_name("_hoodie_partition_path").unwrap(),
records[1].column_by_name("_hoodie_partition_path").unwrap(),
])?
.to_data(),
);
let actual_partition_paths =
HashSet::<&str>::from_iter(partition_paths.iter().map(|s| s.unwrap()));
let expected_partition_paths = HashSet::from_iter(vec!["10", "30"]);
assert_eq!(actual_partition_paths, expected_partition_paths);

// verify the file names
let file_names = StringArray::from(
arrow::compute::concat(&[
records[0].column_by_name("_hoodie_file_name").unwrap(),
records[1].column_by_name("_hoodie_file_name").unwrap(),
])?
.to_data(),
);
let actual_file_names =
HashSet::<&str>::from_iter(file_names.iter().map(|s| s.unwrap()));
let expected_file_names = HashSet::from_iter(vec![
"d398fae1-c0e6-4098-8124-f55f7098bdba-0_1-95-136_20240707001302376.parquet",
"4f2685a3-614f-49ca-9b2b-e1cb9fb61f27-0_0-95-135_20240707001302376.parquet",
]);
assert_eq!(actual_file_names, expected_file_names);

// read records changed from the first commit (exclusive) to
// the latest (an insert overwrite table's replacecommit)
let records = hudi_table
.read_incremental_records("20240707001301554", None)
.await?;
assert_eq!(records.len(), 1);
assert_eq!(records[0].num_rows(), 1);

// verify the partition paths
let actual_partition_paths = StringArray::from(
records[0]
.column_by_name("_hoodie_partition_path")
.unwrap()
.to_data(),
);
let expected_partition_paths = StringArray::from(vec!["30"]);
assert_eq!(actual_partition_paths, expected_partition_paths);

// verify the file names
let actual_file_names = StringArray::from(
records[0]
.column_by_name("_hoodie_file_name")
.unwrap()
.to_data(),
);
let expected_file_names = StringArray::from(vec![
"ebcb261d-62d3-4895-90ec-5b3c9622dff4-0_0-111-154_20240707001303088.parquet",
]);
assert_eq!(actual_file_names, expected_file_names);

Ok(())
}
}
}
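One detail worth noting in the rewritten tests: the old assertions extracted column values with .as_any().downcast_ref::<StringArray>(), whereas the new ones rebuild a StringArray from the column's data via to_data(). A minimal standalone sketch of the two equivalent extractions, assuming only an arrow_array dependency (the column name and values here are made up for illustration):

    use arrow_array::{Array, ArrayRef, StringArray};
    use std::sync::Arc;

    fn main() {
        // Stand-in for a column returned by RecordBatch::column_by_name,
        // e.g. records[0].column_by_name("_hoodie_partition_path").unwrap().
        let col: ArrayRef = Arc::new(StringArray::from(vec!["10", "30"]));

        // Old pattern in this diff: downcast the dyn Array reference.
        let by_downcast = col.as_any().downcast_ref::<StringArray>().unwrap();

        // New pattern in this diff: rebuild a StringArray from the ArrayData.
        let by_to_data = StringArray::from(col.to_data());

        let old_values: Vec<&str> = by_downcast.iter().map(|s| s.unwrap()).collect();
        let new_values: Vec<&str> = by_to_data.iter().map(|s| s.unwrap()).collect();
        assert_eq!(old_values, new_values);
    }

The to_data() route trades the explicit downcast (and the Option it returns) for a conversion that panics on a type mismatch, which keeps the assertions shorter and is acceptable in tests.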
