Skip to content

Commit

Permalink
feat: support MOR read-optimized query
Browse files Browse the repository at this point in the history
  • Loading branch information
xushiyan committed Jan 22, 2025
1 parent b658a98 commit 1988bea
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 34 deletions.
13 changes: 12 additions & 1 deletion crates/core/src/config/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::str::FromStr;

use strum_macros::EnumIter;

use crate::config::error::ConfigError::{NotFound, ParseInt};
use crate::config::error::ConfigError::{NotFound, ParseBool, ParseInt};
use crate::config::Result;
use crate::config::{ConfigParser, HudiConfigValue};

Expand Down Expand Up @@ -52,6 +52,10 @@ pub enum HudiReadConfig {

/// Parallelism for listing files on storage.
ListingParallelism,

/// When set to true, only [BaseFile]s will be read for optimized reads.
/// This is only applicable to Merge-On-Read (MOR) tables.
UseReadOptimizedMode,
}

impl AsRef<str> for HudiReadConfig {
Expand All @@ -60,6 +64,7 @@ impl AsRef<str> for HudiReadConfig {
Self::AsOfTimestamp => "hoodie.read.as.of.timestamp",
Self::InputPartitions => "hoodie.read.input.partitions",
Self::ListingParallelism => "hoodie.read.listing.parallelism",
Self::UseReadOptimizedMode => "hoodie.read.use.read_optimized.mode",
}
}
}
Expand All @@ -71,6 +76,7 @@ impl ConfigParser for HudiReadConfig {
match self {
HudiReadConfig::InputPartitions => Some(HudiConfigValue::UInteger(0usize)),
HudiReadConfig::ListingParallelism => Some(HudiConfigValue::UInteger(10usize)),
HudiReadConfig::UseReadOptimizedMode => Some(HudiConfigValue::Boolean(false)),
_ => None,
}
}
Expand All @@ -93,6 +99,11 @@ impl ConfigParser for HudiReadConfig {
usize::from_str(v).map_err(|e| ParseInt(self.key(), v.to_string(), e))
})
.map(HudiConfigValue::UInteger),
Self::UseReadOptimizedMode => get_result
.and_then(|v| {
bool::from_str(v).map_err(|e| ParseBool(self.key(), v.to_string(), e))

Codecov (codecov/patch) check warning on crates/core/src/config/read.rs, line 104: the added line #L104 was not covered by tests. (View the check run for this annotation.)
})
.map(HudiConfigValue::Boolean),
}
}
}
Expand Down
63 changes: 41 additions & 22 deletions crates/core/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,10 @@ mod fs_view;
mod listing;
pub mod partition;

use crate::config::read::HudiReadConfig::AsOfTimestamp;
use crate::config::read::HudiReadConfig::{AsOfTimestamp, UseReadOptimizedMode};
use crate::config::table::HudiTableConfig::PartitionFields;
use crate::config::table::{HudiTableConfig, TableTypeValue};
use crate::config::HudiConfigs;
use crate::error::CoreError;
use crate::expr::filter::{Filter, FilterField};
use crate::file_group::file_slice::FileSlice;
use crate::file_group::reader::FileGroupReader;
Expand Down Expand Up @@ -141,11 +140,25 @@ impl Table {
.await
}

