Skip to content

Commit

Permalink
Implementing review feedback.
Browse files Browse the repository at this point in the history
  • Loading branch information
alxmrs committed Jan 30, 2025
1 parent 7e8b61f commit 67272cb
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
* Support the Process Environment for execution in Prism ([#33651](https://github.com/apache/beam/pull/33651))
* Support the AnyOf Environment for execution in Prism ([#33705](https://github.com/apache/beam/pull/33705))
* This improves support for developing Xlang pipelines, when using a compatible cross language service.
* Partitions are now configurable for the DaskRunner in the Python SDK ([#33805](https://github.com/apache/beam/pull/33805)).

## Breaking Changes

Expand Down
8 changes: 7 additions & 1 deletion sdks/python/apache_beam/runners/dask/transform_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import abc
import dask.bag as db
import logging
import math
import typing as t

Expand Down Expand Up @@ -53,6 +54,7 @@
# Value types for PCollections (possibly Windowed Values).
PCollVal = t.Union[WindowedValue, t.Any]

_LOGGER = logging.getLogger(__name__)

def get_windowed_value(item: t.Any, window_fn: WindowFn) -> WindowedValue:
"""Wraps a value (item) inside a Window."""
Expand Down Expand Up @@ -168,7 +170,11 @@ def apply(self, input_bag: OpInput, side_inputs: OpSide = None) -> db.Bag:
# Ideal "chunk sizes" in dask are around 10-100 MBs.
# Let's hope ~128 items per partition is around this
# memory overhead.
partition_size = max(128, math.ceil(math.sqrt(len(items)) / 10))
default_size = 128
partition_size = max(default_size, math.ceil(math.sqrt(len(items)) / 10))
if partition_size == default_size:
_LOGGER.warning('The new default partition size is %d, it used to be 1 '
'in previous DaskRunner versions.' % default_size)

return db.from_sequence(
items, npartitions=npartitions, partition_size=partition_size)
Expand Down

0 comments on commit 67272cb

Please sign in to comment.