Merge remote-tracking branch 'apache/main' into filterRecordBatch
kevinjqliu committed Feb 13, 2025
2 parents 42f1856 + 38d57ea commit ed8223b
Showing 6 changed files with 526 additions and 934 deletions.
21 changes: 14 additions & 7 deletions Makefile
@@ -19,13 +19,20 @@
help: ## Display this help
@awk 'BEGIN {FS = ":.*##"; printf "\nUsage:\n make \033[36m\033[0m\n"} /^[a-zA-Z_-]+:.*?##/ { printf " \033[36m%-20s\033[0m %s\n", $$1, $$2 } /^##@/ { printf "\n\033[1m%s\033[0m\n", substr($$0, 5) } ' $(MAKEFILE_LIST)

install-poetry: ## Install poetry if the user has not done that yet.
@if ! command -v poetry &> /dev/null; then \
echo "Poetry could not be found. Installing..."; \
pip install --user poetry==2.0.1; \
else \
echo "Poetry is already installed."; \
fi
POETRY_VERSION = 2.0.1
install-poetry: ## Ensure Poetry is installed and the correct version is being used.
@if ! command -v poetry &> /dev/null; then \
echo "Poetry could not be found. Installing..."; \
pip install --user poetry==$(POETRY_VERSION); \
else \
INSTALLED_VERSION=$$(pip show poetry | grep Version | awk '{print $$2}'); \
if [ "$$INSTALLED_VERSION" != "$(POETRY_VERSION)" ]; then \
echo "Poetry version $$INSTALLED_VERSION does not match required version $(POETRY_VERSION). Updating..."; \
pip install --user --upgrade poetry==$(POETRY_VERSION); \
else \
echo "Poetry version $$INSTALLED_VERSION is already installed."; \
fi \
fi

install-dependencies: ## Install dependencies including dev, docs, and all extras
poetry install --all-extras
136 changes: 136 additions & 0 deletions mkdocs/docs/api.md
@@ -1546,3 +1546,139 @@ df.show(2)

(Showing first 2 rows)
```

### Polars

PyIceberg interfaces closely with Polars DataFrames and LazyFrames, which provide a fully lazy, optimized query engine interface on top of PyIceberg tables.

<!-- prettier-ignore-start -->

!!! note "Requirements"
This requires [`polars` to be installed](index.md).

```sh
pip install "pyiceberg[polars]"
```
<!-- prettier-ignore-end -->

PyIceberg data can be analyzed and accessed through Polars using either a DataFrame or a LazyFrame.
If your code uses the Apache Iceberg data scanning and retrieval API and then analyzes the resulting DataFrame in Polars, use the `table.scan().to_polars()` API.
If the intent is to use Polars' high-performance filtering and retrieval functionality, use the LazyFrame exported from the Iceberg table via the `table.to_polars()` API.

```python
# Get LazyFrame
iceberg_table.to_polars()

# Get DataFrame
iceberg_table.scan().to_polars()
```
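
The practical difference is when the data is read: `iceberg_table.to_polars()` returns a LazyFrame whose work is deferred until `collect()` is called, while `iceberg_table.scan().to_polars()` executes the scan and returns an eager DataFrame immediately. A minimal sketch of the two paths, assuming a table bound to `iceberg_table`:

```python
import polars as pl

# Lazy path: builds a query plan; data is only read when collect() runs
lazy_result = (
    iceberg_table.to_polars()
    .filter(pl.col("ticket_id") > 10)
    .collect()
)

# Eager path: the Iceberg scan runs now and returns a materialized DataFrame
eager_result = iceberg_table.scan(row_filter="ticket_id > 10").to_polars()
```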

#### Working with Polars DataFrame

PyIceberg makes it easy to filter data out of a huge table and pull it into a local Polars DataFrame. Only the Parquet files relevant to the query are fetched and the filter is applied to them, which reduces IO and therefore improves performance and lowers cost.

```python
import pyarrow as pa

from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField, StringType, TimestampType

# Define the schema for the support-ticket table
schema = Schema(
    NestedField(field_id=1, name='ticket_id', field_type=LongType(), required=True),
    NestedField(field_id=2, name='customer_id', field_type=LongType(), required=True),
    NestedField(field_id=3, name='issue', field_type=StringType(), required=False),
    NestedField(field_id=4, name='created_at', field_type=TimestampType(), required=True),
)

iceberg_table = catalog.create_table(
identifier='default.product_support_issues',
schema=schema
)

pa_table_data = pa.Table.from_pylist(
[
{'ticket_id': 1, 'customer_id': 546, 'issue': 'User Login issue', 'created_at': 1650020000000000},
{'ticket_id': 2, 'customer_id': 547, 'issue': 'Payment not going through', 'created_at': 1650028640000000},
{'ticket_id': 3, 'customer_id': 548, 'issue': 'Error on checkout', 'created_at': 1650037280000000},
{'ticket_id': 4, 'customer_id': 549, 'issue': 'Unable to reset password', 'created_at': 1650045920000000},
{'ticket_id': 5, 'customer_id': 550, 'issue': 'Account locked', 'created_at': 1650054560000000},
{'ticket_id': 6, 'customer_id': 551, 'issue': 'Order not received', 'created_at': 1650063200000000},
{'ticket_id': 7, 'customer_id': 552, 'issue': 'Refund not processed', 'created_at': 1650071840000000},
{'ticket_id': 8, 'customer_id': 553, 'issue': 'Shipping address issue', 'created_at': 1650080480000000},
{'ticket_id': 9, 'customer_id': 554, 'issue': 'Product damaged', 'created_at': 1650089120000000},
{'ticket_id': 10, 'customer_id': 555, 'issue': 'Unable to apply discount code', 'created_at': 1650097760000000},
{'ticket_id': 11, 'customer_id': 556, 'issue': 'Website not loading', 'created_at': 1650106400000000},
{'ticket_id': 12, 'customer_id': 557, 'issue': 'Incorrect order received', 'created_at': 1650115040000000},
{'ticket_id': 13, 'customer_id': 558, 'issue': 'Unable to track order', 'created_at': 1650123680000000},
{'ticket_id': 14, 'customer_id': 559, 'issue': 'Order delayed', 'created_at': 1650132320000000},
{'ticket_id': 15, 'customer_id': 560, 'issue': 'Product not as described', 'created_at': 1650140960000000},
{'ticket_id': 16, 'customer_id': 561, 'issue': 'Unable to contact support', 'created_at': 1650149600000000},
{'ticket_id': 17, 'customer_id': 562, 'issue': 'Duplicate charge', 'created_at': 1650158240000000},
{'ticket_id': 18, 'customer_id': 563, 'issue': 'Unable to update profile', 'created_at': 1650166880000000},
{'ticket_id': 19, 'customer_id': 564, 'issue': 'App crashing', 'created_at': 1650175520000000},
{'ticket_id': 20, 'customer_id': 565, 'issue': 'Unable to download invoice', 'created_at': 1650184160000000},
{'ticket_id': 21, 'customer_id': 566, 'issue': 'Incorrect billing amount', 'created_at': 1650192800000000},
], schema=iceberg_table.schema().as_arrow()
)

iceberg_table.append(
df=pa_table_data
)

iceberg_table.scan(
row_filter="ticket_id > 10",
).to_polars()
```

This will return a Polars DataFrame:

```
shape: (11, 4)
┌───────────┬─────────────┬────────────────────────────┬─────────────────────┐
│ ticket_id ┆ customer_id ┆ issue                      ┆ created_at          │
│ ---       ┆ ---         ┆ ---                        ┆ ---                 │
│ i64       ┆ i64         ┆ str                        ┆ datetime[μs]        │
╞═══════════╪═════════════╪════════════════════════════╪═════════════════════╡
│ 11        ┆ 556         ┆ Website not loading        ┆ 2022-04-16 10:53:20 │
│ 12        ┆ 557         ┆ Incorrect order received   ┆ 2022-04-16 13:17:20 │
│ 13        ┆ 558         ┆ Unable to track order      ┆ 2022-04-16 15:41:20 │
│ 14        ┆ 559         ┆ Order delayed              ┆ 2022-04-16 18:05:20 │
│ 15        ┆ 560         ┆ Product not as described   ┆ 2022-04-16 20:29:20 │
│ …         ┆ …           ┆ …                          ┆ …                   │
│ 17        ┆ 562         ┆ Duplicate charge           ┆ 2022-04-17 01:17:20 │
│ 18        ┆ 563         ┆ Unable to update profile   ┆ 2022-04-17 03:41:20 │
│ 19        ┆ 564         ┆ App crashing               ┆ 2022-04-17 06:05:20 │
│ 20        ┆ 565         ┆ Unable to download invoice ┆ 2022-04-17 08:29:20 │
│ 21        ┆ 566         ┆ Incorrect billing amount   ┆ 2022-04-17 10:53:20 │
└───────────┴─────────────┴────────────────────────────┴─────────────────────┘
```
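
The scan can also prune columns and cap the number of matching rows before the data is handed to Polars; a short sketch, assuming the `selected_fields` and `limit` parameters of the scan API:

```python
# Fetch only two columns and at most five matching rows, then convert to Polars
df = iceberg_table.scan(
    row_filter="ticket_id > 10",
    selected_fields=("ticket_id", "issue"),
    limit=5,
).to_polars()
print(df)
```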

#### Working with Polars LazyFrame

PyIceberg supports creating a Polars LazyFrame from an Iceberg table.

Using the table created in the code example above:

```python
import polars as pl

lf = iceberg_table.to_polars().filter(pl.col("ticket_id") > 10)
print(lf.collect())
```

The code snippet above returns a Polars LazyFrame and defines a filter that Polars executes when the frame is collected:

```
shape: (11, 4)
┌───────────┬─────────────┬────────────────────────────┬─────────────────────┐
│ ticket_id ┆ customer_id ┆ issue                      ┆ created_at          │
│ ---       ┆ ---         ┆ ---                        ┆ ---                 │
│ i64       ┆ i64         ┆ str                        ┆ datetime[μs]        │
╞═══════════╪═════════════╪════════════════════════════╪═════════════════════╡
│ 11        ┆ 556         ┆ Website not loading        ┆ 2022-04-16 10:53:20 │
│ 12        ┆ 557         ┆ Incorrect order received   ┆ 2022-04-16 13:17:20 │
│ 13        ┆ 558         ┆ Unable to track order      ┆ 2022-04-16 15:41:20 │
│ 14        ┆ 559         ┆ Order delayed              ┆ 2022-04-16 18:05:20 │
│ 15        ┆ 560         ┆ Product not as described   ┆ 2022-04-16 20:29:20 │
│ …         ┆ …           ┆ …                          ┆ …                   │
│ 17        ┆ 562         ┆ Duplicate charge           ┆ 2022-04-17 01:17:20 │
│ 18        ┆ 563         ┆ Unable to update profile   ┆ 2022-04-17 03:41:20 │
│ 19        ┆ 564         ┆ App crashing               ┆ 2022-04-17 06:05:20 │
│ 20        ┆ 565         ┆ Unable to download invoice ┆ 2022-04-17 08:29:20 │
│ 21        ┆ 566         ┆ Incorrect billing amount   ┆ 2022-04-17 10:53:20 │
└───────────┴─────────────┴────────────────────────────┴─────────────────────┘
```
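
Because the LazyFrame only runs when `collect()` is called, additional Polars operations can be chained onto it and optimized as a single query. A small sketch, assuming a recent Polars version with the `group_by`/`agg` lazy API:

```python
import polars as pl

lf = iceberg_table.to_polars()

# Count tickets per customer; nothing is read until collect() is called
summary = (
    lf.filter(pl.col("ticket_id") > 10)
    .group_by("customer_id")
    .agg(pl.len().alias("ticket_count"))
)
print(summary.collect())
```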
7 changes: 4 additions & 3 deletions mkdocs/docs/index.md
@@ -40,9 +40,9 @@ pip install "pyiceberg[s3fs,hive]"

You can mix and match optional dependencies depending on your needs:

| Key | Description: |
|---------------|---------------------------------------------------------------------------|
| hive | Support for the Hive metastore |
| Key | Description: |
| ------------ | ------------------------------------------------------------------------- |
| hive | Support for the Hive metastore |
| hive-kerberos | Support for Hive metastore in Kerberos environment |
| glue | Support for AWS Glue |
| dynamodb | Support for AWS DynamoDB |
@@ -53,6 +53,7 @@ You can mix and match optional dependencies depending on your needs:
| duckdb | Installs both PyArrow and DuckDB |
| ray | Installs PyArrow, Pandas, and Ray |
| daft | Installs Daft |
| polars | Installs Polars |
| s3fs | S3FS as a FileIO implementation to interact with the object store |
| adlfs | ADLFS as a FileIO implementation to interact with the object store |
| snappy | Support for snappy Avro compression |