Add support for write.data.path #1611

Merged · 3 commits · Feb 7, 2025
35 changes: 18 additions & 17 deletions mkdocs/docs/configuration.md
@@ -54,18 +54,19 @@ Iceberg tables support table properties to configure table behavior.

### Write options

| Key | Options | Default | Description |
|------------------------------------------|-----------------------------------|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `write.parquet.compression-codec`         | `{uncompressed,zstd,gzip,snappy}` | zstd    | Sets the Parquet compression codec.                                                                                                                                                                                     |
| `write.parquet.compression-level`         | Integer                           | null    | Parquet compression level for the codec. If not set, the codec's default level is used                                                                                                                                  |
| `write.parquet.row-group-limit` | Number of rows | 1048576 | The upper bound of the number of entries within a single row group |
| `write.parquet.page-size-bytes` | Size in bytes | 1MB | Set a target threshold for the approximate encoded size of data pages within a column chunk |
| `write.parquet.page-row-limit` | Number of rows | 20000 | Set a target threshold for the maximum number of rows within a column chunk |
| `write.parquet.dict-size-bytes` | Size in bytes | 2MB | Set the dictionary page size limit per row group |
| `write.metadata.previous-versions-max` | Integer | 100 | The max number of previous version metadata files to keep before deleting after commit. |
| `write.object-storage.enabled` | Boolean | True | Enables the [`ObjectStoreLocationProvider`](configuration.md#object-store-location-provider) that adds a hash component to file paths. Note: the default value of `True` differs from Iceberg's Java implementation |
| `write.object-storage.partitioned-paths` | Boolean | True | Controls whether [partition values are included in file paths](configuration.md#partition-exclusion) when object storage is enabled |
| `write.py-location-provider.impl` | String of form `module.ClassName` | null | Optional, [custom `LocationProvider`](configuration.md#loading-a-custom-location-provider) implementation |
| Key                                       | Options                            | Default                    | Description                                                                                                                                                            |
|-------------------------------------------|------------------------------------|----------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `write.parquet.compression-codec`         | `{uncompressed,zstd,gzip,snappy}`  | zstd                       | Sets the Parquet compression codec.                                                                                                                                    |
| `write.parquet.compression-level`         | Integer                            | null                       | Parquet compression level for the codec. If not set, the codec's default level is used                                                                                |
| `write.parquet.row-group-limit`           | Number of rows                     | 1048576                    | The upper bound of the number of entries within a single row group                                                                                                    |
| `write.parquet.page-size-bytes`           | Size in bytes                      | 1MB                        | Set a target threshold for the approximate encoded size of data pages within a column chunk                                                                           |
| `write.parquet.page-row-limit`            | Number of rows                     | 20000                      | Set a target threshold for the maximum number of rows within a column chunk                                                                                           |
| `write.parquet.dict-size-bytes`           | Size in bytes                      | 2MB                        | Set the dictionary page size limit per row group                                                                                                                      |
| `write.metadata.previous-versions-max`    | Integer                            | 100                        | The max number of previous version metadata files to keep before deleting after commit.                                                                               |
| `write.object-storage.enabled`            | Boolean                            | True                       | Enables the [`ObjectStoreLocationProvider`](configuration.md#object-store-location-provider) that adds a hash component to file paths. Note: the default value of `True` differs from Iceberg's Java implementation |
| `write.object-storage.partitioned-paths`  | Boolean                            | True                       | Controls whether [partition values are included in file paths](configuration.md#partition-exclusion) when object storage is enabled                                   |
| `write.py-location-provider.impl`         | String of form `module.ClassName`  | null                       | Optional, [custom `LocationProvider`](configuration.md#loading-a-custom-location-provider) implementation                                                             |
| `write.data.path`                         | String pointing to location        | `{metadata.location}/data` | Sets the location under which data is written; see the example below.                                                                                                 |
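
For instance, a minimal sketch of setting `write.data.path` when creating a table; the catalog name `default`, the identifier `ns.events`, and the bucket path are hypothetical placeholders:

```python
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType

catalog = load_catalog("default")  # any configured catalog works
table = catalog.create_table(
    "ns.events",
    schema=Schema(NestedField(field_id=1, name="category", field_type=StringType(), required=False)),
    properties={"write.data.path": "s3://bucket/custom/data/path"},
)
# New data files are written under s3://bucket/custom/data/path/...
# rather than the default {table location}/data/...
```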

### Table behavior options

@@ -210,8 +211,8 @@ file paths that are optimized for object storage.

### Simple Location Provider

The `SimpleLocationProvider` places a table's file names underneath a `data` directory in the table's base storage
location (this is `table.metadata.location` - see the [Iceberg table specification](https://iceberg.apache.org/spec/#table-metadata)).
The `SimpleLocationProvider` provides paths prefixed by `{location}/data/`, where `location` comes from the [table metadata](https://iceberg.apache.org/spec/#table-metadata-fields). This can be overridden by setting the [`write.data.path` table property](#write-options).

For example, a non-partitioned table might have a data file with the following location:

```txt
s3://bucket/ns/table/data/0000-0-5affc076-96a4-48f2-9cd2-d5efbc9f0c94-00001.parquet
```

@@ -239,9 +240,9 @@

When several files are stored under the same prefix, cloud object stores such as S3 often throttle requests against prefixes,
resulting in slowdowns. The `ObjectStoreLocationProvider` counteracts this by injecting deterministic hashes, in the form of binary directories,
into file paths, to distribute files across a larger number of object store prefixes.
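
As a rough sketch of the idea (not necessarily PyIceberg's exact bit selection), the hash component can be derived from a murmur3 hash of the file name; this sketch assumes the `mmh3` package:

```python
import mmh3  # murmur3 hashing; install with `pip install mmh3`

def binary_hash_dirs(data_file_name: str) -> str:
    # Keep the low 20 bits of a 32-bit murmur3 hash of the file name,
    # zero-pad them, and group them as 4/4/4/8-bit binary directories,
    # matching the 0101/0110/1001/10110010 shape shown below.
    bits = format(mmh3.hash(data_file_name) & 0xFFFFF, "020b")
    return f"{bits[0:4]}/{bits[4:8]}/{bits[8:12]}/{bits[12:20]}"
```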

Paths still contain partitions just before the file name, in Hive-style, and a `data` directory beneath the table's location,
in a similar manner to the [`SimpleLocationProvider`](configuration.md#simple-location-provider). For example, a table
partitioned over a string column `category` might have a data file with location: (note the additional binary directories)
Paths are prefixed by `{location}/data/`, where `location` comes from the [table metadata](https://iceberg.apache.org/spec/#table-metadata-fields), in a similar manner to the [`SimpleLocationProvider`](configuration.md#simple-location-provider). This can be overridden by setting the [`write.data.path` table property](#write-options).

For example, a table partitioned over a string column `category` might have a data file with the following location (note the additional binary directories):

```txt
s3://bucket/ns/table/data/0101/0110/1001/10110010/category=orders/0000-0-5affc076-96a4-48f2-9cd2-d5efbc9f0c94-00001.parquet
```

2 changes: 2 additions & 0 deletions pyiceberg/table/__init__.py
@@ -196,6 +196,8 @@ class TableProperties:
WRITE_OBJECT_STORE_PARTITIONED_PATHS = "write.object-storage.partitioned-paths"
WRITE_OBJECT_STORE_PARTITIONED_PATHS_DEFAULT = True

WRITE_DATA_PATH = "write.data.path"
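# No companion *_DEFAULT constant: when unset, LocationProvider derives the default "{table location}/data".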

DELETE_MODE = "write.delete.mode"
DELETE_MODE_COPY_ON_WRITE = "copy-on-write"
DELETE_MODE_MERGE_ON_READ = "merge-on-read"
19 changes: 14 additions & 5 deletions pyiceberg/table/locations.py
@@ -40,10 +40,17 @@ class LocationProvider(ABC):
table_location: str
table_properties: Properties

data_path: str

def __init__(self, table_location: str, table_properties: Properties):
self.table_location = table_location
self.table_properties = table_properties

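# Honor an explicit `write.data.path` table property; otherwise fall back to `{table location}/data`.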
if path := table_properties.get(TableProperties.WRITE_DATA_PATH):
self.data_path = path.rstrip("/")
else:
self.data_path = f"{self.table_location.rstrip('/')}/data"

@abstractmethod
def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str:
"""Return a fully-qualified data file location for the given filename.
@@ -62,8 +69,11 @@ def __init__(self, table_location: str, table_properties: Properties):
super().__init__(table_location, table_properties)

def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str:
prefix = f"{self.table_location}/data"
return f"{prefix}/{partition_key.to_path()}/{data_file_name}" if partition_key else f"{prefix}/{data_file_name}"
return (
f"{self.data_path}/{partition_key.to_path()}/{data_file_name}"
if partition_key
else f"{self.data_path}/{data_file_name}"
)


class ObjectStoreLocationProvider(LocationProvider):
@@ -85,13 +95,12 @@ def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str:
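# When partition paths are enabled, fold the partition directory into the
# file name first; the recursive call hashes the combined path, so the hash
# directories end up ahead of the partition directory.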
if self._include_partition_paths and partition_key:
return self.new_data_location(f"{partition_key.to_path()}/{data_file_name}")

prefix = f"{self.table_location}/data"
hashed_path = self._compute_hash(data_file_name)

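# Slash-join under the hash directories when partition paths are enabled;
# otherwise dash-join the hash and file name to avoid an extra directory level.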
return (
f"{prefix}/{hashed_path}/{data_file_name}"
f"{self.data_path}/{hashed_path}/{data_file_name}"
if self._include_partition_paths
else f"{prefix}/{hashed_path}-{data_file_name}"
else f"{self.data_path}/{hashed_path}-{data_file_name}"
)

@staticmethod
24 changes: 24 additions & 0 deletions tests/table/test_locations.py
@@ -20,6 +20,7 @@

from pyiceberg.partitioning import PartitionField, PartitionFieldValue, PartitionKey, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import TableProperties
from pyiceberg.table.locations import LocationProvider, load_location_provider
from pyiceberg.transforms import IdentityTransform
from pyiceberg.typedef import EMPTY_DICT
@@ -133,3 +134,26 @@ def test_hash_injection(data_file_name: str, expected_hash: str) -> None:
provider = load_location_provider(table_location="table_location", table_properties=EMPTY_DICT)

assert provider.new_data_location(data_file_name) == f"table_location/data/{expected_hash}/{data_file_name}"


def test_object_location_provider_write_data_path() -> None:
provider = load_location_provider(
table_location="s3://table-location/table",
table_properties={TableProperties.WRITE_DATA_PATH: "s3://table-location/custom/data/path"},
)

assert (
provider.new_data_location("file.parquet") == "s3://table-location/custom/data/path/0010/1111/0101/11011101/file.parquet"
)


def test_simple_location_provider_write_data_path() -> None:
provider = load_location_provider(
table_location="table_location",
table_properties={
TableProperties.WRITE_DATA_PATH: "s3://table-location/custom/data/path",
"write.object-storage.enabled": "false",
},
)

assert provider.new_data_location("file.parquet") == "s3://table-location/custom/data/path/file.parquet"