From d6dce6d9d767d222a04d0a7ed932989ec7aa06aa Mon Sep 17 00:00:00 2001 From: Kaushik Srinivasan Date: Tue, 11 Feb 2025 23:38:19 -0500 Subject: [PATCH] Clean up old metadata (#1607) Implements property `write.metadata.delete-after-commit.enabled` from https://iceberg.apache.org/docs/1.5.1/maintenance/#remove-old-metadata-files. Closes #1199 --------- Co-authored-by: Kevin Liu --- mkdocs/docs/configuration.md | 1 + pyiceberg/catalog/__init__.py | 17 +++++++++++++ pyiceberg/table/__init__.py | 13 ++++++++++ tests/catalog/test_sql.py | 48 +++++++++++++++++++++++++++++++++++ 4 files changed, 79 insertions(+) diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index bae8ca997f..091eebc4af 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -63,6 +63,7 @@ Iceberg tables support table properties to configure table behavior. | `write.parquet.page-row-limit` | Number of rows | 20000 | Set a target threshold for the maximum number of rows within a column chunk | | `write.parquet.dict-size-bytes` | Size in bytes | 2MB | Set the dictionary page size limit per row group | | `write.metadata.previous-versions-max` | Integer | 100 | The max number of previous version metadata files to keep before deleting after commit. | +| `write.metadata.delete-after-commit.enabled` | Boolean | False | Whether to automatically delete old *tracked* metadata files after each table commit. It will retain a number of the most recent metadata files, which can be set using property `write.metadata.previous-versions-max`. | | `write.object-storage.enabled` | Boolean | True | Enables the [`ObjectStoreLocationProvider`](configuration.md#object-store-location-provider) that adds a hash component to file paths. Note: the default value of `True` differs from Iceberg's Java implementation | | `write.object-storage.partitioned-paths` | Boolean | True | Controls whether [partition values are included in file paths](configuration.md#partition-exclusion) when object storage is enabled | | `write.py-location-provider.impl` | String of form `module.ClassName` | null | Optional, [custom `LocationProvider`](configuration.md#loading-a-custom-location-provider) implementation | diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index 18aea91940..a39f6bc711 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -55,6 +55,7 @@ CreateTableTransaction, StagedTable, Table, + TableProperties, ) from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder @@ -72,6 +73,7 @@ from pyiceberg.utils.config import Config, merge_config from pyiceberg.utils.deprecated import deprecated as deprecated from pyiceberg.utils.deprecated import deprecation_message +from pyiceberg.utils.properties import property_as_bool if TYPE_CHECKING: import pyarrow as pa @@ -757,6 +759,21 @@ def _convert_schema_if_needed(schema: Union[Schema, "pa.Schema"]) -> Schema: pass raise ValueError(f"{type(schema)=}, but it must be pyiceberg.schema.Schema or pyarrow.Schema") + @staticmethod + def _delete_old_metadata(io: FileIO, base: TableMetadata, metadata: TableMetadata) -> None: + """Delete oldest metadata if config is set to true.""" + delete_after_commit: bool = property_as_bool( + metadata.properties, + TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, + TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT, + ) + + if delete_after_commit: + removed_previous_metadata_files: set[str] = {log.metadata_file for log in base.metadata_log} + current_metadata_files: set[str] = {log.metadata_file for log in metadata.metadata_log} + removed_previous_metadata_files.difference_update(current_metadata_files) + delete_files(io, removed_previous_metadata_files, METADATA) + def __repr__(self) -> str: """Return the string representation of the Catalog class.""" return f"{self.name} ({self.__class__})" diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 8186d6573c..1d40045779 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -221,6 +221,9 @@ class TableProperties: METADATA_PREVIOUS_VERSIONS_MAX = "write.metadata.previous-versions-max" METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT = 100 + METADATA_DELETE_AFTER_COMMIT_ENABLED = "write.metadata.delete-after-commit.enabled" + METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT = False + MAX_SNAPSHOT_AGE_MS = "history.expire.max-snapshot-age-ms" MAX_SNAPSHOT_AGE_MS_DEFAULT = 5 * 24 * 60 * 60 * 1000 # 5 days @@ -1181,6 +1184,16 @@ def refs(self) -> Dict[str, SnapshotRef]: def _do_commit(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequirement, ...]) -> None: response = self.catalog.commit_table(self, requirements, updates) + + # https://github.com/apache/iceberg/blob/f6faa58/core/src/main/java/org/apache/iceberg/CatalogUtil.java#L527 + # delete old metadata if METADATA_DELETE_AFTER_COMMIT_ENABLED is set to true and uses + # TableProperties.METADATA_PREVIOUS_VERSIONS_MAX to determine how many previous versions to keep - + # everything else will be removed. + try: + self.catalog._delete_old_metadata(self.io, self.metadata, response.metadata) + except Exception as e: + warnings.warn(f"Failed to delete old metadata after commit: {e}") + self.metadata = response.metadata self.metadata_location = response.metadata_location diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py index cffc14d9d7..d2800363a6 100644 --- a/tests/catalog/test_sql.py +++ b/tests/catalog/test_sql.py @@ -50,6 +50,7 @@ from pyiceberg.io.pyarrow import _dataframe_to_data_files, schema_to_pyarrow from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC from pyiceberg.schema import Schema +from pyiceberg.table import TableProperties from pyiceberg.table.snapshots import Operation from pyiceberg.table.sorting import ( NullOrder, @@ -1613,3 +1614,50 @@ def test_merge_manifests_local_file_system(catalog: SqlCatalog, arrow_table_with tbl.append(arrow_table_with_null) assert len(tbl.scan().to_arrow()) == 5 * len(arrow_table_with_null) + + +@pytest.mark.parametrize( + "catalog", + [ + lazy_fixture("catalog_memory"), + lazy_fixture("catalog_sqlite"), + lazy_fixture("catalog_sqlite_without_rowcount"), + ], +) +def test_delete_metadata_multiple(catalog: SqlCatalog, table_schema_nested: Schema, random_table_identifier: str) -> None: + namespace = Catalog.namespace_from(random_table_identifier) + catalog.create_namespace(namespace) + table = catalog.create_table(random_table_identifier, table_schema_nested) + + original_metadata_location = table.metadata_location + + for i in range(5): + with table.transaction() as transaction: + with transaction.update_schema() as update: + update.add_column(path=f"new_column_{i}", field_type=IntegerType()) + + assert len(table.metadata.metadata_log) == 5 + assert os.path.exists(original_metadata_location[len("file://") :]) + + # Set the max versions property to 2, and delete after commit + new_property = { + TableProperties.METADATA_PREVIOUS_VERSIONS_MAX: "2", + TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED: "true", + } + + with table.transaction() as transaction: + transaction.set_properties(properties=new_property) + + # Verify that only the most recent metadata files are kept + assert len(table.metadata.metadata_log) == 2 + updated_metadata_1, updated_metadata_2 = table.metadata.metadata_log + + # new metadata log was added, so earlier metadata logs are removed. + with table.transaction() as transaction: + with transaction.update_schema() as update: + update.add_column(path="new_column_x", field_type=IntegerType()) + + assert len(table.metadata.metadata_log) == 2 + assert not os.path.exists(original_metadata_location[len("file://") :]) + assert not os.path.exists(updated_metadata_1.metadata_file[len("file://") :]) + assert os.path.exists(updated_metadata_2.metadata_file[len("file://") :])