Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
geruh committed Feb 11, 2025
1 parent 27fabd2 commit 4e47edc
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 27 deletions.
21 changes: 1 addition & 20 deletions pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
CreateTableTransaction,
StagedTable,
Table,
TableProperties,
)
from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
Expand Down Expand Up @@ -758,24 +757,6 @@ def _convert_schema_if_needed(schema: Union[Schema, "pa.Schema"]) -> Schema:
pass
raise ValueError(f"{type(schema)=}, but it must be pyiceberg.schema.Schema or pyarrow.Schema")

@staticmethod
def metadata_file_location(table_location: str, file_name: str, properties: Properties = EMPTY_DICT) -> str:
    """Build the full path at which a metadata file is stored.

    Args:
        table_location (str): The base table location.
        file_name (str): Name of the metadata file.
        properties (Properties): Table properties; may carry a custom metadata
            path under ``TableProperties.WRITE_METADATA_PATH``.

    Returns:
        str: ``<custom path>/<file_name>`` when the property holds a non-empty
        value (trailing slashes removed), otherwise
        ``<table_location>/metadata/<file_name>``.
    """
    custom_root = properties.get(TableProperties.WRITE_METADATA_PATH)
    if not custom_root:
        # Property missing or empty: fall back to the table's own metadata folder.
        return f"{table_location}/metadata/{file_name}"
    return f"{custom_root.rstrip('/')}/{file_name}"

def __repr__(self) -> str:
    """Return the string representation of the Catalog class: ``<name> (<class>)``."""
    return "{} ({})".format(self.name, self.__class__)
