Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement write.metadata.delete-after-commit.enabled to clean up old metadata files #1607

Merged
merged 18 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions mkdocs/docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ Iceberg tables support table properties to configure table behavior.
| `write.parquet.page-row-limit` | Number of rows | 20000 | Set a target threshold for the maximum number of rows within a column chunk |
| `write.parquet.dict-size-bytes` | Size in bytes | 2MB | Set the dictionary page size limit per row group |
| `write.metadata.previous-versions-max` | Integer | 100 | The max number of previous version metadata files to keep before deleting after commit. |
| `write.metadata.delete-after-commit.enabled` | Boolean | False | Whether to automatically delete old *tracked* metadata files after each table commit. It will retain a number of the most recent metadata files, which can be set using property `write.metadata.previous-versions-max`. |
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

| `write.object-storage.enabled` | Boolean | True | Enables the [`ObjectStoreLocationProvider`](configuration.md#object-store-location-provider) that adds a hash component to file paths. Note: the default value of `True` differs from Iceberg's Java implementation |
| `write.object-storage.partitioned-paths` | Boolean | True | Controls whether [partition values are included in file paths](configuration.md#partition-exclusion) when object storage is enabled |
| `write.py-location-provider.impl` | String of form `module.ClassName` | null | Optional, [custom `LocationProvider`](configuration.md#loading-a-custom-location-provider) implementation |
Expand Down
20 changes: 19 additions & 1 deletion pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
CreateTableTransaction,
StagedTable,
Table,
TableProperties,
)
from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
Expand All @@ -72,6 +73,7 @@
from pyiceberg.utils.config import Config, merge_config
from pyiceberg.utils.deprecated import deprecated as deprecated
from pyiceberg.utils.deprecated import deprecation_message
from pyiceberg.utils.properties import property_as_bool

if TYPE_CHECKING:
import pyarrow as pa
Expand Down Expand Up @@ -746,6 +748,21 @@ def _convert_schema_if_needed(schema: Union[Schema, "pa.Schema"]) -> Schema:
pass
raise ValueError(f"{type(schema)=}, but it must be pyiceberg.schema.Schema or pyarrow.Schema")

@staticmethod
def _delete_old_metadata(io: FileIO, base: TableMetadata, metadata: TableMetadata) -> None:
"""Delete oldest metadata if config is set to true."""
delete_after_commit: bool = property_as_bool(
metadata.properties,
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT,
)

if delete_after_commit:
removed_previous_metadata_files: set[str] = {log.metadata_file for log in base.metadata_log}
current_metadata_files: set[str] = {log.metadata_file for log in metadata.metadata_log}
removed_previous_metadata_files.difference_update(current_metadata_files)
delete_files(io, removed_previous_metadata_files, METADATA)

def __repr__(self) -> str:
"""Return the string representation of the Catalog class."""
return f"{self.name} ({self.__class__})"
Expand Down Expand Up @@ -858,6 +875,7 @@ def _update_and_stage_table(
enforce_validation=current_table is None,
metadata_location=current_table.metadata_location if current_table else None,
)
io = self._load_file_io(properties=updated_metadata.properties, location=updated_metadata.location)
kevinjqliu marked this conversation as resolved.
Show resolved Hide resolved

new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 if current_table else 0
new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version)
Expand All @@ -866,7 +884,7 @@ def _update_and_stage_table(
identifier=table_identifier,
metadata=updated_metadata,
metadata_location=new_metadata_location,
io=self._load_file_io(properties=updated_metadata.properties, location=new_metadata_location),
io=io,
catalog=self,
)

Expand Down
8 changes: 8 additions & 0 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,9 @@ class TableProperties:
METADATA_PREVIOUS_VERSIONS_MAX = "write.metadata.previous-versions-max"
METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT = 100

METADATA_DELETE_AFTER_COMMIT_ENABLED = "write.metadata.delete-after-commit.enabled"
METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT = False

MAX_SNAPSHOT_AGE_MS = "history.expire.max-snapshot-age-ms"
MAX_SNAPSHOT_AGE_MS_DEFAULT = 5 * 24 * 60 * 60 * 1000 # 5 days

Expand Down Expand Up @@ -1179,6 +1182,11 @@ def refs(self) -> Dict[str, SnapshotRef]:

def _do_commit(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequirement, ...]) -> None:
response = self.catalog.commit_table(self, requirements, updates)

