Skip to content

Commit

Permalink
Allow immaterial schema differences
Browse files Browse the repository at this point in the history
  • Loading branch information
SergeiPatiakin committed Dec 17, 2024
1 parent 22ab168 commit 74bf3ba
Showing 1 changed file with 276 additions and 34 deletions.
310 changes: 276 additions & 34 deletions src/context/iceberg.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use core::str;
use itertools::izip;
use std::collections::HashMap;
use std::error::Error;
use std::pin::Pin;
Expand Down Expand Up @@ -68,7 +69,7 @@ fn create_empty_metadata(
}

// Clone an arrow schema, assigning sequential field IDs starting from 1
fn assign_field_ids(arrow_schema: Arc<Schema>) -> Schema {
fn assign_field_ids(arrow_schema: &Arc<Schema>) -> Schema {
let mut field_id_counter = 1;
let new_fields: Vec<Field> = arrow_schema
.fields
Expand All @@ -88,6 +89,36 @@ fn assign_field_ids(arrow_schema: Arc<Schema>) -> Schema {
Schema::new_with_metadata(new_fields, arrow_schema.metadata.clone())
}

fn is_schema_aligned(
new_arrow_schema: &Arc<Schema>,
existing_iceberg_schema: &Arc<iceberg::spec::Schema>,
) -> Result<(), DataLoadingError> {
let old_iceberg_struct = existing_iceberg_schema.as_struct();
let old_iceberg_fields = old_iceberg_struct.fields();

let new_arrow_schema_with_field_ids = assign_field_ids(new_arrow_schema);
let new_iceberg_schema = Arc::new(iceberg::arrow::arrow_schema_to_schema(
&new_arrow_schema_with_field_ids,
)?);
let new_iceberg_struct = new_iceberg_schema.as_struct();
let new_iceberg_fields = new_iceberg_struct.fields();

if old_iceberg_fields.len() != new_iceberg_fields.len() {
return Err(DataLoadingError::BadInputError(format!("New data is incompatible with existing schema. Old schema has {} fields but new schema has {} fields", old_iceberg_fields.len(), new_iceberg_fields.len())));
}
for (i, old_iceberg_field, new_iceberg_field) in
izip!(0.., old_iceberg_fields.iter(), new_iceberg_fields.iter())
{
if old_iceberg_field.required && !new_iceberg_field.required {
return Err(DataLoadingError::BadInputError(format!("New data is incompatible with existing schema. Field {} ({}) is required in old schema but not required in new schema", i, old_iceberg_field.name)));
}
if old_iceberg_field.field_type != new_iceberg_field.field_type {
return Err(DataLoadingError::BadInputError(format!("New data is incompatible with existing schema. Field {} ({}) has data type {:?} in old schema but {:?} in new schema", i, old_iceberg_field.name, old_iceberg_field.field_type, new_iceberg_field.field_type)));
}
}
Ok(())
}

// Create a new TableMetadata object by updating the current snapshot of an existing TableMetadata
fn update_metadata_snapshot(
previous_metadata: &TableMetadata,
Expand Down Expand Up @@ -134,10 +165,6 @@ pub async fn record_batches_to_iceberg(
let table_base_url = Url::parse(table_location).unwrap();

let file_io = table.file_io();
let arrow_schema_with_ids = assign_field_ids(arrow_schema.clone());
let iceberg_schema = Arc::new(iceberg::arrow::arrow_schema_to_schema(
&arrow_schema_with_ids,
)?);

let version_hint_location = format!("{}/metadata/version-hint.text", table_base_url);
let version_hint_input = file_io.new_input(&version_hint_location)?;
Expand All @@ -161,42 +188,49 @@ pub async fn record_batches_to_iceberg(
} else {
None
};
let (previous_metadata, previous_metadata_location) = match old_version_hint {
Some(version_hint) => {
let old_metadata_location = format!(
"{}/metadata/v{}.metadata.json",
table_base_url, version_hint
);
let old_metadata_bytes =
file_io.new_input(&old_metadata_location)?.read().await?;
let old_metadata_string =
str::from_utf8(&old_metadata_bytes).map_err(|_| {
DataLoadingError::IcebergError(iceberg::Error::new(
iceberg::ErrorKind::DataInvalid,
"Could not parse UTF-8 in old metadata file",
))
})?;
let old_metadata = serde_json::from_str::<TableMetadata>(old_metadata_string)
let (previous_metadata, previous_metadata_location, iceberg_schema) =
match old_version_hint {
Some(version_hint) => {
let old_metadata_location = format!(
"{}/metadata/v{}.metadata.json",
table_base_url, version_hint
);
let old_metadata_bytes =
file_io.new_input(&old_metadata_location)?.read().await?;
let old_metadata_string =
str::from_utf8(&old_metadata_bytes).map_err(|_| {
DataLoadingError::IcebergError(iceberg::Error::new(
iceberg::ErrorKind::DataInvalid,
"Could not parse UTF-8 in old metadata file",
))
})?;
let old_metadata = serde_json::from_str::<TableMetadata>(
old_metadata_string,
)
.map_err(|_| {
DataLoadingError::IcebergError(iceberg::Error::new(
iceberg::ErrorKind::DataInvalid,
"Could not parse old metadata file",
))
})?;
if old_metadata.current_schema() != &iceberg_schema {
return Err(DataLoadingError::IcebergError(iceberg::Error::new(
iceberg::ErrorKind::FeatureUnsupported,
"Schema changes not supported",
)));
let old_iceberg_schema = old_metadata.current_schema();
is_schema_aligned(&arrow_schema, old_iceberg_schema)?;
(
old_metadata.clone(),
Some(old_metadata_location),
old_iceberg_schema.clone(),
)
}
(old_metadata, Some(old_metadata_location))
}
None => {
let empty_metadata =
create_empty_metadata(&iceberg_schema, table_base_url.to_string())?;
(empty_metadata, None)
}
};
None => {
let arrow_schema_with_ids = assign_field_ids(&arrow_schema);
let iceberg_schema = Arc::new(iceberg::arrow::arrow_schema_to_schema(
&arrow_schema_with_ids,
)?);
let empty_metadata =
create_empty_metadata(&iceberg_schema, table_base_url.to_string())?;
(empty_metadata, None, iceberg_schema)
}
};

let file_writer_builder = ParquetWriterBuilder::new(
WriterProperties::builder().build(),
Expand Down Expand Up @@ -383,3 +417,211 @@ impl SeafowlContext {
Ok(())
}
}

#[cfg(test)]
mod tests {
use std::{collections::HashMap, sync::Arc};

use arrow_schema::{DataType, Field, Schema};
use iceberg::spec::{NestedField, PrimitiveType, Type};

use super::is_schema_aligned;

#[test]
fn test_is_schema_aligned_positive() {
let arrow_schema = Schema::new_with_metadata(
vec![
Field::new("a", DataType::Utf8, false),
Field::new("b", DataType::Int32, false),
Field::new("c", DataType::Boolean, false),
],
HashMap::new(),
);

let iceberg_schema = iceberg::spec::Schema::builder()
.with_fields(vec![
NestedField::optional(1, "a", Type::Primitive(PrimitiveType::String))
.into(),
NestedField::required(2, "b", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::optional(3, "c", Type::Primitive(PrimitiveType::Boolean))
.into(),
])
.build()
.unwrap();

assert!(
is_schema_aligned(&Arc::new(arrow_schema), &Arc::new(iceberg_schema)).is_ok()
);
}

#[test]
fn test_is_schema_aligned_positive_renamed() {
let arrow_schema = Schema::new_with_metadata(
vec![
// Fields renamed
Field::new("x", DataType::Utf8, false),
Field::new("y", DataType::Int32, false),
Field::new("z", DataType::Boolean, false),
],
HashMap::new(),
);

let iceberg_schema = iceberg::spec::Schema::builder()
.with_fields(vec![
NestedField::required(1, "a", Type::Primitive(PrimitiveType::String))
.into(),
NestedField::required(2, "b", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::required(3, "c", Type::Primitive(PrimitiveType::Boolean))
.into(),
])
.build()
.unwrap();

assert!(
is_schema_aligned(&Arc::new(arrow_schema), &Arc::new(iceberg_schema)).is_ok()
);
}

// OK to insert a non-nullable value into a nullable field
#[test]
fn test_is_schema_aligned_positive_nonnullable() {
let arrow_schema = Schema::new_with_metadata(
vec![
Field::new("a", DataType::Utf8, false),
Field::new("b", DataType::Int32, false),
Field::new("c", DataType::Boolean, false),
],
HashMap::new(),
);

let iceberg_schema = iceberg::spec::Schema::builder()
.with_fields(vec![
NestedField::optional(1, "a", Type::Primitive(PrimitiveType::String))
.into(),
NestedField::optional(2, "b", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::optional(3, "c", Type::Primitive(PrimitiveType::Boolean))
.into(),
])
.build()
.unwrap();

assert!(
is_schema_aligned(&Arc::new(arrow_schema), &Arc::new(iceberg_schema)).is_ok()
);
}

#[test]
fn test_is_schema_aligned_negative_added_field() {
let arrow_schema = Schema::new_with_metadata(
vec![
Field::new("a", DataType::Utf8, false),
Field::new("b", DataType::Int32, false),
Field::new("c", DataType::Boolean, false),
Field::new("d", DataType::Boolean, false), // Added field
],
HashMap::new(),
);

let iceberg_schema = iceberg::spec::Schema::builder()
.with_fields(vec![
NestedField::required(1, "a", Type::Primitive(PrimitiveType::String))
.into(),
NestedField::required(2, "b", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::required(3, "c", Type::Primitive(PrimitiveType::Boolean))
.into(),
])
.build()
.unwrap();

assert!(
is_schema_aligned(&Arc::new(arrow_schema), &Arc::new(iceberg_schema))
.is_err()
);
}

#[test]
fn test_is_schema_aligned_negative_different_type() {
let arrow_schema = Schema::new_with_metadata(
vec![
Field::new("a", DataType::Utf8, false),
Field::new("b", DataType::Int32, false),
Field::new("c", DataType::Int32, false), // Mismatched type
],
HashMap::new(),
);

let iceberg_schema = iceberg::spec::Schema::builder()
.with_fields(vec![
NestedField::required(1, "a", Type::Primitive(PrimitiveType::String))
.into(),
NestedField::required(2, "b", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::required(3, "c", Type::Primitive(PrimitiveType::Boolean))
.into(),
])
.build()
.unwrap();

assert!(
is_schema_aligned(&Arc::new(arrow_schema), &Arc::new(iceberg_schema))
.is_err()
);
}

#[test]
fn test_is_schema_aligned_negative_reordered() {
let arrow_schema = Schema::new_with_metadata(
vec![
// Same fields but in wrong order
Field::new("b", DataType::Int32, false),
Field::new("a", DataType::Utf8, false),
Field::new("c", DataType::Boolean, false),
],
HashMap::new(),
);

let iceberg_schema = iceberg::spec::Schema::builder()
.with_fields(vec![
NestedField::required(1, "a", Type::Primitive(PrimitiveType::String))
.into(),
NestedField::required(2, "b", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::required(3, "c", Type::Primitive(PrimitiveType::Boolean))
.into(),
])
.build()
.unwrap();

assert!(
is_schema_aligned(&Arc::new(arrow_schema), &Arc::new(iceberg_schema))
.is_err()
);
}

// Not allowed to insert a nullable value into a non-nullable field
#[test]
fn test_is_schema_aligned_negative_nullable() {
let arrow_schema = Schema::new_with_metadata(
vec![
Field::new("a", DataType::Utf8, true), // Nullable
Field::new("b", DataType::Int32, false),
Field::new("c", DataType::Boolean, false),
],
HashMap::new(),
);

let iceberg_schema = iceberg::spec::Schema::builder()
.with_fields(vec![
NestedField::required(1, "a", Type::Primitive(PrimitiveType::String))
.into(),
NestedField::required(2, "b", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::required(3, "c", Type::Primitive(PrimitiveType::Boolean))
.into(),
])
.build()
.unwrap();

assert!(
is_schema_aligned(&Arc::new(arrow_schema), &Arc::new(iceberg_schema))
.is_err()
);
}
}

0 comments on commit 74bf3ba

Please sign in to comment.