Clean up old metadata (#1607)
Implements property `write.metadata.delete-after-commit.enabled` from
https://iceberg.apache.org/docs/1.5.1/maintenance/#remove-old-metadata-files.

Closes #1199

---------

Co-authored-by: Kevin Liu <[email protected]>
kaushiksrini and kevinjqliu authored Feb 12, 2025
1 parent df2e16a commit d6dce6d
Showing 4 changed files with 79 additions and 0 deletions.
1 change: 1 addition & 0 deletions mkdocs/docs/configuration.md
@@ -63,6 +63,7 @@ Iceberg tables support table properties to configure table behavior.
| `write.parquet.page-row-limit` | Number of rows | 20000 | Set a target threshold for the maximum number of rows within a column chunk |
| `write.parquet.dict-size-bytes` | Size in bytes | 2MB | Set the dictionary page size limit per row group |
| `write.metadata.previous-versions-max` | Integer | 100 | The max number of previous version metadata files to keep before deleting after commit. |
| `write.metadata.delete-after-commit.enabled` | Boolean | False | Whether to automatically delete old *tracked* metadata files after each table commit. The number of most recent metadata files to retain is controlled by `write.metadata.previous-versions-max`. |
| `write.object-storage.enabled` | Boolean | True | Enables the [`ObjectStoreLocationProvider`](configuration.md#object-store-location-provider) that adds a hash component to file paths. Note: the default value of `True` differs from Iceberg's Java implementation |
| `write.object-storage.partitioned-paths` | Boolean | True | Controls whether [partition values are included in file paths](configuration.md#partition-exclusion) when object storage is enabled |
| `write.py-location-provider.impl` | String of form `module.ClassName` | null | Optional, [custom `LocationProvider`](configuration.md#loading-a-custom-location-provider) implementation |
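
For reference, a minimal sketch of enabling this behavior on an existing table from PyIceberg, assuming a hypothetical catalog name and table identifier (the transaction/`set_properties` pattern matches the test added below):

```python
from pyiceberg.catalog import load_catalog

# Hypothetical catalog and table names, purely for illustration.
catalog = load_catalog("default")
table = catalog.load_table("examples.events")

with table.transaction() as transaction:
    transaction.set_properties(
        properties={
            "write.metadata.delete-after-commit.enabled": "true",
            "write.metadata.previous-versions-max": "2",
        }
    )
```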
17 changes: 17 additions & 0 deletions pyiceberg/catalog/__init__.py
@@ -55,6 +55,7 @@
    CreateTableTransaction,
    StagedTable,
    Table,
    TableProperties,
)
from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
@@ -72,6 +73,7 @@
from pyiceberg.utils.config import Config, merge_config
from pyiceberg.utils.deprecated import deprecated as deprecated
from pyiceberg.utils.deprecated import deprecation_message
from pyiceberg.utils.properties import property_as_bool

if TYPE_CHECKING:
    import pyarrow as pa
@@ -757,6 +759,21 @@ def _convert_schema_if_needed(schema: Union[Schema, "pa.Schema"]) -> Schema:
            pass
        raise ValueError(f"{type(schema)=}, but it must be pyiceberg.schema.Schema or pyarrow.Schema")

    @staticmethod
    def _delete_old_metadata(io: FileIO, base: TableMetadata, metadata: TableMetadata) -> None:
        """Delete metadata files that were removed from the metadata log, if delete-after-commit is enabled."""
        delete_after_commit: bool = property_as_bool(
            metadata.properties,
            TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
            TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT,
        )

        if delete_after_commit:
            removed_previous_metadata_files: set[str] = {log.metadata_file for log in base.metadata_log}
            current_metadata_files: set[str] = {log.metadata_file for log in metadata.metadata_log}
            removed_previous_metadata_files.difference_update(current_metadata_files)
            delete_files(io, removed_previous_metadata_files, METADATA)

    def __repr__(self) -> str:
        """Return the string representation of the Catalog class."""
        return f"{self.name} ({self.__class__})"
13 changes: 13 additions & 0 deletions pyiceberg/table/__init__.py
@@ -221,6 +221,9 @@ class TableProperties:
    METADATA_PREVIOUS_VERSIONS_MAX = "write.metadata.previous-versions-max"
    METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT = 100

    METADATA_DELETE_AFTER_COMMIT_ENABLED = "write.metadata.delete-after-commit.enabled"
    METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT = False

    MAX_SNAPSHOT_AGE_MS = "history.expire.max-snapshot-age-ms"
    MAX_SNAPSHOT_AGE_MS_DEFAULT = 5 * 24 * 60 * 60 * 1000  # 5 days
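
Table properties are stored as strings in the table metadata, so the boolean default above only applies when the key is missing; `property_as_bool` (imported in the catalog module above) does the parsing. A minimal sketch:

```python
from pyiceberg.utils.properties import property_as_bool

# Property values are strings ("true"/"false"); the default is used when the key is absent.
props = {"write.metadata.delete-after-commit.enabled": "true"}
assert property_as_bool(props, "write.metadata.delete-after-commit.enabled", False)
assert not property_as_bool({}, "write.metadata.delete-after-commit.enabled", False)
```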

@@ -1181,6 +1184,16 @@ def refs(self) -> Dict[str, SnapshotRef]:

    def _do_commit(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequirement, ...]) -> None:
        response = self.catalog.commit_table(self, requirements, updates)

        # https://github.com/apache/iceberg/blob/f6faa58/core/src/main/java/org/apache/iceberg/CatalogUtil.java#L527
        # Delete old metadata when METADATA_DELETE_AFTER_COMMIT_ENABLED is true, keeping only the number of
        # most recent versions given by TableProperties.METADATA_PREVIOUS_VERSIONS_MAX; everything else is removed.
        try:
            self.catalog._delete_old_metadata(self.io, self.metadata, response.metadata)
        except Exception as e:
            warnings.warn(f"Failed to delete old metadata after commit: {e}")

        self.metadata = response.metadata
        self.metadata_location = response.metadata_location
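
The cleanup is best-effort: a failure to delete old metadata files is reported as a warning rather than failing the commit. A hedged sketch of capturing that warning, e.g. in a test (the `table` object and the append call are placeholders for any operation that commits):

```python
import warnings

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    table.append(df)  # placeholder: any table commit

cleanup_warnings = [w for w in caught if "Failed to delete old metadata" in str(w.message)]
print(f"metadata cleanup warnings: {len(cleanup_warnings)}")
```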

48 changes: 48 additions & 0 deletions tests/catalog/test_sql.py
@@ -50,6 +50,7 @@
from pyiceberg.io.pyarrow import _dataframe_to_data_files, schema_to_pyarrow
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC
from pyiceberg.schema import Schema
from pyiceberg.table import TableProperties
from pyiceberg.table.snapshots import Operation
from pyiceberg.table.sorting import (
    NullOrder,
@@ -1613,3 +1614,50 @@ def test_merge_manifests_local_file_system(catalog: SqlCatalog, arrow_table_with
tbl.append(arrow_table_with_null)

assert len(tbl.scan().to_arrow()) == 5 * len(arrow_table_with_null)


@pytest.mark.parametrize(
    "catalog",
    [
        lazy_fixture("catalog_memory"),
        lazy_fixture("catalog_sqlite"),
        lazy_fixture("catalog_sqlite_without_rowcount"),
    ],
)
def test_delete_metadata_multiple(catalog: SqlCatalog, table_schema_nested: Schema, random_table_identifier: str) -> None:
    namespace = Catalog.namespace_from(random_table_identifier)
    catalog.create_namespace(namespace)
    table = catalog.create_table(random_table_identifier, table_schema_nested)

    original_metadata_location = table.metadata_location

    for i in range(5):
        with table.transaction() as transaction:
            with transaction.update_schema() as update:
                update.add_column(path=f"new_column_{i}", field_type=IntegerType())

    assert len(table.metadata.metadata_log) == 5
    assert os.path.exists(original_metadata_location[len("file://") :])

    # Set the max previous versions to 2 and enable delete-after-commit
    new_property = {
        TableProperties.METADATA_PREVIOUS_VERSIONS_MAX: "2",
        TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED: "true",
    }

    with table.transaction() as transaction:
        transaction.set_properties(properties=new_property)

    # Verify that only the most recent metadata files are kept
    assert len(table.metadata.metadata_log) == 2
    updated_metadata_1, updated_metadata_2 = table.metadata.metadata_log

    # A new metadata log entry is added by this commit, so the earlier metadata files are removed.
    with table.transaction() as transaction:
        with transaction.update_schema() as update:
            update.add_column(path="new_column_x", field_type=IntegerType())

    assert len(table.metadata.metadata_log) == 2
    assert not os.path.exists(original_metadata_location[len("file://") :])
    assert not os.path.exists(updated_metadata_1.metadata_file[len("file://") :])
    assert os.path.exists(updated_metadata_2.metadata_file[len("file://") :])
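
After the final commit above, the retained history can be inspected directly from the table metadata; a small sketch (assuming `table` is the table used in the test):

```python
# Previous metadata files that are still tracked, and therefore kept on disk.
for entry in table.metadata.metadata_log:
    print(entry.timestamp_ms, entry.metadata_file)

# The current metadata file is never part of the cleanup.
print("current:", table.metadata_location)
```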