# https://github.com/apache/iceberg/blob/f6faa58/core/src/main/java/org/apache/iceberg/CatalogUtil.java#L527
# delete old metadata if METADATA_DELETE_AFTER_COMMIT_ENABLED is set to true
self.catalog._delete_old_metadata(self.io, self.metadata, response.metadata)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar to https://github.com/apache/iceberg/blob/f6faa58dac57e03be6e02a43937ac7c15c770225/core/src/main/java/org/apache/iceberg/CatalogUtil.java#L539-L544

can we add a comment here explaining how METADATA_PREVIOUS_VERSIONS_MAX is taken into account?

TableProperties.METADATA_PREVIOUS_VERSIONS_MAX,

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also maybe we want to wrap this in try/catch and throw a warning as to not block the commit process

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added a try-catch, but the deleteFiles already has a warning for actually deleting the files. I could only see an exception arise from _delete_old_metadata's operations.

def delete_files(io: FileIO, files_to_delete: Set[str], file_type: str) -> None:
"""Delete files.
Log warnings if failing to delete any file.
Args:
io: The FileIO used to delete the object.
files_to_delete: A set of file paths to be deleted.
file_type: The type of the file.
"""
for file in files_to_delete:
try:
io.delete(file)
except OSError as exc:
logger.warning(msg=f"Failed to delete {file_type} file {file}", exc_info=exc)


self.metadata = response.metadata
self.metadata_location = response.metadata_location

Expand Down
1 change: 1 addition & 0 deletions tests/catalog/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ def commit_table(
requirement.validate(base_metadata)

updated_metadata = update_table_metadata(base_metadata, updates)

if updated_metadata == base_metadata:
# no changes, do nothing
return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location)
Expand Down
54 changes: 54 additions & 0 deletions tests/catalog/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
from pyiceberg.io.pyarrow import _dataframe_to_data_files, schema_to_pyarrow
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC
from pyiceberg.schema import Schema
from pyiceberg.table import TableProperties
from pyiceberg.table.snapshots import Operation
from pyiceberg.table.sorting import (
NullOrder,
Expand Down Expand Up @@ -1613,3 +1614,56 @@ def test_merge_manifests_local_file_system(catalog: SqlCatalog, arrow_table_with
tbl.append(arrow_table_with_null)

assert len(tbl.scan().to_arrow()) == 5 * len(arrow_table_with_null)


@pytest.mark.parametrize(
"catalog",
[
lazy_fixture("catalog_memory"),
lazy_fixture("catalog_sqlite"),
lazy_fixture("catalog_sqlite_without_rowcount"),
],
)
@pytest.mark.parametrize(
"table_identifier",
[
lazy_fixture("random_table_identifier"),
lazy_fixture("random_hierarchical_identifier"),
],
)
kevinjqliu marked this conversation as resolved.
Show resolved Hide resolved
def test_delete_metadata_multiple(catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier) -> None:
namespace = Catalog.namespace_from(table_identifier)
catalog.create_namespace(namespace)
table = catalog.create_table(table_identifier, table_schema_nested)
kevinjqliu marked this conversation as resolved.
Show resolved Hide resolved

original_metadata_location = table.metadata_location

for i in range(5):
with table.transaction() as transaction:
with transaction.update_schema() as update:
update.add_column(path=f"new_column_{i}", field_type=IntegerType())

assert len(table.metadata.metadata_log) == 5
assert os.path.exists(original_metadata_location[len("file://") :])

# Set the max versions property to 2, and delete after commit
new_property = {
TableProperties.METADATA_PREVIOUS_VERSIONS_MAX: "2",
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED: "true",
}

with table.transaction() as transaction:
transaction.set_properties(properties=new_property)

# Verify that only the most recent metadata files are kept
assert len(table.metadata.metadata_log) == 2
updated_metadata_1, updated_metadata_2 = table.metadata.metadata_log

with table.transaction() as transaction:
kevinjqliu marked this conversation as resolved.
Show resolved Hide resolved
with transaction.update_schema() as update:
update.add_column(path="new_column_x", field_type=IntegerType())

assert len(table.metadata.metadata_log) == 2
assert not os.path.exists(original_metadata_location[len("file://") :])
assert not os.path.exists(updated_metadata_1.metadata_file[len("file://") :])
assert os.path.exists(updated_metadata_2.metadata_file[len("file://") :])