Skip to content

Commit

Permalink
feat: implement basic record merge semantics (#249)
Browse files Browse the repository at this point in the history
  • Loading branch information
xushiyan authored Jan 17, 2025
1 parent 42800d9 commit cd6505e
Show file tree
Hide file tree
Showing 5 changed files with 497 additions and 0 deletions.
31 changes: 31 additions & 0 deletions crates/core/src/config/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::config::error::ConfigError::{
};
use crate::config::Result;
use crate::config::{ConfigParser, HudiConfigValue};
use crate::merge::RecordMergeStrategyValue;

/// Configurations for Hudi tables, most of them are persisted in `hoodie.properties`.
///
Expand Down Expand Up @@ -91,6 +92,9 @@ pub enum HudiTableConfig {
/// Concatenated values of these fields are used as the record key component of HoodieKey.
RecordKeyFields,

/// Strategy to merge incoming records with existing records in the table.
RecordMergeStrategy,

/// Table name that will be used for registering with Hive. Needs to be same across runs.
TableName,

Expand Down Expand Up @@ -127,6 +131,7 @@ impl AsRef<str> for HudiTableConfig {
Self::PrecombineField => "hoodie.table.precombine.field",
Self::PopulatesMetaFields => "hoodie.populate.meta.fields",
Self::RecordKeyFields => "hoodie.table.recordkey.fields",
Self::RecordMergeStrategy => "hoodie.table.record.merge.strategy",
Self::TableName => "hoodie.table.name",
Self::TableType => "hoodie.table.type",
Self::TableVersion => "hoodie.table.version",
Expand All @@ -145,6 +150,9 @@ impl ConfigParser for HudiTableConfig {
Self::DropsPartitionFields => Some(HudiConfigValue::Boolean(false)),
Self::PartitionFields => Some(HudiConfigValue::List(vec![])),
Self::PopulatesMetaFields => Some(HudiConfigValue::Boolean(true)),
Self::RecordMergeStrategy => Some(HudiConfigValue::String(
RecordMergeStrategyValue::default().as_ref().to_string(),
)),
Self::TimelineTimezone => Some(HudiConfigValue::String(
TimelineTimezoneValue::UTC.as_ref().to_string(),
)),
Expand Down Expand Up @@ -199,6 +207,9 @@ impl ConfigParser for HudiTableConfig {
.map(HudiConfigValue::Boolean),
Self::RecordKeyFields => get_result
.map(|v| HudiConfigValue::List(v.split(',').map(str::to_string).collect())),
Self::RecordMergeStrategy => get_result
.and_then(RecordMergeStrategyValue::from_str)
.map(|v| HudiConfigValue::String(v.as_ref().to_string())),
Self::TableName => get_result.map(|v| HudiConfigValue::String(v.to_string())),
Self::TableType => get_result
.and_then(TableTypeValue::from_str)
Expand Down Expand Up @@ -382,4 +393,24 @@ mod tests {
InvalidValue(_)
));
}

#[test]
fn create_record_merge_strategy() {
    // Mixed-case spellings must still resolve to the matching variant.
    let accepted = [
        ("Append_Only", RecordMergeStrategyValue::AppendOnly),
        (
            "OVERWRITE_with_LATEST",
            RecordMergeStrategyValue::OverwriteWithLatest,
        ),
    ];
    for (raw, expected) in accepted {
        assert_eq!(RecordMergeStrategyValue::from_str(raw).unwrap(), expected);
    }

    // Empty and unrecognized names are rejected with `InvalidValue`.
    for raw in ["", "foo"] {
        assert!(matches!(
            RecordMergeStrategyValue::from_str(raw).unwrap_err(),
            InvalidValue(_)
        ));
    }
}
}
3 changes: 3 additions & 0 deletions crates/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ pub enum CoreError {
#[error("Commit metadata error: {0}")]
CommitMetadata(String),

#[error("{0}")]
MergeRecordError(String),

#[error("Data type error: {0}")]
Schema(String),

Expand Down
1 change: 1 addition & 0 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub mod config;
pub mod error;
pub mod expr;
pub mod file_group;
pub mod merge;
pub mod storage;
pub mod table;
pub mod timeline;
Expand Down
52 changes: 52 additions & 0 deletions crates/core/src/merge/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
pub mod record_merger;

use crate::config::error;
use crate::config::error::ConfigError;
use crate::config::error::ConfigError::InvalidValue;
use std::str::FromStr;
use strum_macros::AsRefStr;

/// Config value for [crate::config::table::HudiTableConfig::RecordMergeStrategy].
///
/// The `AsRef<str>` form of each variant is the snake_case name given in its
/// `#[strum(serialize = ...)]` attribute. [`Default`] is derived and yields
/// [`Self::OverwriteWithLatest`].
#[derive(Clone, Debug, Default, PartialEq, Eq, AsRefStr)]
pub enum RecordMergeStrategyValue {
    /// Rendered as `"append_only"`.
    // NOTE(review): presumably records are appended without merging; confirm against `record_merger`.
    #[strum(serialize = "append_only")]
    AppendOnly,
    /// Rendered as `"overwrite_with_latest"`. This is the default strategy.
    // NOTE(review): presumably the latest record replaces existing ones; confirm against `record_merger`.
    #[strum(serialize = "overwrite_with_latest")]
    #[default]
    OverwriteWithLatest,
}

impl FromStr for RecordMergeStrategyValue {
    type Err = ConfigError;

    /// Parses a merge strategy name, ignoring ASCII case.
    ///
    /// The input is compared as-is (it is not trimmed) against `append_only`
    /// and `overwrite_with_latest`.
    ///
    /// # Errors
    ///
    /// Returns [`InvalidValue`] carrying the ASCII-lowercased input when it
    /// matches neither name.
    fn from_str(s: &str) -> error::Result<Self, Self::Err> {
        if s.eq_ignore_ascii_case("append_only") {
            return Ok(Self::AppendOnly);
        }
        if s.eq_ignore_ascii_case("overwrite_with_latest") {
            return Ok(Self::OverwriteWithLatest);
        }
        // Only the failure path needs an owned, lowercased copy of the input.
        Err(InvalidValue(s.to_ascii_lowercase()))
    }
}
Loading

0 comments on commit cd6505e

Please sign in to comment.