diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md
index 55a0fd9f88..c7c26c4912 100644
--- a/mkdocs/docs/configuration.md
+++ b/mkdocs/docs/configuration.md
@@ -68,6 +68,7 @@ Iceberg tables support table properties to configure table behavior.
 | `write.object-storage.partitioned-paths` | Boolean | True | Controls whether [partition values are included in file paths](configuration.md#partition-exclusion) when object storage is enabled |
 | `write.py-location-provider.impl` | String of form `module.ClassName` | null | Optional, [custom `LocationProvider`](configuration.md#loading-a-custom-location-provider) implementation |
 | `write.data.path` | String pointing to location | `{metadata.location}/data` | Sets the location under which data is written. |
+| `write.metadata.path` | String pointing to location | `{metadata.location}/metadata` | Sets the location under which metadata is written. |

 ### Table behavior options

@@ -203,12 +204,16 @@ PyIceberg uses [S3FileSystem](https://arrow.apache.org/docs/python/generated/pya

 ## Location Providers

-Apache Iceberg uses the concept of a `LocationProvider` to manage file paths for a table's data. In PyIceberg, the
-`LocationProvider` module is designed to be pluggable, allowing customization for specific use cases. The
+Apache Iceberg uses the concept of a `LocationProvider` to manage file paths for a table's data files. In PyIceberg, the
+`LocationProvider` module is designed to be pluggable, allowing customization for specific use cases; it also determines metadata file locations. The
 `LocationProvider` for a table can be specified through table properties.

-PyIceberg defaults to the [`ObjectStoreLocationProvider`](configuration.md#object-store-location-provider), which generates
-file paths that are optimized for object storage.
+Both data file and metadata file locations can be customized by configuring the table properties [`write.data.path` and `write.metadata.path`](#write-options), respectively.
+
+For more granular control, you can override the `LocationProvider`'s `new_data_location` and `new_metadata_location` methods to define custom logic for generating file paths. See [Loading a Custom Location Provider](configuration.md#loading-a-custom-location-provider).
+
+PyIceberg defaults to the [`ObjectStoreLocationProvider`](configuration.md#object-store-location-provider), which generates
+data file paths that are optimized for object storage.

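To make the two write-path properties above concrete, here is a minimal sketch of setting them at table-creation time. The catalog name `default`, the `examples.events` identifier, and the `s3://my-bucket/...` prefixes are illustrative assumptions, not part of this change:

```python
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField

# Hypothetical catalog and table identifier, purely for illustration.
catalog = load_catalog("default")
schema = Schema(NestedField(1, "id", LongType(), required=True))

table = catalog.create_table(
    "examples.events",
    schema=schema,
    properties={
        # Data files are written under this prefix instead of {metadata.location}/data.
        "write.data.path": "s3://my-bucket/custom/data",
        # Metadata files (metadata.json, manifest lists, manifests) are written
        # under this prefix instead of {metadata.location}/metadata.
        "write.metadata.path": "s3://my-bucket/custom/metadata",
    },
)
```

With these properties set, the table's `LocationProvider` resolves `new_data_location` and `new_metadata_location` against the configured prefixes rather than the table location.
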
### Simple Location Provider diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index a39f6bc711..f2dce81131 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -57,6 +57,7 @@ Table, TableProperties, ) +from pyiceberg.table.locations import load_location_provider from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.table.update import ( @@ -857,7 +858,8 @@ def _create_staged_table( database_name, table_name = self.identifier_to_database_and_table(identifier) location = self._resolve_table_location(location, database_name, table_name) - metadata_location = self._get_metadata_location(location=location) + provider = load_location_provider(location, properties) + metadata_location = provider.new_table_metadata_file_location() metadata = new_table_metadata( location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties ) @@ -888,7 +890,8 @@ def _update_and_stage_table( ) new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 if current_table else 0 - new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version) + provider = load_location_provider(updated_metadata.location, updated_metadata.properties) + new_metadata_location = provider.new_table_metadata_file_location(new_metadata_version) return StagedTable( identifier=table_identifier, @@ -945,13 +948,6 @@ def _get_default_warehouse_location(self, database_name: str, table_name: str) - def _write_metadata(metadata: TableMetadata, io: FileIO, metadata_path: str) -> None: ToOutputFile.table_metadata(metadata, io.new_output(metadata_path)) - @staticmethod - def _get_metadata_location(location: str, new_version: int = 0) -> str: - if new_version < 0: - raise ValueError(f"Table metadata version: `{new_version}` must be a non-negative integer") - version_str = f"{new_version:05d}" - return f"{location}/metadata/{version_str}-{uuid.uuid4()}.metadata.json" - @staticmethod def _parse_metadata_version(metadata_location: str) -> int: """Parse the version from the metadata location. 
diff --git a/pyiceberg/catalog/dynamodb.py b/pyiceberg/catalog/dynamodb.py index 5ed01df7f5..e3ce3c0e90 100644 --- a/pyiceberg/catalog/dynamodb.py +++ b/pyiceberg/catalog/dynamodb.py @@ -54,6 +54,7 @@ from pyiceberg.schema import Schema from pyiceberg.serializers import FromInputFile from pyiceberg.table import CommitTableResponse, Table +from pyiceberg.table.locations import load_location_provider from pyiceberg.table.metadata import new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.table.update import ( @@ -173,7 +174,9 @@ def create_table( database_name, table_name = self.identifier_to_database_and_table(identifier) location = self._resolve_table_location(location, database_name, table_name) - metadata_location = self._get_metadata_location(location=location) + provider = load_location_provider(table_location=location, table_properties=properties) + metadata_location = provider.new_table_metadata_file_location() + metadata = new_table_metadata( location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties ) diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py index c72587c028..bc11e4f907 100644 --- a/pyiceberg/catalog/sql.py +++ b/pyiceberg/catalog/sql.py @@ -62,6 +62,7 @@ from pyiceberg.schema import Schema from pyiceberg.serializers import FromInputFile from pyiceberg.table import CommitTableResponse, Table +from pyiceberg.table.locations import load_location_provider from pyiceberg.table.metadata import new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.table.update import ( @@ -207,7 +208,8 @@ def create_table( namespace = Catalog.namespace_to_string(namespace_identifier) location = self._resolve_table_location(location, namespace, table_name) - metadata_location = self._get_metadata_location(location=location) + location_provider = load_location_provider(table_location=location, table_properties=properties) + metadata_location = location_provider.new_table_metadata_file_location() metadata = new_table_metadata( location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties ) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index f16aa28844..e57c3edfe3 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -79,6 +79,7 @@ ) from pyiceberg.schema import Schema from pyiceberg.table.inspect import InspectTable +from pyiceberg.table.locations import LocationProvider, load_location_provider from pyiceberg.table.metadata import ( INITIAL_SEQUENCE_NUMBER, TableMetadata, @@ -200,6 +201,7 @@ class TableProperties: WRITE_OBJECT_STORE_PARTITIONED_PATHS_DEFAULT = True WRITE_DATA_PATH = "write.data.path" + WRITE_METADATA_PATH = "write.metadata.path" DELETE_MODE = "write.delete.mode" DELETE_MODE_COPY_ON_WRITE = "copy-on-write" @@ -1000,6 +1002,10 @@ def location(self) -> str: """Return the table's base location.""" return self.metadata.location + def location_provider(self) -> LocationProvider: + """Return the table's location provider.""" + return load_location_provider(table_location=self.metadata.location, table_properties=self.metadata.properties) + @property def last_sequence_number(self) -> int: return self.metadata.last_sequence_number diff --git a/pyiceberg/table/locations.py b/pyiceberg/table/locations.py index 0de4dc68b2..2d604abb6c 100644 --- a/pyiceberg/table/locations.py +++ b/pyiceberg/table/locations.py @@ -16,6 +16,7 @@ # under the 
License. import importlib import logging +import uuid from abc import ABC, abstractmethod from typing import Optional @@ -29,7 +30,7 @@ class LocationProvider(ABC): - """A base class for location providers, that provide data file locations for a table's write tasks. + """A base class for location providers, that provide file locations for a table's write tasks. Args: table_location (str): The table's base storage location. @@ -40,6 +41,7 @@ class LocationProvider(ABC): table_properties: Properties data_path: str + metadata_path: str def __init__(self, table_location: str, table_properties: Properties): self.table_location = table_location @@ -52,6 +54,11 @@ def __init__(self, table_location: str, table_properties: Properties): else: self.data_path = f"{self.table_location.rstrip('/')}/data" + if path := table_properties.get(TableProperties.WRITE_METADATA_PATH): + self.metadata_path = path.rstrip("/") + else: + self.metadata_path = f"{self.table_location.rstrip('/')}/metadata" + @abstractmethod def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str: """Return a fully-qualified data file location for the given filename. @@ -64,6 +71,35 @@ def new_data_location(self, data_file_name: str, partition_key: Optional[Partiti str: A fully-qualified location URI for the data file. """ + def new_table_metadata_file_location(self, new_version: int = 0) -> str: + """Return a fully-qualified metadata file location for a new table version. + + Args: + new_version (int): Version number of the metadata file. + + Returns: + str: fully-qualified URI for the new table metadata file. + + Raises: + ValueError: If the version is negative. + """ + if new_version < 0: + raise ValueError(f"Table metadata version: `{new_version}` must be a non-negative integer") + + file_name = f"{new_version:05d}-{uuid.uuid4()}.metadata.json" + return self.new_metadata_location(file_name) + + def new_metadata_location(self, metadata_file_name: str) -> str: + """Return a fully-qualified metadata file location for the given filename. + + Args: + metadata_file_name (str): Name of the metadata file. + + Returns: + str: A fully-qualified location URI for the metadata file. 
+ """ + return f"{self.metadata_path}/{metadata_file_name}" + class SimpleLocationProvider(LocationProvider): def __init__(self, table_location: str, table_properties: Properties): diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index c0d0056e7c..9652d738e6 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -84,14 +84,14 @@ from pyiceberg.table import Transaction -def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str: - return f"{location}/metadata/{commit_uuid}-m{num}.avro" +def _new_manifest_file_name(num: int, commit_uuid: uuid.UUID) -> str: + return f"{commit_uuid}-m{num}.avro" -def _generate_manifest_list_path(location: str, snapshot_id: int, attempt: int, commit_uuid: uuid.UUID) -> str: +def _new_manifest_list_file_name(snapshot_id: int, attempt: int, commit_uuid: uuid.UUID) -> str: # Mimics the behavior in Java: # https://github.com/apache/iceberg/blob/c862b9177af8e2d83122220764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491 - return f"{location}/metadata/snap-{snapshot_id}-{attempt}-{commit_uuid}.avro" + return f"snap-{snapshot_id}-{attempt}-{commit_uuid}.avro" class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]): @@ -243,13 +243,13 @@ def _commit(self) -> UpdatesAndRequirements: next_sequence_number = self._transaction.table_metadata.next_sequence_number() summary = self._summary(self.snapshot_properties) - - manifest_list_file_path = _generate_manifest_list_path( - location=self._transaction.table_metadata.location, + file_name = _new_manifest_list_file_name( snapshot_id=self._snapshot_id, attempt=0, commit_uuid=self.commit_uuid, ) + location_provider = self._transaction._table.location_provider() + manifest_list_file_path = location_provider.new_metadata_location(file_name) with write_manifest_list( format_version=self._transaction.table_metadata.format_version, output_file=self._io.new_output(manifest_list_file_path), @@ -295,13 +295,10 @@ def new_manifest_writer(self, spec: PartitionSpec) -> ManifestWriter: ) def new_manifest_output(self) -> OutputFile: - return self._io.new_output( - _new_manifest_path( - location=self._transaction.table_metadata.location, - num=next(self._manifest_num_counter), - commit_uuid=self.commit_uuid, - ) - ) + location_provider = self._transaction._table.location_provider() + file_name = _new_manifest_file_name(num=next(self._manifest_num_counter), commit_uuid=self.commit_uuid) + file_path = location_provider.new_metadata_location(file_name) + return self._io.new_output(file_path) def fetch_manifest_entry(self, manifest: ManifestFile, discard_deleted: bool = True) -> List[ManifestEntry]: return manifest.fetch_manifest_entry(io=self._io, discard_deleted=discard_deleted) diff --git a/tests/catalog/test_base.py b/tests/catalog/test_base.py index e3532c0372..c00f4fde95 100644 --- a/tests/catalog/test_base.py +++ b/tests/catalog/test_base.py @@ -35,10 +35,12 @@ TableAlreadyExistsError, ) from pyiceberg.io import WAREHOUSE +from pyiceberg.io.pyarrow import schema_to_pyarrow from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.table import ( Table, + TableProperties, ) from pyiceberg.table.update import ( AddSchemaUpdate, @@ -563,3 +565,60 @@ def test_table_properties_raise_for_none_value(catalog: InMemoryCatalog) -> None with pytest.raises(ValidationError) as exc_info: _ = given_catalog_has_a_table(catalog, properties=property_with_none) assert "None 
type is not a supported value in properties: property_name" in str(exc_info.value) + + +def test_table_writes_metadata_to_custom_location(catalog: InMemoryCatalog) -> None: + metadata_path = f"{catalog._warehouse_location}/custom/path" + catalog.create_namespace(TEST_TABLE_NAMESPACE) + table = catalog.create_table( + identifier=TEST_TABLE_IDENTIFIER, + schema=TEST_TABLE_SCHEMA, + partition_spec=TEST_TABLE_PARTITION_SPEC, + properties={TableProperties.WRITE_METADATA_PATH: metadata_path}, + ) + df = pa.Table.from_pylist([{"x": 123, "y": 456, "z": 789}], schema=schema_to_pyarrow(TEST_TABLE_SCHEMA)) + table.append(df) + manifests = table.current_snapshot().manifests(table.io) # type: ignore + location_provider = table.location_provider() + + assert location_provider.new_metadata_location("").startswith(metadata_path) + assert manifests[0].manifest_path.startswith(metadata_path) + assert table.location() != metadata_path + assert table.metadata_location.startswith(metadata_path) + + +def test_table_writes_metadata_to_default_path(catalog: InMemoryCatalog) -> None: + catalog.create_namespace(TEST_TABLE_NAMESPACE) + table = catalog.create_table( + identifier=TEST_TABLE_IDENTIFIER, + schema=TEST_TABLE_SCHEMA, + partition_spec=TEST_TABLE_PARTITION_SPEC, + properties=TEST_TABLE_PROPERTIES, + ) + metadata_path = f"{table.location()}/metadata" + df = pa.Table.from_pylist([{"x": 123, "y": 456, "z": 789}], schema=schema_to_pyarrow(TEST_TABLE_SCHEMA)) + table.append(df) + manifests = table.current_snapshot().manifests(table.io) # type: ignore + location_provider = table.location_provider() + + assert location_provider.new_metadata_location("").startswith(metadata_path) + assert manifests[0].manifest_path.startswith(metadata_path) + assert table.metadata_location.startswith(metadata_path) + + +def test_table_metadata_writes_reflect_latest_path(catalog: InMemoryCatalog) -> None: + catalog.create_namespace(TEST_TABLE_NAMESPACE) + table = catalog.create_table( + identifier=TEST_TABLE_IDENTIFIER, + schema=TEST_TABLE_SCHEMA, + partition_spec=TEST_TABLE_PARTITION_SPEC, + ) + + initial_metadata_path = f"{table.location()}/metadata" + assert table.location_provider().new_metadata_location("metadata.json") == f"{initial_metadata_path}/metadata.json" + + # update table with new path for metadata + new_metadata_path = f"{table.location()}/custom/path" + table.transaction().set_properties({TableProperties.WRITE_METADATA_PATH: new_metadata_path}).commit_transaction() + + assert table.location_provider().new_metadata_location("metadata.json") == f"{new_metadata_path}/metadata.json" diff --git a/tests/table/test_locations.py b/tests/table/test_locations.py index 490bf7103a..d66bf18792 100644 --- a/tests/table/test_locations.py +++ b/tests/table/test_locations.py @@ -157,3 +157,27 @@ def test_simple_location_provider_write_data_path() -> None: ) assert provider.new_data_location("file.parquet") == "s3://table-location/custom/data/path/file.parquet" + + +def test_location_provider_metadata_default_location() -> None: + provider = load_location_provider(table_location="table_location", table_properties=EMPTY_DICT) + + assert provider.new_metadata_location("manifest.avro") == "table_location/metadata/manifest.avro" + + +def test_location_provider_metadata_location_with_custom_path() -> None: + provider = load_location_provider( + table_location="table_location", + table_properties={TableProperties.WRITE_METADATA_PATH: "s3://table-location/custom/path"}, + ) + + assert provider.new_metadata_location("metadata.json") == 
"s3://table-location/custom/path/metadata.json" + + +def test_metadata_location_with_trailing_slash() -> None: + provider = load_location_provider( + table_location="table_location", + table_properties={TableProperties.WRITE_METADATA_PATH: "s3://table-location/custom/path/"}, + ) + + assert provider.new_metadata_location("metadata.json") == "s3://table-location/custom/path/metadata.json"