-
Notifications
You must be signed in to change notification settings - Fork 227
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement write.metadata.delete-after-commit.enabled
to clean up old metadata files
#1607
Changes from 8 commits
0985972
d244ffd
4807040
352b972
a375e1c
1f631c2
7e4c4b3
d1ed69c
f440ac1
4b36bfc
f7b7c03
d30bbaf
01fb388
462f3d4
45516dc
e8277c8
ed7a705
ec3de23
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -55,6 +55,7 @@ | |
CreateTableTransaction, | ||
StagedTable, | ||
Table, | ||
TableProperties, | ||
) | ||
from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata | ||
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder | ||
|
@@ -72,6 +73,7 @@ | |
from pyiceberg.utils.config import Config, merge_config | ||
from pyiceberg.utils.deprecated import deprecated as deprecated | ||
from pyiceberg.utils.deprecated import deprecation_message | ||
from pyiceberg.utils.properties import property_as_bool | ||
|
||
if TYPE_CHECKING: | ||
import pyarrow as pa | ||
|
@@ -858,6 +860,12 @@ def _update_and_stage_table( | |
enforce_validation=current_table is None, | ||
metadata_location=current_table.metadata_location if current_table else None, | ||
) | ||
io = self._load_file_io(properties=updated_metadata.properties, location=updated_metadata.location) | ||
kevinjqliu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
# https://github.com/apache/iceberg/blob/f6faa58/core/src/main/java/org/apache/iceberg/CatalogUtil.java#L527 | ||
# delete old metadata if METADATA_DELETE_AFTER_COMMIT_ENABLED is set to true | ||
if current_table is not None: | ||
self._delete_old_metadata(io, current_table.metadata, updated_metadata) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is this the right place for this operation? At this point, the changes have only been applied to the table metadata but are not yet committed. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The Java implementation uses |
||
|
||
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 if current_table else 0 | ||
new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version) | ||
|
@@ -866,7 +874,7 @@ def _update_and_stage_table( | |
identifier=table_identifier, | ||
metadata=updated_metadata, | ||
metadata_location=new_metadata_location, | ||
io=self._load_file_io(properties=updated_metadata.properties, location=new_metadata_location), | ||
io=io, | ||
catalog=self, | ||
) | ||
|
||
|
@@ -913,6 +921,20 @@ def _get_default_warehouse_location(self, database_name: str, table_name: str) - | |
|
||
raise ValueError("No default path is set, please specify a location when creating a table") | ||
|
||
def _delete_old_metadata(self, io: FileIO, base: TableMetadata, metadata: TableMetadata) -> None:
    """Delete metadata files that dropped out of the metadata log, when enabled.

    The ``TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED`` property is read
    from ``metadata.properties``. When it resolves to true, every metadata file
    referenced by ``base.metadata_log`` but no longer referenced by
    ``metadata.metadata_log`` is passed to ``delete_files``.

    Args:
        io: FileIO handed to ``delete_files`` to perform the deletions.
        base: Table metadata as it was before the update.
        metadata: Table metadata after the update; also the source of the property.
    """
    enabled: bool = property_as_bool(
        metadata.properties,
        TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
        TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT,
    )
    if not enabled:
        return

    previous_files: set[str] = {entry.metadata_file for entry in base.metadata_log}
    retained_files: set[str] = {entry.metadata_file for entry in metadata.metadata_log}
    # Only files the updated log no longer references are eligible for removal.
    stale_files: set[str] = previous_files - retained_files
    delete_files(io, stale_files, METADATA)
|
||
@staticmethod
def _write_metadata(metadata: TableMetadata, io: FileIO, metadata_path: str) -> None:
    """Pass ``metadata`` to ``ToOutputFile.table_metadata`` with the output file for ``metadata_path``.

    Args:
        metadata: Table metadata to hand to ``ToOutputFile.table_metadata``.
        io: FileIO whose ``new_output`` creates the output file.
        metadata_path: Location given to ``io.new_output``.
    """
    destination = io.new_output(metadata_path)
    ToOutputFile.table_metadata(metadata, destination)
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
https://iceberg.apache.org/docs/1.6.0/maintenance/#remove-old-metadata-files