Add support for write.data.path (#1611)
Relates to #1492

---------

Co-authored-by: Kevin Liu <[email protected]>
Fokko and kevinjqliu authored Feb 7, 2025
1 parent b47af2d commit 7808121
Showing 4 changed files with 58 additions and 22 deletions.
35 changes: 18 additions & 17 deletions mkdocs/docs/configuration.md
@@ -54,18 +54,19 @@ Iceberg tables support table properties to configure table behavior.

### Write options

| Key                                       | Options                           | Default                     | Description                                                                                                                                                                                                         |
|-------------------------------------------|-----------------------------------|-----------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `write.parquet.compression-codec`         | `{uncompressed,zstd,gzip,snappy}` | zstd                        | Sets the Parquet compression codec.                                                                                                                                                                                 |
| `write.parquet.compression-level`         | Integer                           | null                        | Parquet compression level for the codec. If not set, the codec's default level is used.                                                                                                                             |
| `write.parquet.row-group-limit`           | Number of rows                    | 1048576                     | The upper bound of the number of entries within a single row group.                                                                                                                                                 |
| `write.parquet.page-size-bytes`           | Size in bytes                     | 1MB                         | Sets a target threshold for the approximate encoded size of data pages within a column chunk.                                                                                                                       |
| `write.parquet.page-row-limit`            | Number of rows                    | 20000                       | Sets a target threshold for the maximum number of rows within a column chunk.                                                                                                                                       |
| `write.parquet.dict-size-bytes`           | Size in bytes                     | 2MB                         | Sets the dictionary page size limit per row group.                                                                                                                                                                  |
| `write.metadata.previous-versions-max`    | Integer                           | 100                         | The maximum number of previous-version metadata files to keep before deleting after a commit.                                                                                                                       |
| `write.object-storage.enabled`            | Boolean                           | True                        | Enables the [`ObjectStoreLocationProvider`](configuration.md#object-store-location-provider) that adds a hash component to file paths. Note: the default value of `True` differs from Iceberg's Java implementation. |
| `write.object-storage.partitioned-paths`  | Boolean                           | True                        | Controls whether [partition values are included in file paths](configuration.md#partition-exclusion) when object storage is enabled.                                                                                |
| `write.py-location-provider.impl`         | String of form `module.ClassName` | null                        | Optional, [custom `LocationProvider`](configuration.md#loading-a-custom-location-provider) implementation.                                                                                                          |
| `write.data.path`                         | String pointing to location       | `{metadata.location}/data`  | Sets the location under which data is written.                                                                                                                                                                      |
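
For example, `write.data.path` can be set when a table is created. A minimal sketch, assuming a catalog named `default` is configured and the namespace `ns` exists (all names here are illustrative):

```python
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType

# Assumes a catalog named "default" is configured (e.g. in .pyiceberg.yaml).
catalog = load_catalog("default")

# Data files for this table will be written under the custom path rather
# than under the table's metadata location.
table = catalog.create_table(
    "ns.orders",
    schema=Schema(NestedField(field_id=1, name="category", field_type=StringType(), required=False)),
    properties={"write.data.path": "s3://bucket/custom/data/path"},
)
```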

### Table behavior options

@@ -210,8 +211,8 @@ file paths that are optimized for object storage.

### Simple Location Provider

The `SimpleLocationProvider` provides paths prefixed by `{location}/data/`, where `location` comes from the [table metadata](https://iceberg.apache.org/spec/#table-metadata-fields). This can be overridden by setting the [`write.data.path` table property](#write-options).

For example, a non-partitioned table might have a data file with the following location:

```txt
s3://bucket/ns/table/data/0000-0-5affc076-96a4-48f2-9cd2-d5efbc9f0c94-00001.parquet
```

@@ -239,9 +240,9 @@ When several files are stored under the same prefix, cloud object stores such as
S3 can throttle requests against prefixes,
resulting in slowdowns. The `ObjectStoreLocationProvider` counteracts this by injecting deterministic hashes, in the form of binary directories,
into file paths, to distribute files across a larger number of object store prefixes.

Paths are prefixed by `{location}/data/`, where `location` comes from the [table metadata](https://iceberg.apache.org/spec/#table-metadata-fields), in the same manner as the [`SimpleLocationProvider`](configuration.md#simple-location-provider). This can be overridden by setting the [`write.data.path` table property](#write-options).

For example, a table partitioned over a string column `category` might have a data file with the following location (note the additional binary directories):

```txt
s3://bucket/ns/table/data/0101/0110/1001/10110010/category=orders/0000-0-5affc076-96a4-48f2-9cd2-d5efbc9f0c94-00001.parquet
```
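
The providers can also be exercised directly. A minimal sketch based on the tests added in this commit; the expected hash for `file.parquet` is taken from `test_object_location_provider_write_data_path` below, and the bucket and namespace names are illustrative:

```python
from pyiceberg.table.locations import load_location_provider

# Object storage is enabled by default; disable it to get the simple layout.
simple = load_location_provider(
    table_location="s3://bucket/ns/table",
    table_properties={"write.object-storage.enabled": "false"},
)
print(simple.new_data_location("file.parquet"))
# s3://bucket/ns/table/data/file.parquet

# With default properties, a deterministic hash of the file name is injected.
hashed = load_location_provider(table_location="s3://bucket/ns/table", table_properties={})
print(hashed.new_data_location("file.parquet"))
# s3://bucket/ns/table/data/0010/1111/0101/11011101/file.parquet
```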
2 changes: 2 additions & 0 deletions pyiceberg/table/__init__.py
@@ -196,6 +196,8 @@ class TableProperties:
WRITE_OBJECT_STORE_PARTITIONED_PATHS = "write.object-storage.partitioned-paths"
WRITE_OBJECT_STORE_PARTITIONED_PATHS_DEFAULT = True

WRITE_DATA_PATH = "write.data.path"
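    # Note: no WRITE_DATA_PATH_DEFAULT constant is defined; when the property
    # is unset, location providers fall back to "{table_location}/data".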

DELETE_MODE = "write.delete.mode"
DELETE_MODE_COPY_ON_WRITE = "copy-on-write"
DELETE_MODE_MERGE_ON_READ = "merge-on-read"
19 changes: 14 additions & 5 deletions pyiceberg/table/locations.py
@@ -40,10 +40,17 @@ class LocationProvider(ABC):
table_location: str
table_properties: Properties

data_path: str

def __init__(self, table_location: str, table_properties: Properties):
self.table_location = table_location
self.table_properties = table_properties

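        # Prefer an explicit `write.data.path` table property; otherwise default
        # to a `data` directory under the table's base location. Trailing slashes
        # are stripped so the path joins below produce a single "/".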
if path := table_properties.get(TableProperties.WRITE_DATA_PATH):
self.data_path = path.rstrip("/")
else:
self.data_path = f"{self.table_location.rstrip('/')}/data"

@abstractmethod
def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str:
"""Return a fully-qualified data file location for the given filename.
@@ -62,8 +69,11 @@ def __init__(self, table_location: str, table_properties: Properties):
super().__init__(table_location, table_properties)

def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str:
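        # Layout: {data_path}/{partition_path}/{file_name} when a partition key
        # is given, otherwise {data_path}/{file_name}.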
return (
f"{self.data_path}/{partition_key.to_path()}/{data_file_name}"
if partition_key
else f"{self.data_path}/{data_file_name}"
)


class ObjectStoreLocationProvider(LocationProvider):
@@ -85,13 +95,12 @@ def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str:
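        # When partitioned paths are enabled, fold the partition path into the
        # file name and recurse, so the hash is computed over the combined key.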
if self._include_partition_paths and partition_key:
return self.new_data_location(f"{partition_key.to_path()}/{data_file_name}")

hashed_path = self._compute_hash(data_file_name)

return (
f"{prefix}/{hashed_path}/{data_file_name}"
f"{self.data_path}/{hashed_path}/{data_file_name}"
if self._include_partition_paths
else f"{prefix}/{hashed_path}-{data_file_name}"
else f"{self.data_path}/{hashed_path}-{data_file_name}"
)

@staticmethod
24 changes: 24 additions & 0 deletions tests/table/test_locations.py
@@ -20,6 +20,7 @@

from pyiceberg.partitioning import PartitionField, PartitionFieldValue, PartitionKey, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import TableProperties
from pyiceberg.table.locations import LocationProvider, load_location_provider
from pyiceberg.transforms import IdentityTransform
from pyiceberg.typedef import EMPTY_DICT
@@ -133,3 +134,26 @@ def test_hash_injection(data_file_name: str, expected_hash: str) -> None:
provider = load_location_provider(table_location="table_location", table_properties=EMPTY_DICT)

assert provider.new_data_location(data_file_name) == f"table_location/data/{expected_hash}/{data_file_name}"


def test_object_location_provider_write_data_path() -> None:
provider = load_location_provider(
table_location="s3://table-location/table",
table_properties={TableProperties.WRITE_DATA_PATH: "s3://table-location/custom/data/path"},
)

assert (
provider.new_data_location("file.parquet") == "s3://table-location/custom/data/path/0010/1111/0101/11011101/file.parquet"
)


def test_simple_location_provider_write_data_path() -> None:
provider = load_location_provider(
table_location="table_location",
table_properties={
TableProperties.WRITE_DATA_PATH: "s3://table-location/custom/data/path",
"write.object-storage.enabled": "false",
},
)

assert provider.new_data_location("file.parquet") == "s3://table-location/custom/data/path/file.parquet"
