Add support for write.metadata.path
geruh committed Feb 11, 2025
1 parent 509713b commit 27fabd2
Showing 6 changed files with 53 additions and 44 deletions.
27 changes: 14 additions & 13 deletions mkdocs/docs/configuration.md
@@ -54,19 +54,20 @@ Iceberg tables support table properties to configure table behavior.

### Write options

| Key                                      | Options                           | Default                        | Description |
|------------------------------------------|-----------------------------------|--------------------------------|-------------|
| `write.parquet.compression-codec`        | `{uncompressed,zstd,gzip,snappy}` | zstd                           | Sets the Parquet compression codec. |
| `write.parquet.compression-level`        | Integer                           | null                           | Parquet compression level for the codec. If not set, the codec's default level is used. |
| `write.parquet.row-group-limit`          | Number of rows                    | 1048576                        | The upper bound of the number of entries within a single row group. |
| `write.parquet.page-size-bytes`          | Size in bytes                     | 1MB                            | Sets a target threshold for the approximate encoded size of data pages within a column chunk. |
| `write.parquet.page-row-limit`           | Number of rows                    | 20000                          | Sets a target threshold for the maximum number of rows within a column chunk. |
| `write.parquet.dict-size-bytes`          | Size in bytes                     | 2MB                            | Sets the dictionary page size limit per row group. |
| `write.metadata.previous-versions-max`   | Integer                           | 100                            | The maximum number of previous version metadata files to keep before deleting them after a commit. |
| `write.object-storage.enabled`           | Boolean                           | True                           | Enables the [`ObjectStoreLocationProvider`](configuration.md#object-store-location-provider) that adds a hash component to file paths. Note: the default value of `True` differs from Iceberg's Java implementation. |
| `write.object-storage.partitioned-paths` | Boolean                           | True                           | Controls whether [partition values are included in file paths](configuration.md#partition-exclusion) when object storage is enabled. |
| `write.py-location-provider.impl`        | String of form `module.ClassName` | null                           | Optional, [custom `LocationProvider`](configuration.md#loading-a-custom-location-provider) implementation. |
| `write.data.path`                        | String pointing to location       | `{metadata.location}/data`     | Sets the location under which data is written. |
| `write.metadata.path`                    | String pointing to location       | `{metadata.location}/metadata` | Sets the location under which metadata is written. |
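The fallback behaviour of the two path properties can be sketched in a few lines. This is a standalone illustration of the resolution order, not PyIceberg's actual code; the bucket URIs are hypothetical:

```python
def resolve_write_paths(table_location: str, properties: dict) -> tuple:
    """Resolve data and metadata write locations, falling back to
    `{location}/data` and `{location}/metadata` when the properties are unset."""
    data_path = properties.get("write.data.path", f"{table_location}/data")
    metadata_path = properties.get("write.metadata.path", f"{table_location}/metadata")
    return data_path, metadata_path


# Defaults: both locations live under the table location.
print(resolve_write_paths("s3://bucket/tbl", {}))
# A custom metadata path redirects only the metadata files.
print(resolve_write_paths("s3://bucket/tbl", {"write.metadata.path": "s3://meta-bucket/tbl"}))
```

Setting `write.metadata.path` therefore moves metadata files without affecting where data files are written.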

### Table behavior options

32 changes: 27 additions & 5 deletions pyiceberg/catalog/__init__.py
@@ -55,6 +55,7 @@
CreateTableTransaction,
StagedTable,
Table,
TableProperties,
)
from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
@@ -757,6 +758,24 @@ def _convert_schema_if_needed(schema: Union[Schema, "pa.Schema"]) -> Schema:
pass
raise ValueError(f"{type(schema)=}, but it must be pyiceberg.schema.Schema or pyarrow.Schema")

@staticmethod
def metadata_file_location(table_location: str, file_name: str, properties: Properties = EMPTY_DICT) -> str:
    """Get the full path for a metadata file.

    Args:
        table_location (str): The base table location.
        file_name (str): Name of the metadata file.
        properties (Properties): Table properties that may contain a custom metadata path.

    Returns:
        str: Full path where the metadata file should be stored.
    """
    if metadata_path := properties.get(TableProperties.WRITE_METADATA_PATH):
        base_path = metadata_path.rstrip("/")
    else:
        base_path = f"{table_location}/metadata"
    return f"{base_path}/{file_name}"
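A standalone sketch of the resolution logic above, runnable without a catalog (the property key is the real `write.metadata.path`; everything else is illustrative):

```python
def metadata_file_location(table_location: str, file_name: str, properties: dict = None) -> str:
    # Sketch of Catalog.metadata_file_location: a custom
    # `write.metadata.path` wins; otherwise fall back to
    # `{table_location}/metadata`. Trailing slashes are stripped so the
    # joined path never contains a double slash.
    properties = properties or {}
    if metadata_path := properties.get("write.metadata.path"):
        base_path = metadata_path.rstrip("/")
    else:
        base_path = f"{table_location}/metadata"
    return f"{base_path}/{file_name}"


print(metadata_file_location("s3://bucket/tbl", "00001-abc.metadata.json"))
print(metadata_file_location("s3://bucket/tbl", "00001-abc.metadata.json",
                             {"write.metadata.path": "s3://meta/"}))
```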

def __repr__(self) -> str:
"""Return the string representation of the Catalog class."""
return f"{self.name} ({self.__class__})"
@@ -840,7 +859,7 @@ def _create_staged_table(
database_name, table_name = self.identifier_to_database_and_table(identifier)

location = self._resolve_table_location(location, database_name, table_name)
metadata_location = self._get_metadata_location(location=location)
metadata_location = self._get_metadata_location(table_location=location, properties=properties)
metadata = new_table_metadata(
location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties
)
@@ -871,7 +890,9 @@ def _update_and_stage_table(
)

new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 if current_table else 0
new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version)
new_metadata_location = self._get_metadata_location(
updated_metadata.location, new_metadata_version, updated_metadata.properties
)

return StagedTable(
identifier=table_identifier,
@@ -929,11 +950,12 @@ def _write_metadata(metadata: TableMetadata, io: FileIO, metadata_path: str) ->
ToOutputFile.table_metadata(metadata, io.new_output(metadata_path))

@staticmethod
def _get_metadata_location(location: str, new_version: int = 0) -> str:
def _get_metadata_location(table_location: str, new_version: int = 0, properties: Properties = EMPTY_DICT) -> str:
if new_version < 0:
raise ValueError(f"Table metadata version: `{new_version}` must be a non-negative integer")
version_str = f"{new_version:05d}"
return f"{location}/metadata/{version_str}-{uuid.uuid4()}.metadata.json"

file_name = f"{new_version:05d}-{uuid.uuid4()}.metadata.json"
return Catalog.metadata_file_location(table_location, file_name, properties)
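The version-file naming used by `_get_metadata_location` can be illustrated with a minimal helper (a sketch mirroring only the file-name construction, not the method itself):

```python
import uuid


def new_metadata_file_name(new_version: int) -> str:
    # Mirrors the naming above: a zero-padded five-digit version,
    # a random UUID, and the `.metadata.json` suffix,
    # e.g. `00012-<uuid>.metadata.json`.
    if new_version < 0:
        raise ValueError(f"Table metadata version: `{new_version}` must be a non-negative integer")
    return f"{new_version:05d}-{uuid.uuid4()}.metadata.json"


print(new_metadata_file_name(12))
```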

@staticmethod
def _parse_metadata_version(metadata_location: str) -> int:
2 changes: 1 addition & 1 deletion pyiceberg/catalog/dynamodb.py
@@ -173,7 +173,7 @@ def create_table(
database_name, table_name = self.identifier_to_database_and_table(identifier)

location = self._resolve_table_location(location, database_name, table_name)
metadata_location = self._get_metadata_location(location=location)
metadata_location = self._get_metadata_location(table_location=location, properties=properties)
metadata = new_table_metadata(
location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties
)
2 changes: 1 addition & 1 deletion pyiceberg/catalog/sql.py
@@ -207,7 +207,7 @@ def create_table(

namespace = Catalog.namespace_to_string(namespace_identifier)
location = self._resolve_table_location(location, namespace, table_name)
metadata_location = self._get_metadata_location(location=location)
metadata_location = self._get_metadata_location(table_location=location, properties=properties)
metadata = new_table_metadata(
location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties
)
5 changes: 5 additions & 0 deletions pyiceberg/table/__init__.py
@@ -199,6 +199,7 @@ class TableProperties:
WRITE_OBJECT_STORE_PARTITIONED_PATHS_DEFAULT = True

WRITE_DATA_PATH = "write.data.path"
WRITE_METADATA_PATH = "write.metadata.path"

DELETE_MODE = "write.delete.mode"
DELETE_MODE_COPY_ON_WRITE = "copy-on-write"
@@ -1212,6 +1213,10 @@ def to_daft(self) -> daft.DataFrame:

return daft.read_iceberg(self)

def metadata_file_location(self, file_name: str) -> str:
"""Get the metadata file location using write.metadata.path from properties if set."""
return self.catalog.metadata_file_location(self.metadata.location, file_name, self.metadata.properties)


class StaticTable(Table):
"""Load a table directly from a metadata file (i.e., without using a catalog)."""
29 changes: 5 additions & 24 deletions pyiceberg/table/update/snapshot.py
@@ -84,16 +84,6 @@
from pyiceberg.table import Transaction


def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str:
return f"{location}/metadata/{commit_uuid}-m{num}.avro"


def _generate_manifest_list_path(location: str, snapshot_id: int, attempt: int, commit_uuid: uuid.UUID) -> str:
# Mimics the behavior in Java:
# https://github.com/apache/iceberg/blob/c862b9177af8e2d83122220764a056f3b96fd00c/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L491
return f"{location}/metadata/snap-{snapshot_id}-{attempt}-{commit_uuid}.avro"


class _SnapshotProducer(UpdateTableMetadata[U], Generic[U]):
commit_uuid: uuid.UUID
_io: FileIO
@@ -243,13 +233,8 @@ def _commit(self) -> UpdatesAndRequirements:
next_sequence_number = self._transaction.table_metadata.next_sequence_number()

summary = self._summary(self.snapshot_properties)

manifest_list_file_path = _generate_manifest_list_path(
location=self._transaction.table_metadata.location,
snapshot_id=self._snapshot_id,
attempt=0,
commit_uuid=self.commit_uuid,
)
file_name = f"snap-{self._snapshot_id}-0-{self.commit_uuid}.avro"
manifest_list_file_path = self._transaction._table.metadata_file_location(file_name)
with write_manifest_list(
format_version=self._transaction.table_metadata.format_version,
output_file=self._io.new_output(manifest_list_file_path),
@@ -295,13 +280,9 @@ def new_manifest_writer(self, spec: PartitionSpec) -> ManifestWriter:
)

def new_manifest_output(self) -> OutputFile:
return self._io.new_output(
_new_manifest_path(
location=self._transaction.table_metadata.location,
num=next(self._manifest_num_counter),
commit_uuid=self.commit_uuid,
)
)
file_name = f"{self.commit_uuid}-m{next(self._manifest_num_counter)}.avro"
file_path = self._transaction._table.metadata_file_location(file_name)
return self._io.new_output(file_path)
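With this change both manifest files and manifest lists are named relative to the resolved metadata location rather than a hard-coded `{location}/metadata` prefix. A standalone sketch of the two naming schemes (the base path and IDs are hypothetical):

```python
def manifest_path(base: str, commit_uuid: str, num: int) -> str:
    # Manifest files: `{commit_uuid}-m{num}.avro`, written under the
    # resolved metadata location.
    return f"{base}/{commit_uuid}-m{num}.avro"


def manifest_list_path(base: str, snapshot_id: int, attempt: int, commit_uuid: str) -> str:
    # Manifest lists: `snap-{snapshot_id}-{attempt}-{commit_uuid}.avro`,
    # matching the Java SnapshotProducer convention.
    return f"{base}/snap-{snapshot_id}-{attempt}-{commit_uuid}.avro"


base = "s3://bucket/tbl/metadata"  # hypothetical resolved metadata path
print(manifest_path(base, "c0ffee", 0))
print(manifest_list_path(base, 99, 0, "c0ffee"))
```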

def fetch_manifest_entry(self, manifest: ManifestFile, discard_deleted: bool = True) -> List[ManifestEntry]:
return manifest.fetch_manifest_entry(io=self._io, discard_deleted=discard_deleted)