pub fn base_url(&self) -> Result<Url> {
#[inline]
pub fn base_url(&self) -> Url {
let err_msg = format!("{:?} is missing or invalid.", HudiTableConfig::BasePath);
self.hudi_configs
.get(HudiTableConfig::BasePath)?
.get(HudiTableConfig::BasePath)
.expect(&err_msg)
.to_url()
.map_err(CoreError::from)
.expect(&err_msg)
}

/// Returns the table's type parsed from the [HudiTableConfig::TableType] config entry
/// (e.g. Copy-On-Write vs. Merge-On-Read — see [TableTypeValue]).
///
/// # Panics
///
/// Panics if the `TableType` config is missing, or if its string value cannot be
/// parsed into a [TableTypeValue].
#[inline]
pub fn table_type(&self) -> TableTypeValue {
let err_msg = format!("{:?} is missing or invalid.", HudiTableConfig::TableType);
let table_type = self
.hudi_configs
.get(HudiTableConfig::TableType)
.expect(&err_msg)
.to::<String>();
// Parsing failure (an unrecognized table type string) reuses the same panic message.
TableTypeValue::from_str(table_type.as_str()).expect(&err_msg)
}

#[inline]
Expand Down Expand Up @@ -176,16 +189,6 @@ impl Table {
.register_object_store(runtime_env.clone());
}

/// Returns the table's type as a [TableTypeValue], read from the
/// [HudiTableConfig::TableType] config entry.
///
/// # Panics
///
/// Panics when the `TableType` config is absent or its value cannot be parsed
/// into a [TableTypeValue].
pub fn get_table_type(&self) -> TableTypeValue {
let err_msg = format!("{:?} is missing or invalid.", HudiTableConfig::TableType);
// Look the raw config value up first, then convert it to a string form.
let raw_value = self
.hudi_configs
.get(HudiTableConfig::TableType)
.expect(&err_msg);
let type_name = raw_value.to::<String>();
// The same panic message covers both the missing and the unparsable case.
TableTypeValue::from_str(&type_name).expect(&err_msg)
}

/// Get the latest [Schema] of the table.
pub async fn get_schema(&self) -> Result<Schema> {
self.timeline.get_latest_schema().await
Expand Down Expand Up @@ -289,11 +292,21 @@ impl Table {
///
/// If the [AsOfTimestamp] configuration is set, the records at the specified timestamp will be returned.
pub async fn read_snapshot(&self, filters: &[Filter]) -> Result<Vec<RecordBatch>> {
let read_optimized_mode = self
.hudi_configs
.get_or_default(UseReadOptimizedMode)
.to::<bool>();

if let Some(timestamp) = self.hudi_configs.try_get(AsOfTimestamp) {
self.read_snapshot_as_of(timestamp.to::<String>().as_str(), filters)
.await
self.read_snapshot_as_of(
timestamp.to::<String>().as_str(),
filters,
read_optimized_mode,

Check warning on line 304 in crates/core/src/table/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/table/mod.rs#L301-L304

Added lines #L301 - L304 were not covered by tests
)
.await

Check warning on line 306 in crates/core/src/table/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/table/mod.rs#L306

Added line #L306 was not covered by tests
} else if let Some(timestamp) = self.timeline.get_latest_commit_timestamp() {
self.read_snapshot_as_of(timestamp, filters).await
self.read_snapshot_as_of(timestamp, filters, read_optimized_mode)

Check warning on line 308 in crates/core/src/table/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/table/mod.rs#L308

Added line #L308 was not covered by tests
.await
} else {
Ok(Vec::new())
}
Expand All @@ -304,10 +317,12 @@ impl Table {
&self,
timestamp: &str,
filters: &[Filter],
read_optimized_mode: bool,
) -> Result<Vec<RecordBatch>> {
let file_slices = self.get_file_slices_as_of(timestamp, filters).await?;
let fg_reader = self.create_file_group_reader();
let base_file_only = self.get_table_type() == TableTypeValue::CopyOnWrite;
let base_file_only =
read_optimized_mode || self.table_type() == TableTypeValue::CopyOnWrite;
let timezone = self.timezone();
let instant_range = InstantRange::up_to(timestamp, &timezone);
let batches = futures::future::try_join_all(
Expand Down Expand Up @@ -351,7 +366,9 @@ impl Table {
];
let fg_reader =
self.create_file_group_reader_with_filters(filters, MetaField::schema().as_ref())?;
let base_file_only = self.get_table_type() == TableTypeValue::CopyOnWrite;

// Read-optimized mode does not apply to incremental query semantics.
let base_file_only = self.table_type() == TableTypeValue::CopyOnWrite;

Check warning on line 371 in crates/core/src/table/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/core/src/table/mod.rs#L371

Added line #L371 was not covered by tests
let timezone = self.timezone();
let instant_range =
InstantRange::within_open_closed(start_timestamp, end_timestamp, &timezone);
Expand Down Expand Up @@ -416,7 +433,7 @@ mod tests {
/// Test helper to get relative file paths from the table with filters.
async fn get_file_paths_with_filters(table: &Table, filters: &[Filter]) -> Result<Vec<String>> {
let mut file_paths = Vec::new();
let base_url = table.base_url()?;
let base_url = table.base_url();
for f in table.get_file_slices(filters).await? {
let relative_path = f.base_file_relative_path()?;
let file_url = join_url_segments(&base_url, &[relative_path.as_str()])?;
Expand Down Expand Up @@ -977,7 +994,9 @@ mod tests {
.map(|i| i.timestamp.as_str())
.collect::<Vec<_>>();
let first_commit = commit_timestamps[0];
let records = hudi_table.read_snapshot_as_of(first_commit, &[]).await?;
let records = hudi_table
.read_snapshot_as_of(first_commit, &[], false)
.await?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;

Expand Down
13 changes: 2 additions & 11 deletions crates/datafusion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,7 @@ impl TableProvider for HudiDataSource {
.get_file_slices_splits(self.get_input_partitions(), pushdown_filters.as_slice())
.await
.map_err(|e| Execution(format!("Failed to get file slices from Hudi table: {}", e)))?;
let base_url = self.table.base_url().map_err(|e| {
Execution(format!(
"Failed to get base path config from Hudi table: {e:?}"
))
})?;
let base_url = self.table.base_url();
let mut parquet_file_groups: Vec<Vec<PartitionedFile>> = Vec::new();
for file_slice_vec in file_slices {
let mut parquet_file_group_vec = Vec::new();
Expand All @@ -204,12 +200,7 @@ impl TableProvider for HudiDataSource {
parquet_file_groups.push(parquet_file_group_vec)
}

let base_url = self.table.base_url().map_err(|e| {
Execution(format!(
"Failed to get base path config from Hudi table: {}",
e
))
})?;
let base_url = self.table.base_url();
let url = ObjectStoreUrl::parse(get_scheme_authority(&base_url))?;
let fsc = FileScanConfig::new(url, self.schema())
.with_file_groups(parquet_file_groups)
Expand Down

0 comments on commit 1988bea

Please sign in to comment.