From 1988bead13e97adee7d931b84147bffe059c2c84 Mon Sep 17 00:00:00 2001
From: Shiyan Xu <2701446+xushiyan@users.noreply.github.com>
Date: Tue, 21 Jan 2025 22:16:22 -0600
Subject: [PATCH] feat: support MOR read-optimized query

Introduce the read config `hoodie.read.use.read_optimized.mode`
(default: false). When enabled on a Merge-On-Read (MOR) table,
snapshot reads scan only base files and skip log files; the mode
does not apply to incremental queries. Also turn `Table::base_url`
into an infallible accessor and rename `get_table_type` to
`table_type`, simplifying call sites in the DataFusion integration.
---
 crates/core/src/config/read.rs | 13 ++++++-
 crates/core/src/table/mod.rs   | 63 ++++++++++++++++++++++------------
 crates/datafusion/src/lib.rs   | 13 ++-----
 3 files changed, 55 insertions(+), 34 deletions(-)

diff --git a/crates/core/src/config/read.rs b/crates/core/src/config/read.rs
index 4f131c26..21fca179 100644
--- a/crates/core/src/config/read.rs
+++ b/crates/core/src/config/read.rs
@@ -23,7 +23,7 @@ use std::str::FromStr;
 
 use strum_macros::EnumIter;
 
-use crate::config::error::ConfigError::{NotFound, ParseInt};
+use crate::config::error::ConfigError::{NotFound, ParseBool, ParseInt};
 use crate::config::Result;
 use crate::config::{ConfigParser, HudiConfigValue};
 
@@ -52,6 +52,10 @@ pub enum HudiReadConfig {
 
     /// Parallelism for listing files on storage.
     ListingParallelism,
+
+    /// When set to true, only [BaseFile]s will be read for optimized reads.
+    /// This is only applicable to Merge-On-Read (MOR) tables.
+    UseReadOptimizedMode,
 }
 
 impl AsRef<str> for HudiReadConfig {
@@ -60,6 +64,7 @@ impl AsRef<str> for HudiReadConfig {
             Self::AsOfTimestamp => "hoodie.read.as.of.timestamp",
             Self::InputPartitions => "hoodie.read.input.partitions",
             Self::ListingParallelism => "hoodie.read.listing.parallelism",
+            Self::UseReadOptimizedMode => "hoodie.read.use.read_optimized.mode",
         }
     }
 }
@@ -71,6 +76,7 @@ impl ConfigParser for HudiReadConfig {
         match self {
            HudiReadConfig::InputPartitions => Some(HudiConfigValue::UInteger(0usize)),
             HudiReadConfig::ListingParallelism => Some(HudiConfigValue::UInteger(10usize)),
+            HudiReadConfig::UseReadOptimizedMode => Some(HudiConfigValue::Boolean(false)),
             _ => None,
         }
     }
@@ -93,6 +99,11 @@ impl ConfigParser for HudiReadConfig {
                     usize::from_str(v).map_err(|e| ParseInt(self.key(), v.to_string(), e))
                 })
                 .map(HudiConfigValue::UInteger),
+            Self::UseReadOptimizedMode => get_result
+                .and_then(|v| {
+                    bool::from_str(v).map_err(|e| ParseBool(self.key(), v.to_string(), e))
+                })
+                .map(HudiConfigValue::Boolean),
         }
     }
 }
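The new option drops into the existing ConfigParser machinery: it defaults to false and parses with bool::from_str. A minimal sketch of resolving it from a HudiConfigs instance, with module paths written as they appear inside the crate; the get_or_default(...).to::<bool>() chain mirrors Table::read_snapshot below, while HudiConfigs::new accepting string key/value pairs is an assumption based on existing usage in the crate, and both helper functions are hypothetical:

    use crate::config::read::HudiReadConfig::UseReadOptimizedMode;
    use crate::config::HudiConfigs;

    // Hypothetical helper: resolve the flag the same way read_snapshot does.
    fn resolve_read_optimized_mode(hudi_configs: &HudiConfigs) -> bool {
        // Falls back to the declared default (false) when the key is absent.
        hudi_configs
            .get_or_default(UseReadOptimizedMode)
            .to::<bool>()
    }

    // Hypothetical demo; assumes HudiConfigs::new takes string key/value pairs.
    fn demo() {
        let configs = HudiConfigs::new([("hoodie.read.use.read_optimized.mode", "true")]);
        assert!(resolve_read_optimized_mode(&configs));
    }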
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 7f5a82c2..3f1049d2 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -90,11 +90,10 @@ mod fs_view;
 mod listing;
 pub mod partition;
 
-use crate::config::read::HudiReadConfig::AsOfTimestamp;
+use crate::config::read::HudiReadConfig::{AsOfTimestamp, UseReadOptimizedMode};
 use crate::config::table::HudiTableConfig::PartitionFields;
 use crate::config::table::{HudiTableConfig, TableTypeValue};
 use crate::config::HudiConfigs;
-use crate::error::CoreError;
 use crate::expr::filter::{Filter, FilterField};
 use crate::file_group::file_slice::FileSlice;
 use crate::file_group::reader::FileGroupReader;
@@ -141,11 +140,25 @@ impl Table {
             .await
     }
 
-    pub fn base_url(&self) -> Result<Url> {
+    #[inline]
+    pub fn base_url(&self) -> Url {
+        let err_msg = format!("{:?} is missing or invalid.", HudiTableConfig::BasePath);
         self.hudi_configs
-            .get(HudiTableConfig::BasePath)?
+            .get(HudiTableConfig::BasePath)
+            .expect(&err_msg)
             .to_url()
-            .map_err(CoreError::from)
+            .expect(&err_msg)
+    }
+
+    #[inline]
+    pub fn table_type(&self) -> TableTypeValue {
+        let err_msg = format!("{:?} is missing or invalid.", HudiTableConfig::TableType);
+        let table_type = self
+            .hudi_configs
+            .get(HudiTableConfig::TableType)
+            .expect(&err_msg)
+            .to::<String>();
+        TableTypeValue::from_str(table_type.as_str()).expect(&err_msg)
     }
 
     #[inline]
@@ -176,16 +189,6 @@ impl Table {
             .register_object_store(runtime_env.clone());
     }
 
-    pub fn get_table_type(&self) -> TableTypeValue {
-        let err_msg = format!("{:?} is missing or invalid.", HudiTableConfig::TableType);
-        let table_type = self
-            .hudi_configs
-            .get(HudiTableConfig::TableType)
-            .expect(&err_msg)
-            .to::<String>();
-        TableTypeValue::from_str(table_type.as_str()).expect(&err_msg)
-    }
-
     /// Get the latest [Schema] of the table.
     pub async fn get_schema(&self) -> Result<Schema> {
         self.timeline.get_latest_schema().await
@@ -289,11 +292,21 @@ impl Table {
     ///
     /// If the [AsOfTimestamp] configuration is set, the records at the specified timestamp will be returned.
     pub async fn read_snapshot(&self, filters: &[Filter]) -> Result<Vec<RecordBatch>> {
+        let read_optimized_mode = self
+            .hudi_configs
+            .get_or_default(UseReadOptimizedMode)
+            .to::<bool>();
+
         if let Some(timestamp) = self.hudi_configs.try_get(AsOfTimestamp) {
-            self.read_snapshot_as_of(timestamp.to::<String>().as_str(), filters)
-                .await
+            self.read_snapshot_as_of(
+                timestamp.to::<String>().as_str(),
+                filters,
+                read_optimized_mode,
+            )
+            .await
         } else if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() {
-            self.read_snapshot_as_of(timestamp, filters).await
+            self.read_snapshot_as_of(timestamp, filters, read_optimized_mode)
+                .await
         } else {
             Ok(Vec::new())
         }
@@ -304,10 +317,12 @@ impl Table {
         &self,
         timestamp: &str,
         filters: &[Filter],
+        read_optimized_mode: bool,
     ) -> Result<Vec<RecordBatch>> {
         let file_slices = self.get_file_slices_as_of(timestamp, filters).await?;
         let fg_reader = self.create_file_group_reader();
-        let base_file_only = self.get_table_type() == TableTypeValue::CopyOnWrite;
+        let base_file_only =
+            read_optimized_mode || self.table_type() == TableTypeValue::CopyOnWrite;
         let timezone = self.timezone();
         let instant_range = InstantRange::up_to(timestamp, &timezone);
         let batches = futures::future::try_join_all(
@@ -351,7 +366,9 @@ impl Table {
         ];
         let fg_reader =
             self.create_file_group_reader_with_filters(filters, MetaField::schema().as_ref())?;
-        let base_file_only = self.get_table_type() == TableTypeValue::CopyOnWrite;
+
+        // Read-optimized mode does not apply to incremental query semantics.
+        let base_file_only = self.table_type() == TableTypeValue::CopyOnWrite;
         let timezone = self.timezone();
         let instant_range =
             InstantRange::within_open_closed(start_timestamp, end_timestamp, &timezone);
@@ -416,7 +433,7 @@ mod tests {
     /// Test helper to get relative file paths from the table with filters.
     async fn get_file_paths_with_filters(table: &Table, filters: &[Filter]) -> Result<Vec<String>> {
         let mut file_paths = Vec::new();
-        let base_url = table.base_url()?;
+        let base_url = table.base_url();
         for f in table.get_file_slices(filters).await? {
             let relative_path = f.base_file_relative_path()?;
             let file_url = join_url_segments(&base_url, &[relative_path.as_str()])?;
@@ -977,7 +994,9 @@ mod tests {
             .map(|i| i.timestamp.as_str())
             .collect::<Vec<_>>();
         let first_commit = commit_timestamps[0];
-        let records = hudi_table.read_snapshot_as_of(first_commit, &[]).await?;
+        let records = hudi_table
+            .read_snapshot_as_of(first_commit, &[], false)
+            .await?;
         let schema = &records[0].schema();
         let records = concat_batches(schema, &records)?;
 
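On the table API, the mode is driven entirely by configuration: read_snapshot resolves the flag once and threads it through read_snapshot_as_of, where it forces base_file_only for MOR tables (Copy-on-Write tables already read base files only). A usage sketch, assuming Table::new_with_options accepts string key/value pairs as in the crate's documented examples; the table path is hypothetical:

    use hudi::table::Table;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        let table = Table::new_with_options(
            "/tmp/trips_table_mor", // hypothetical MOR table location
            [("hoodie.read.use.read_optimized.mode", "true")],
        )
        .await?;

        // With the flag on, only base files are scanned; log files holding
        // not-yet-compacted updates are skipped, trading freshness for speed.
        let batches = table.read_snapshot(&[]).await?;
        println!("read {} record batches", batches.len());
        Ok(())
    }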
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index 2c14e569..5ffbef6c 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -181,11 +181,7 @@ impl TableProvider for HudiDataSource {
             .get_file_slices_splits(self.get_input_partitions(), pushdown_filters.as_slice())
             .await
             .map_err(|e| Execution(format!("Failed to get file slices from Hudi table: {}", e)))?;
-        let base_url = self.table.base_url().map_err(|e| {
-            Execution(format!(
-                "Failed to get base path config from Hudi table: {e:?}"
-            ))
-        })?;
+        let base_url = self.table.base_url();
         let mut parquet_file_groups: Vec<Vec<PartitionedFile>> = Vec::new();
         for file_slice_vec in file_slices {
             let mut parquet_file_group_vec = Vec::new();
@@ -204,12 +200,7 @@ impl TableProvider for HudiDataSource {
             parquet_file_groups.push(parquet_file_group_vec)
         }
 
-        let base_url = self.table.base_url().map_err(|e| {
-            Execution(format!(
-                "Failed to get base path config from Hudi table: {}",
-                e
-            ))
-        })?;
+        let base_url = self.table.base_url();
         let url = ObjectStoreUrl::parse(get_scheme_authority(&base_url))?;
         let fsc = FileScanConfig::new(url, self.schema())
             .with_file_groups(parquet_file_groups)
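The DataFusion provider inherits the behavior through the table's configs, so the same key can be supplied when the table is registered. A sketch assuming HudiDataSource::new_with_options takes string key/value pairs like Table::new_with_options, and that HudiDataSource is exposed via the hudi crate's datafusion feature; the path and table name are hypothetical:

    use std::sync::Arc;

    use datafusion::prelude::SessionContext;
    use hudi::HudiDataSource;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        let ctx = SessionContext::new();
        let hudi = HudiDataSource::new_with_options(
            "/tmp/trips_table_mor", // hypothetical MOR table location
            [("hoodie.read.use.read_optimized.mode", "true")],
        )
        .await?;
        ctx.register_table("trips_table", Arc::new(hudi))?;

        // The scan plan built by the provider above reads base files only.
        let df = ctx.sql("SELECT * FROM trips_table").await?;
        df.show().await?;
        Ok(())
    }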