Skip to content

Commit

Permalink
Move metadata path generation to the location provider
Browse files Browse the repository at this point in the history
  • Loading branch information
geruh committed Feb 13, 2025
1 parent 6f88749 commit 1c9f177
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 53 deletions.
9 changes: 5 additions & 4 deletions pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
Table,
TableProperties,
)
from pyiceberg.table.locations import load_location_provider
from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.table.update import (
Expand Down Expand Up @@ -857,7 +858,8 @@ def _create_staged_table(
database_name, table_name = self.identifier_to_database_and_table(identifier)

location = self._resolve_table_location(location, database_name, table_name)
metadata_location = Table.new_table_metadata_file_location(table_location=location, properties=properties)
provider = load_location_provider(location, properties)
metadata_location = provider.new_table_metadata_file_location()
metadata = new_table_metadata(
location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties
)
Expand Down Expand Up @@ -888,9 +890,8 @@ def _update_and_stage_table(
)

new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 if current_table else 0
new_metadata_location = Table.new_table_metadata_file_location(
updated_metadata.location, new_metadata_version, updated_metadata.properties
)
provider = load_location_provider(updated_metadata.location, updated_metadata.properties)
new_metadata_location = provider.new_table_metadata_file_location(new_metadata_version)

return StagedTable(
identifier=table_identifier,
Expand Down
5 changes: 4 additions & 1 deletion pyiceberg/catalog/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
from pyiceberg.schema import Schema
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import CommitTableResponse, Table
from pyiceberg.table.locations import load_location_provider
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.table.update import (
Expand Down Expand Up @@ -173,7 +174,9 @@ def create_table(
database_name, table_name = self.identifier_to_database_and_table(identifier)

location = self._resolve_table_location(location, database_name, table_name)
metadata_location = Table.new_table_metadata_file_location(table_location=location, properties=properties)
provider = load_location_provider(table_location=location, table_properties=properties)
metadata_location = provider.new_table_metadata_file_location()

metadata = new_table_metadata(
location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties
)
Expand Down
4 changes: 3 additions & 1 deletion pyiceberg/catalog/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
from pyiceberg.schema import Schema
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import CommitTableResponse, Table
from pyiceberg.table.locations import load_location_provider
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.table.update import (
Expand Down Expand Up @@ -207,7 +208,8 @@ def create_table(

namespace = Catalog.namespace_to_string(namespace_identifier)
location = self._resolve_table_location(location, namespace, table_name)
metadata_location = Table.new_table_metadata_file_location(table_location=location, properties=properties)
location_provider = load_location_provider(table_location=location, table_properties=properties)
metadata_location = location_provider.new_table_metadata_file_location()
metadata = new_table_metadata(
location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties
)
Expand Down
43 changes: 5 additions & 38 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
)
from pyiceberg.schema import Schema
from pyiceberg.table.inspect import InspectTable
from pyiceberg.table.locations import LocationProvider, load_location_provider
from pyiceberg.table.metadata import (
INITIAL_SEQUENCE_NUMBER,
TableMetadata,
Expand Down Expand Up @@ -1001,6 +1002,10 @@ def location(self) -> str:
"""Return the table's base location."""
return self.metadata.location

def location_provider(self) -> LocationProvider:
"""Return the table's location provider."""
return load_location_provider(table_location=self.metadata.location, table_properties=self.metadata.properties)

@property
def last_sequence_number(self) -> int:
return self.metadata.last_sequence_number
Expand Down Expand Up @@ -1237,44 +1242,6 @@ def to_polars(self) -> pl.LazyFrame:

return pl.scan_iceberg(self)

@staticmethod
def new_table_metadata_file_location(table_location: str, new_version: int = 0, properties: Properties = EMPTY_DICT) -> str:
"""Return a fully-qualified metadata file location for a new table version.
Args:
table_location (str): the base table location.
new_version (int): Version number of the metadata file.
properties (Properties): Table properties that may contain a custom metadata path.
Returns:
str: fully-qualified URI for the new table metadata file.
Raises:
ValueError: If the version is negative.
"""
if new_version < 0:
raise ValueError(f"Table metadata version: `{new_version}` must be a non-negative integer")

file_name = f"{new_version:05d}-{uuid.uuid4()}.metadata.json"
return Table.new_metadata_location(table_location, file_name, properties)

@staticmethod
def new_metadata_location(table_location: str, file_name: str, properties: Properties = EMPTY_DICT) -> str:
"""Return a fully-qualified metadata file location for the given filename.
Args:
table_location (str): The base table location
file_name (str): Name of the metadata file
properties (Properties): Table properties that may contain a custom metadata path
Returns:
str: A fully-qualified location URI for the metadata file.
"""
if metadata_path := properties.get(TableProperties.WRITE_METADATA_PATH):
return f"{metadata_path.rstrip('/')}/{file_name}"

return f"{table_location}/metadata/{file_name}"


class StaticTable(Table):
"""Load a table directly from a metadata file (i.e., without using a catalog)."""
Expand Down
38 changes: 37 additions & 1 deletion pyiceberg/table/locations.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.
import importlib
import logging
import uuid
from abc import ABC, abstractmethod
from typing import Optional

Expand All @@ -29,7 +30,7 @@


class LocationProvider(ABC):
"""A base class for location providers, that provide data file locations for a table's write tasks.
"""A base class for location providers, that provide file locations for a table's write tasks.
Args:
table_location (str): The table's base storage location.
Expand All @@ -40,6 +41,7 @@ class LocationProvider(ABC):
table_properties: Properties

data_path: str
metadata_path: str

def __init__(self, table_location: str, table_properties: Properties):
self.table_location = table_location
Expand All @@ -52,6 +54,11 @@ def __init__(self, table_location: str, table_properties: Properties):
else:
self.data_path = f"{self.table_location.rstrip('/')}/data"

if path := table_properties.get(TableProperties.WRITE_METADATA_PATH):
self.metadata_path = path.rstrip("/")
else:
self.metadata_path = f"{self.table_location.rstrip('/')}/metadata"

@abstractmethod
def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str:
"""Return a fully-qualified data file location for the given filename.
Expand All @@ -64,6 +71,35 @@ def new_data_location(self, data_file_name: str, partition_key: Optional[Partiti
str: A fully-qualified location URI for the data file.
"""

def new_table_metadata_file_location(self, new_version: int = 0) -> str:
"""Return a fully-qualified metadata file location for a new table version.
Args:
new_version (int): Version number of the metadata file.
Returns:
str: fully-qualified URI for the new table metadata file.
Raises:
ValueError: If the version is negative.
"""
if new_version < 0:
raise ValueError(f"Table metadata version: `{new_version}` must be a non-negative integer")

file_name = f"{new_version:05d}-{uuid.uuid4()}.metadata.json"
return self.new_metadata_location(file_name)

def new_metadata_location(self, metadata_file_name: str) -> str:
"""Return a fully-qualified metadata file location for the given filename.
Args:
metadata_file_name (str): Name of the metadata file.
Returns:
str: A fully-qualified location URI for the metadata file.
"""
return f"{self.metadata_path}/{metadata_file_name}"


class SimpleLocationProvider(LocationProvider):
def __init__(self, table_location: str, table_properties: Properties):
Expand Down
10 changes: 4 additions & 6 deletions pyiceberg/table/update/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,14 +243,13 @@ def _commit(self) -> UpdatesAndRequirements:
next_sequence_number = self._transaction.table_metadata.next_sequence_number()

summary = self._summary(self.snapshot_properties)
table_location = self._transaction.table_metadata.location
properties = self._transaction.table_metadata.properties
file_name = _new_manifest_list_file_name(
snapshot_id=self._snapshot_id,
attempt=0,
commit_uuid=self.commit_uuid,
)
manifest_list_file_path = self._transaction._table.new_metadata_location(table_location, file_name, properties)
location_provider = self._transaction._table.location_provider()
manifest_list_file_path = location_provider.new_metadata_location(file_name)
with write_manifest_list(
format_version=self._transaction.table_metadata.format_version,
output_file=self._io.new_output(manifest_list_file_path),
Expand Down Expand Up @@ -296,10 +295,9 @@ def new_manifest_writer(self, spec: PartitionSpec) -> ManifestWriter:
)

def new_manifest_output(self) -> OutputFile:
table_location = self._transaction.table_metadata.location
properties = self._transaction.table_metadata.properties
location_provider = self._transaction._table.location_provider()
file_name = _new_manifest_file_name(num=next(self._manifest_num_counter), commit_uuid=self.commit_uuid)
file_path = self._transaction._table.new_metadata_location(table_location, file_name, properties)
file_path = location_provider.new_metadata_location(file_name)
return self._io.new_output(file_path)

def fetch_manifest_entry(self, manifest: ManifestFile, discard_deleted: bool = True) -> List[ManifestEntry]:
Expand Down
24 changes: 22 additions & 2 deletions tests/catalog/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -579,8 +579,9 @@ def test_table_writes_metadata_to_custom_location(catalog: InMemoryCatalog) -> N
df = pa.Table.from_pylist([{"x": 123, "y": 456, "z": 789}], schema=schema_to_pyarrow(TEST_TABLE_SCHEMA))
table.append(df)
manifests = table.current_snapshot().manifests(table.io) # type: ignore
location_provider = table.location_provider()

assert table.new_metadata_location(table.location(), "", table.properties).startswith(metadata_path)
assert location_provider.new_metadata_location("").startswith(metadata_path)
assert manifests[0].manifest_path.startswith(metadata_path)
assert table.location() != metadata_path
assert table.metadata_location.startswith(metadata_path)
Expand All @@ -598,7 +599,26 @@ def test_table_writes_metadata_to_default_path(catalog: InMemoryCatalog) -> None
df = pa.Table.from_pylist([{"x": 123, "y": 456, "z": 789}], schema=schema_to_pyarrow(TEST_TABLE_SCHEMA))
table.append(df)
manifests = table.current_snapshot().manifests(table.io) # type: ignore
location_provider = table.location_provider()

assert table.new_metadata_location(table.location(), "", table.properties).startswith(metadata_path)
assert location_provider.new_metadata_location("").startswith(metadata_path)
assert manifests[0].manifest_path.startswith(metadata_path)
assert table.metadata_location.startswith(metadata_path)


def test_table_metadata_writes_reflect_latest_path(catalog: InMemoryCatalog) -> None:
catalog.create_namespace(TEST_TABLE_NAMESPACE)
table = catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
partition_spec=TEST_TABLE_PARTITION_SPEC,
)

initial_metadata_path = f"{table.location()}/metadata"
assert table.location_provider().new_metadata_location("metadata.json") == f"{initial_metadata_path}/metadata.json"

# update table with new path for metadata
new_metadata_path = f"{table.location()}/custom/path"
table.transaction().set_properties({TableProperties.WRITE_METADATA_PATH: new_metadata_path}).commit_transaction()

assert table.location_provider().new_metadata_location("metadata.json") == f"{new_metadata_path}/metadata.json"
24 changes: 24 additions & 0 deletions tests/table/test_locations.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,3 +157,27 @@ def test_simple_location_provider_write_data_path() -> None:
)

assert provider.new_data_location("file.parquet") == "s3://table-location/custom/data/path/file.parquet"


def test_location_provider_metadata_default_location() -> None:
provider = load_location_provider(table_location="table_location", table_properties=EMPTY_DICT)

assert provider.new_metadata_location("manifest.avro") == "table_location/metadata/manifest.avro"


def test_location_provider_metadata_location_with_custom_path() -> None:
provider = load_location_provider(
table_location="table_location",
table_properties={TableProperties.WRITE_METADATA_PATH: "s3://table-location/custom/path"},
)

assert provider.new_metadata_location("metadata.json") == "s3://table-location/custom/path/metadata.json"


def test_metadata_location_with_trailing_slash() -> None:
provider = load_location_provider(
table_location="table_location",
table_properties={TableProperties.WRITE_METADATA_PATH: "s3://table-location/custom/path/"},
)

assert provider.new_metadata_location("metadata.json") == "s3://table-location/custom/path/metadata.json"

0 comments on commit 1c9f177

Please sign in to comment.