diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index bae8ca997f..9a631028af 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -54,19 +54,20 @@ Iceberg tables support table properties to configure table behavior. ### Write options -| Key | Options | Default | Description | -|------------------------------------------|------------------------------------|----------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------| -| `write.parquet.compression-codec` | `{uncompressed,zstd,gzip,snappy}` | zstd | Sets the Parquet compression coddec. | -| `write.parquet.compression-level` | Integer | null | Parquet compression level for the codec. If not set, it is up to PyIceberg | -| `write.parquet.row-group-limit` | Number of rows | 1048576 | The upper bound of the number of entries within a single row group | -| `write.parquet.page-size-bytes` | Size in bytes | 1MB | Set a target threshold for the approximate encoded size of data pages within a column chunk | -| `write.parquet.page-row-limit` | Number of rows | 20000 | Set a target threshold for the maximum number of rows within a column chunk | -| `write.parquet.dict-size-bytes` | Size in bytes | 2MB | Set the dictionary page size limit per row group | -| `write.metadata.previous-versions-max` | Integer | 100 | The max number of previous version metadata files to keep before deleting after commit. | -| `write.object-storage.enabled` | Boolean | True | Enables the [`ObjectStoreLocationProvider`](configuration.md#object-store-location-provider) that adds a hash component to file paths. 
Note: the default value of `True` differs from Iceberg's Java implementation | -| `write.object-storage.partitioned-paths` | Boolean | True | Controls whether [partition values are included in file paths](configuration.md#partition-exclusion) when object storage is enabled | -| `write.py-location-provider.impl` | String of form `module.ClassName` | null | Optional, [custom `LocationProvider`](configuration.md#loading-a-custom-location-provider) implementation | -| `write.data.path` | String pointing to location | `{metadata.location}/data` | Sets the location under which data is written. | +| Key | Options | Default | Description | +|------------------------------------------|------------------------------------|--------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `write.parquet.compression-codec` | `{uncompressed,zstd,gzip,snappy}` | zstd | Sets the Parquet compression codec. | +| `write.parquet.compression-level` | Integer | null | Parquet compression level for the codec. If not set, it is up to PyIceberg | +| `write.parquet.row-group-limit` | Number of rows | 1048576 | The upper bound of the number of entries within a single row group | +| `write.parquet.page-size-bytes` | Size in bytes | 1MB | Set a target threshold for the approximate encoded size of data pages within a column chunk | +| `write.parquet.page-row-limit` | Number of rows | 20000 | Set a target threshold for the maximum number of rows within a column chunk | +| `write.parquet.dict-size-bytes` | Size in bytes | 2MB | Set the dictionary page size limit per row group | +| `write.metadata.previous-versions-max` | Integer | 100 | The max number of previous version metadata files to keep before deleting after commit. 
| +| `write.object-storage.enabled` | Boolean | True | Enables the [`ObjectStoreLocationProvider`](configuration.md#object-store-location-provider) that adds a hash component to file paths. Note: the default value of `True` differs from Iceberg's Java implementation | +| `write.object-storage.partitioned-paths` | Boolean | True | Controls whether [partition values are included in file paths](configuration.md#partition-exclusion) when object storage is enabled | +| `write.py-location-provider.impl` | String of form `module.ClassName` | null | Optional, [custom `LocationProvider`](configuration.md#loading-a-custom-location-provider) implementation | +| `write.data.path` | String pointing to location | `{metadata.location}/data` | Sets the location under which data is written. | +| `write.metadata.path` | String pointing to location | `{metadata.location}/metadata` | Sets the location under which metadata is written. | ### Table behavior options diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index 18aea91940..3e6e19f9f2 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -55,6 +55,7 @@ CreateTableTransaction, StagedTable, Table, + TableProperties, ) from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder @@ -757,6 +758,24 @@ def _convert_schema_if_needed(schema: Union[Schema, "pa.Schema"]) -> Schema: pass raise ValueError(f"{type(schema)=}, but it must be pyiceberg.schema.Schema or pyarrow.Schema") + @staticmethod + def metadata_file_location(table_location: str, file_name: str, properties: Properties = EMPTY_DICT) -> str: + """Get the full path for a metadata file. 
+ + Args: + table_location (str): The base table location + file_name (str): Name of the metadata file + properties (Properties): Table properties that may contain custom metadata path + + Returns: + str: Full path where the metadata file should be stored + """ + if metadata_path := properties.get(TableProperties.WRITE_METADATA_PATH): + base_path = metadata_path.rstrip("/") + else: + base_path = f"{table_location}/metadata" + return f"{base_path}/{file_name}" + def __repr__(self) -> str: """Return the string representation of the Catalog class.""" return f"{self.name} ({self.__class__})" @@ -840,7 +859,7 @@ def _create_staged_table( database_name, table_name = self.identifier_to_database_and_table(identifier) location = self._resolve_table_location(location, database_name, table_name) - metadata_location = self._get_metadata_location(location=location) + metadata_location = self._get_metadata_location(table_location=location, properties=properties) metadata = new_table_metadata( location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties ) @@ -871,7 +890,9 @@ def _update_and_stage_table( ) new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 if current_table else 0 - new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version) + new_metadata_location = self._get_metadata_location( + updated_metadata.location, new_metadata_version, updated_metadata.properties + ) return StagedTable( identifier=table_identifier, @@ -929,11 +950,12 @@ def _write_metadata(metadata: TableMetadata, io: FileIO, metadata_path: str) -> ToOutputFile.table_metadata(metadata, io.new_output(metadata_path)) @staticmethod - def _get_metadata_location(location: str, new_version: int = 0) -> str: + def _get_metadata_location(table_location: str, new_version: int = 0, properties: Properties = EMPTY_DICT) -> str: if new_version < 0: raise ValueError(f"Table metadata version: 
`{new_version}` must be a non-negative integer") - version_str = f"{new_version:05d}" - return f"{location}/metadata/{version_str}-{uuid.uuid4()}.metadata.json" + + file_name = f"{new_version:05d}-{uuid.uuid4()}.metadata.json" + return Catalog.metadata_file_location(table_location, file_name, properties) @staticmethod def _parse_metadata_version(metadata_location: str) -> int: diff --git a/pyiceberg/catalog/dynamodb.py b/pyiceberg/catalog/dynamodb.py index 5ed01df7f5..a96c8e7f19 100644 --- a/pyiceberg/catalog/dynamodb.py +++ b/pyiceberg/catalog/dynamodb.py @@ -173,7 +173,7 @@ def create_table( database_name, table_name = self.identifier_to_database_and_table(identifier) location = self._resolve_table_location(location, database_name, table_name) - metadata_location = self._get_metadata_location(location=location) + metadata_location = self._get_metadata_location(table_location=location, properties=properties) metadata = new_table_metadata( location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties ) diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py index c72587c028..cc31d3dec3 100644 --- a/pyiceberg/catalog/sql.py +++ b/pyiceberg/catalog/sql.py @@ -207,7 +207,7 @@ def create_table( namespace = Catalog.namespace_to_string(namespace_identifier) location = self._resolve_table_location(location, namespace, table_name) - metadata_location = self._get_metadata_location(location=location) + metadata_location = self._get_metadata_location(table_location=location, properties=properties) metadata = new_table_metadata( location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties ) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 8186d6573c..f9d56de272 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -199,6 +199,7 @@ class TableProperties: WRITE_OBJECT_STORE_PARTITIONED_PATHS_DEFAULT = True WRITE_DATA_PATH = 
"write.data.path" + WRITE_METADATA_PATH = "write.metadata.path" DELETE_MODE = "write.delete.mode" DELETE_MODE_COPY_ON_WRITE = "copy-on-write" @@ -1212,6 +1213,10 @@ def to_daft(self) -> daft.DataFrame: return daft.read_iceberg(self) + def metadata_file_location(self, file_name: str) -> str: + """Get the metadata file location using write.metadata.path from properties if set.""" + return self.catalog.metadata_file_location(self.metadata.location, file_name, self.metadata.properties) + class StaticTable(Table): """Load a table directly from a metadata file (i.e., without using a catalog).""" diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index c0d0056e7c..c8ee80faab 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -84,16 +84,6 @@ from pyiceberg.table import Transaction -def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str: - return f"{location}/metadata/{commit_uuid}-m{num}.avro" - - -def _generate_manifest_list_path(location: str, snapshot_id: int, attempt: int, commit_uuid: uuid.UUID) -> str: - # Mimics the behavior in Java: - # https://github.com/apache/iceberg/blob/c862b9177af8e2d83122220764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491 - return f"{location}/metadata/snap-{snapshot_id}-{attempt}-{commit_uuid}.avro" - - class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]): commit_uuid: uuid.UUID _io: FileIO @@ -243,13 +233,8 @@ def _commit(self) -> UpdatesAndRequirements: next_sequence_number = self._transaction.table_metadata.next_sequence_number() summary = self._summary(self.snapshot_properties) - - manifest_list_file_path = _generate_manifest_list_path( - location=self._transaction.table_metadata.location, - snapshot_id=self._snapshot_id, - attempt=0, - commit_uuid=self.commit_uuid, - ) + file_name = f"{self.commit_uuid}-m{self._snapshot_id}-a0.avro" + manifest_list_file_path = 
self._transaction._table.metadata_file_location(file_name) with write_manifest_list( format_version=self._transaction.table_metadata.format_version, output_file=self._io.new_output(manifest_list_file_path), @@ -295,13 +280,9 @@ def new_manifest_writer(self, spec: PartitionSpec) -> ManifestWriter: ) def new_manifest_output(self) -> OutputFile: - return self._io.new_output( - _new_manifest_path( - location=self._transaction.table_metadata.location, - num=next(self._manifest_num_counter), - commit_uuid=self.commit_uuid, - ) - ) + file_name = f"{self.commit_uuid}-m{next(self._manifest_num_counter)}.avro" + file_path = self._transaction._table.metadata_file_location(file_name) + return self._io.new_output(file_path) def fetch_manifest_entry(self, manifest: ManifestFile, discard_deleted: bool = True) -> List[ManifestEntry]: return manifest.fetch_manifest_entry(io=self._io, discard_deleted=discard_deleted)