From 67272cbf5c3f6c3ebbfd04288f719252c60877f5 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Thu, 30 Jan 2025 15:09:00 -0800 Subject: [PATCH] Implementing review feedback. --- CHANGES.md | 1 + .../apache_beam/runners/dask/transform_evaluator.py | 8 +++++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 799d26dc05e3..fde00b9da4ce 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -86,6 +86,7 @@ * Support the Process Environment for execution in Prism ([#33651](https://github.com/apache/beam/pull/33651)) * Support the AnyOf Environment for execution in Prism ([#33705](https://github.com/apache/beam/pull/33705)) * This improves support for developing Xlang pipelines, when using a compatible cross language service. +* Partitions are now configurable for the DaskRunner in the Python SDK ([#33805](https://github.com/apache/beam/pull/33805)). ## Breaking Changes diff --git a/sdks/python/apache_beam/runners/dask/transform_evaluator.py b/sdks/python/apache_beam/runners/dask/transform_evaluator.py index ea982c403551..bde6f328a2ab 100644 --- a/sdks/python/apache_beam/runners/dask/transform_evaluator.py +++ b/sdks/python/apache_beam/runners/dask/transform_evaluator.py @@ -25,6 +25,7 @@ import abc import dask.bag as db +import logging import math import typing as t @@ -53,6 +54,7 @@ # Value types for PCollections (possibly Windowed Values). PCollVal = t.Union[WindowedValue, t.Any] +_LOGGER = logging.getLogger(__name__) def get_windowed_value(item: t.Any, window_fn: WindowFn) -> WindowedValue: """Wraps a value (item) inside a Window.""" @@ -168,7 +170,11 @@ def apply(self, input_bag: OpInput, side_inputs: OpSide = None) -> db.Bag: # Ideal "chunk sizes" in dask are around 10-100 MBs. # Let's hope ~128 items per partition is around this # memory overhead. 
- partition_size = max(128, math.ceil(math.sqrt(len(items)) / 10)) + default_size = 128 + partition_size = max(default_size, math.ceil(math.sqrt(len(items)) / 10)) + if partition_size == default_size: + _LOGGER.warning('The new default partition size is %d, it used to be 1 ' + 'in previous DaskRunner versions.', default_size) return db.from_sequence( items, npartitions=npartitions, partition_size=partition_size)