Expand Down Expand Up @@ -955,7 +936,7 @@ def _get_metadata_location(table_location: str, new_version: int = 0, properties
raise ValueError(f"Table metadata version: `{new_version}` must be a non-negative integer")

file_name = f"{new_version:05d}-{uuid.uuid4()}.metadata.json"
return Catalog.metadata_file_location(table_location, file_name, properties)
return Table.metadata_file_location(table_location, file_name, properties)

@staticmethod
def _parse_metadata_version(metadata_location: str) -> int:
Expand Down
19 changes: 16 additions & 3 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1213,9 +1213,22 @@ def to_daft(self) -> daft.DataFrame:

return daft.read_iceberg(self)

def metadata_file_location(self, file_name: str) -> str:
    """Return the location for a metadata file, honouring write.metadata.path if set.

    The lookup is delegated to the catalog, which receives the table location
    and the table properties taken from this table's metadata.
    """
    resolve = self.catalog.metadata_file_location
    return resolve(self.metadata.location, file_name, self.metadata.properties)
@staticmethod
def metadata_file_location(table_location: str, file_name: str, properties: Properties = EMPTY_DICT) -> str:
    """Get the full path for a metadata file.

    Args:
        table_location (str): The base table location.
        file_name (str): Name of the metadata file.
        properties (Properties): Table properties that may contain a custom
            metadata path under ``TableProperties.WRITE_METADATA_PATH``.

    Returns:
        str: Full path where the metadata file should be stored. When the
        custom path property is set to a non-empty value, the file is placed
        directly under it (trailing slashes stripped); otherwise it goes under
        ``<table_location>/metadata``.
    """
    if metadata_path := properties.get(TableProperties.WRITE_METADATA_PATH):
        # Use single quotes inside the replacement field: reusing the enclosing
        # double quote is a SyntaxError on interpreters older than Python 3.12.
        return f"{metadata_path.rstrip('/')}/{file_name}"

    return f"{table_location}/metadata/{file_name}"


class StaticTable(Table):
Expand Down
26 changes: 22 additions & 4 deletions pyiceberg/table/update/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,16 @@
from pyiceberg.table import Transaction


def _new_manifest_file_name(num: int, commit_uuid: uuid.UUID) -> str:
    """Return the manifest file name ``<commit_uuid>-m<num>.avro``."""
    return "{}-m{}.avro".format(commit_uuid, num)


def _generate_manifest_list_file_name(snapshot_id: int, attempt: int, commit_uuid: uuid.UUID) -> str:
    """Return the manifest list file name ``snap-<snapshot_id>-<attempt>-<commit_uuid>.avro``."""
    # Mimics the behavior in Java:
    # https://github.com/apache/iceberg/blob/c862b9177af8e2d83122220764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491
    return "snap-{}-{}-{}.avro".format(snapshot_id, attempt, commit_uuid)


class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
commit_uuid: uuid.UUID
_io: FileIO
Expand Down Expand Up @@ -233,8 +243,14 @@ def _commit(self) -> UpdatesAndRequirements:
next_sequence_number = self._transaction.table_metadata.next_sequence_number()

summary = self._summary(self.snapshot_properties)
file_name = f"{self.commit_uuid}-m{self._snapshot_id}-a0.avro"
manifest_list_file_path = self._transaction._table.metadata_file_location(file_name)
table_location = self._transaction.table_metadata.location
properties = self._transaction.table_metadata.properties
file_name = _generate_manifest_list_file_name(
snapshot_id=self._snapshot_id,
attempt=0,
commit_uuid=self.commit_uuid,
)
manifest_list_file_path = self._transaction._table.metadata_file_location(table_location, file_name, properties)
with write_manifest_list(
format_version=self._transaction.table_metadata.format_version,
output_file=self._io.new_output(manifest_list_file_path),
Expand Down Expand Up @@ -280,8 +296,10 @@ def new_manifest_writer(self, spec: PartitionSpec) -> ManifestWriter:
)

def new_manifest_output(self) -> OutputFile:
    """Create the output file for the next manifest written by this commit.

    The file name is built from the commit UUID and the next value of the
    manifest counter, and the path is resolved from the table location and
    table properties held by the transaction's table metadata.

    Returns:
        OutputFile: Output handle obtained from this producer's FileIO.
    """
    table_location = self._transaction.table_metadata.location
    properties = self._transaction.table_metadata.properties
    # Advance the counter exactly once per manifest; the stale pre-change lines
    # consumed an extra number and used the old one-argument call signature.
    file_name = _new_manifest_file_name(num=next(self._manifest_num_counter), commit_uuid=self.commit_uuid)
    file_path = self._transaction._table.metadata_file_location(table_location, file_name, properties)
    return self._io.new_output(file_path)

def fetch_manifest_entry(self, manifest: ManifestFile, discard_deleted: bool = True) -> List[ManifestEntry]:
Expand Down
39 changes: 39 additions & 0 deletions tests/catalog/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@
TableAlreadyExistsError,
)
from pyiceberg.io import WAREHOUSE
from pyiceberg.io.pyarrow import schema_to_pyarrow
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import (
Table,
TableProperties,
)
from pyiceberg.table.update import (
AddSchemaUpdate,
Expand Down Expand Up @@ -563,3 +565,40 @@ def test_table_properties_raise_for_none_value(catalog: InMemoryCatalog) -> None
with pytest.raises(ValidationError) as exc_info:
_ = given_catalog_has_a_table(catalog, properties=property_with_none)
assert "None type is not a supported value in properties: property_name" in str(exc_info.value)


def test_table_writes_metadata_to_custom_location(catalog: InMemoryCatalog) -> None:
    """Metadata and manifests are written under the path given by WRITE_METADATA_PATH."""
    metadata_path = f"{catalog._warehouse_location}/custom/path"
    custom_properties = {TableProperties.WRITE_METADATA_PATH: metadata_path}

    catalog.create_namespace(TEST_TABLE_NAMESPACE)
    table = catalog.create_table(
        identifier=TEST_TABLE_IDENTIFIER,
        schema=TEST_TABLE_SCHEMA,
        partition_spec=TEST_TABLE_PARTITION_SPEC,
        properties=custom_properties,
    )

    # Append a single row so that a snapshot with at least one manifest exists.
    arrow_schema = schema_to_pyarrow(TEST_TABLE_SCHEMA)
    table.append(pa.Table.from_pylist([{"x": 123, "y": 456, "z": 789}], schema=arrow_schema))
    written_manifests = table.current_snapshot().manifests(table.io)  # type: ignore

    assert table.metadata_file_location(table.location(), "", table.properties).startswith(metadata_path)
    assert written_manifests[0].manifest_path.startswith(metadata_path)
    assert table.location() != metadata_path
    assert table.metadata_location.startswith(metadata_path)


def test_table_writes_metadata_to_default_path(catalog: InMemoryCatalog) -> None:
    """Metadata and manifests are written under ``<table location>/metadata`` for the test properties."""
    catalog.create_namespace(TEST_TABLE_NAMESPACE)
    table = catalog.create_table(
        identifier=TEST_TABLE_IDENTIFIER,
        schema=TEST_TABLE_SCHEMA,
        partition_spec=TEST_TABLE_PARTITION_SPEC,
        properties=TEST_TABLE_PROPERTIES,
    )
    metadata_path = f"{table.location()}/metadata"

    # Append a single row so that a snapshot with at least one manifest exists.
    arrow_schema = schema_to_pyarrow(TEST_TABLE_SCHEMA)
    table.append(pa.Table.from_pylist([{"x": 123, "y": 456, "z": 789}], schema=arrow_schema))
    written_manifests = table.current_snapshot().manifests(table.io)  # type: ignore

    assert table.metadata_file_location(table.location(), "", table.properties).startswith(metadata_path)
    assert written_manifests[0].manifest_path.startswith(metadata_path)
    assert table.metadata_location.startswith(metadata_path)

0 comments on commit 4e47edc

Please sign in to comment.