From 30d08636b4d36728fbbfce3d3d964b34679c9db9 Mon Sep 17 00:00:00 2001
From: Jozef Vilcek
Date: Tue, 7 Jan 2025 13:16:08 +0100
Subject: [PATCH 1/5] [spark] Restore memory sensitive GBK translation

---
 .../translation/TransformTranslator.java      | 48 ++++++++++---------
 1 file changed, 26 insertions(+), 22 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 6ab4f79787eb..7b1818399268 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -146,32 +146,36 @@ public void evaluate(GroupByKey<K, V> transform, EvaluationContext context) {
       JavaRDD<WindowedValue<KV<K, Iterable<V>>>> groupedByKey;
       Partitioner partitioner = getPartitioner(context);
 
-      // As this is batch, we can ignore triggering and allowed lateness parameters.
-      if (windowingStrategy.getWindowFn().equals(new GlobalWindows())
-          && windowingStrategy.getTimestampCombiner().equals(TimestampCombiner.END_OF_WINDOW)) {
-        // we can drop the windows and recover them later
-        groupedByKey =
-            GroupNonMergingWindowsFunctions.groupByKeyInGlobalWindow(
-                inRDD, keyCoder, coder.getValueCoder(), partitioner);
-      } else if (GroupNonMergingWindowsFunctions.isEligibleForGroupByWindow(windowingStrategy)) {
+      if (GroupNonMergingWindowsFunctions.isEligibleForGroupByWindow(windowingStrategy)) {
         // we can have a memory sensitive translation for non-merging windows
         groupedByKey =
-            GroupNonMergingWindowsFunctions.groupByKeyAndWindow(
-                inRDD, keyCoder, coder.getValueCoder(), windowingStrategy, partitioner);
+        GroupNonMergingWindowsFunctions.groupByKeyAndWindow(
+            inRDD, keyCoder, coder.getValueCoder(), windowingStrategy, partitioner);
       } else {
-        // --- group by key only.
-        JavaRDD<KV<K, Iterable<WindowedValue<V>>>> groupedByKeyOnly =
-            GroupCombineFunctions.groupByKeyOnly(inRDD, keyCoder, wvCoder, partitioner);
-        // --- now group also by window.
-        // for batch, GroupAlsoByWindow uses an in-memory StateInternals.
-        groupedByKey =
-            groupedByKeyOnly.flatMap(
-                new SparkGroupAlsoByWindowViaOutputBufferFn<>(
-                    windowingStrategy,
-                    new TranslationUtils.InMemoryStateInternalsFactory<>(),
-                    SystemReduceFn.buffering(coder.getValueCoder()),
-                    context.getSerializableOptions()));
+        // As this is batch, we can ignore triggering and allowed lateness parameters.
+        if (windowingStrategy.getWindowFn().equals(new GlobalWindows())
+          && windowingStrategy.getTimestampCombiner().equals(TimestampCombiner.END_OF_WINDOW)) {
+
+          // we can drop the windows and recover them later
+          groupedByKey =
+          GroupNonMergingWindowsFunctions.groupByKeyInGlobalWindow(
+              inRDD, keyCoder, coder.getValueCoder(), partitioner);
+        } else {
+          // --- group by key only.
+          JavaRDD<KV<K, Iterable<WindowedValue<V>>>> groupedByKeyOnly =
+            GroupCombineFunctions.groupByKeyOnly(inRDD, keyCoder, wvCoder, partitioner);
+
+          // --- now group also by window.
+          // for batch, GroupAlsoByWindow uses an in-memory StateInternals.
+          groupedByKey =
+          groupedByKeyOnly.flatMap(
+              new SparkGroupAlsoByWindowViaOutputBufferFn<>(
+                  windowingStrategy,
+                  new TranslationUtils.InMemoryStateInternalsFactory<>(),
+                  SystemReduceFn.buffering(coder.getValueCoder()),
+                  context.getSerializableOptions()));
+        }
       }
       context.putDataset(transform, new BoundedDataset<>(groupedByKey));
     }
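
Note (not part of the series): for orientation, here is a minimal, hypothetical pipeline shape that would reach the memory sensitive branch restored above when run in batch on the SparkRunner. FixedWindows is a non-merging WindowFn, so such a windowing strategy can pass isEligibleForGroupByWindow. The class name and data are invented for illustration; only the Beam SDK calls are real.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

public class WindowedGbkSketch {
  public static void main(String[] args) {
    // Pass --runner=SparkRunner (with the Spark runner on the classpath) to hit
    // the translation above; without it the DirectRunner is used.
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(KV.of("user", 1), KV.of("user", 2), KV.of("other", 3)))
        // FixedWindows is non-merging, the case groupByKeyAndWindow targets.
        .apply(Window.<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(1))))
        // Translated by TransformTranslator#evaluate(GroupByKey, EvaluationContext).
        .apply(GroupByKey.<String, Integer>create());

    p.run().waitUntilFinish();
  }
}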
From 231aa0275cf27097c8ad390cb872a980def7b59b Mon Sep 17 00:00:00 2001
From: Jozef Vilcek
Date: Tue, 7 Jan 2025 13:35:45 +0100
Subject: [PATCH 2/5] [spark] spotless

---
 .../translation/TransformTranslator.java      | 24 +++++++++----------
 1 file changed, 12 insertions(+), 12 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 7b1818399268..99873887ecd1 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -149,32 +149,32 @@ public void evaluate(GroupByKey<K, V> transform, EvaluationContext context) {
       if (GroupNonMergingWindowsFunctions.isEligibleForGroupByWindow(windowingStrategy)) {
         // we can have a memory sensitive translation for non-merging windows
         groupedByKey =
-        GroupNonMergingWindowsFunctions.groupByKeyAndWindow(
-            inRDD, keyCoder, coder.getValueCoder(), windowingStrategy, partitioner);
+            GroupNonMergingWindowsFunctions.groupByKeyAndWindow(
+                inRDD, keyCoder, coder.getValueCoder(), windowingStrategy, partitioner);
       } else {
         // As this is batch, we can ignore triggering and allowed lateness parameters.
         if (windowingStrategy.getWindowFn().equals(new GlobalWindows())
-          && windowingStrategy.getTimestampCombiner().equals(TimestampCombiner.END_OF_WINDOW)) {
+            && windowingStrategy.getTimestampCombiner().equals(TimestampCombiner.END_OF_WINDOW)) {
 
           // we can drop the windows and recover them later
           groupedByKey =
-          GroupNonMergingWindowsFunctions.groupByKeyInGlobalWindow(
-              inRDD, keyCoder, coder.getValueCoder(), partitioner);
+              GroupNonMergingWindowsFunctions.groupByKeyInGlobalWindow(
+                  inRDD, keyCoder, coder.getValueCoder(), partitioner);
         } else {
           // --- group by key only.
           JavaRDD<KV<K, Iterable<WindowedValue<V>>>> groupedByKeyOnly =
-            GroupCombineFunctions.groupByKeyOnly(inRDD, keyCoder, wvCoder, partitioner);
+              GroupCombineFunctions.groupByKeyOnly(inRDD, keyCoder, wvCoder, partitioner);
 
           // --- now group also by window.
           // for batch, GroupAlsoByWindow uses an in-memory StateInternals.
           groupedByKey =
-          groupedByKeyOnly.flatMap(
-              new SparkGroupAlsoByWindowViaOutputBufferFn<>(
-                  windowingStrategy,
-                  new TranslationUtils.InMemoryStateInternalsFactory<>(),
-                  SystemReduceFn.buffering(coder.getValueCoder()),
-                  context.getSerializableOptions()));
+              groupedByKeyOnly.flatMap(
+                  new SparkGroupAlsoByWindowViaOutputBufferFn<>(
+                      windowingStrategy,
+                      new TranslationUtils.InMemoryStateInternalsFactory<>(),
+                      SystemReduceFn.buffering(coder.getValueCoder()),
+                      context.getSerializableOptions()));
         }
       }
       context.putDataset(transform, new BoundedDataset<>(groupedByKey));
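
Note (not part of the series): as background for the two patches above, the memory sensitive groupByKeyAndWindow translation is essentially a sort-based grouping, whereas the global-window shortcut collects each key's values into one in-memory collection. The following standalone Spark sketch (plain Spark, not Beam code; class name and data are invented) shows the underlying idea of the sort-based variant, assuming a local master.

import java.util.Arrays;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class SortBasedGroupingSketch {
  public static void main(String[] args) {
    try (JavaSparkContext sc = new JavaSparkContext("local[2]", "sort-based-grouping")) {
      JavaPairRDD<String, Integer> input =
          sc.parallelizePairs(
              Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("b", 2), new Tuple2<>("a", 3)));

      // Shuffle by key and sort within each partition: values of the same key
      // become adjacent and can be consumed as a stream, so no per-key buffer
      // (and hence no large in-memory collection) is ever materialized.
      JavaPairRDD<String, Integer> sorted =
          input.repartitionAndSortWithinPartitions(new HashPartitioner(2));

      sorted.foreachPartition(
          it -> it.forEachRemaining(kv -> System.out.println(kv._1() + " -> " + kv._2())));
    }
  }
}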
From e74b4e7bb0e1b12c08330b9ea972ae5969ca5f69 Mon Sep 17 00:00:00 2001
From: Jozef Vilcek
Date: Mon, 13 Jan 2025 15:36:13 +0100
Subject: [PATCH 3/5] [spark] Exclude CoGBK transform from group by key and
 window optimisation

---
 .../beam/runners/spark/SparkRunner.java       |  4 +
 .../spark/translation/EvaluationContext.java  | 24 ++++++
 .../spark/translation/GroupByKeyVisitor.java  | 77 +++++++++++++++++++
 .../translation/TransformTranslator.java      |  3 +-
 .../translation/GroupByKeyVisitorTest.java    | 71 +++++++++++++++++
 5 files changed, 178 insertions(+), 1 deletion(-)
 create mode 100644 runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupByKeyVisitor.java
 create mode 100644 runners/spark/src/test/java/org/apache/beam/runners/spark/translation/GroupByKeyVisitorTest.java

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 034e0632df49..2b72ffb0f225 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -30,6 +30,7 @@
 import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
 import org.apache.beam.runners.spark.metrics.SparkBeamMetricSource;
 import org.apache.beam.runners.spark.translation.EvaluationContext;
+import org.apache.beam.runners.spark.translation.GroupByKeyVisitor;
 import org.apache.beam.runners.spark.translation.SparkContextFactory;
 import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
 import org.apache.beam.runners.spark.translation.TransformEvaluator;
@@ -214,6 +215,9 @@ public SparkPipelineResult run(final Pipeline pipeline) {
       // update the cache candidates
       updateCacheCandidates(pipeline, translator, evaluationContext);
 
+      // update GBK candidates for memory optimized transform
+      pipeline.traverseTopologically(new GroupByKeyVisitor(translator, evaluationContext));
+
       initAccumulators(pipelineOptions, jsc);
       startPipeline =
           executorService.submit(
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index 51fa457e36dc..e43af9a7ebdc 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -65,6 +65,8 @@ public class EvaluationContext {
   private AppliedPTransform<?, ?, ?> currentTransform;
   private final SparkPCollectionView pviews = new SparkPCollectionView();
   private final Map<PCollection, Long> cacheCandidates = new HashMap<>();
+  private final Map<GroupByKey<?, ?>, String> groupByKeyCandidatesForMemoryOptimizedTranslation =
+      new HashMap<>();
   private final PipelineOptions options;
   private final SerializablePipelineOptions serializableOptions;
 
@@ -282,6 +284,28 @@ public Map<PCollection, Long> getCacheCandidates() {
     return this.cacheCandidates;
   }
 
+  /**
+   * Get the map of GBK transforms to their full names, which are candidates for
+   * group by key and window translation which aims to reduce memory usage.
+   *
+   * @return The current {@link Map} of candidates
+   */
+  public Map<GroupByKey<?, ?>, String> getCandidatesForGroupByKeyAndWindowTranslation() {
+    return this.groupByKeyCandidatesForMemoryOptimizedTranslation;
+  }
+
+  /**
+   * Returns if given GBK transform can be considered as candidate for group by key and window
+   * translation aiming to reduce memory usage.
+   * @param transform to evaluate
+   * @return true if given transform is a candidate; false otherwise
+   * @param <K> type of GBK key
+   * @param <V> type of GBK value
+   */
+  public <K, V> boolean isCandidateForGroupByKeyAndWindow(GroupByKey<K, V> transform) {
+    return groupByKeyCandidatesForMemoryOptimizedTranslation.containsKey(transform);
+  }
+
   <T> Iterable<WindowedValue<T>> getWindowedValues(PCollection<T> pcollection) {
     @SuppressWarnings("unchecked")
     BoundedDataset<T> boundedDataset = (BoundedDataset<T>) datasets.get(pcollection);
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupByKeyVisitor.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupByKeyVisitor.java
new file mode 100644
index 000000000000..cc96cfb60585
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupByKeyVisitor.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.join.CoGroupByKey;
+import org.apache.beam.sdk.util.construction.PTransformTranslation;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+
+/** Traverses the pipeline to populate the candidates for group by key. */
+public class GroupByKeyVisitor extends Pipeline.PipelineVisitor.Defaults {
+
+  protected final EvaluationContext ctxt;
+  protected final SparkPipelineTranslator translator;
+  private boolean isInsideCoGBK = false;
+  private long visitedGroupByKeyTransformsCount = 0;
+
+  public GroupByKeyVisitor(
+      SparkPipelineTranslator translator, EvaluationContext evaluationContext) {
+    this.ctxt = evaluationContext;
+    this.translator = translator;
+  }
+
+  @Override
+  public Pipeline.PipelineVisitor.CompositeBehavior enterCompositeTransform(
+      TransformHierarchy.Node node) {
+    if (node.getTransform() != null && node.getTransform() instanceof CoGroupByKey) {
+      isInsideCoGBK = true;
+    }
+    return CompositeBehavior.ENTER_TRANSFORM;
+  }
+
+  @Override
+  public void leaveCompositeTransform(TransformHierarchy.Node node) {
+    if (isInsideCoGBK && node.getTransform() instanceof CoGroupByKey) {
+      isInsideCoGBK = false;
+    }
+  }
+
+  @Override
+  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+    PTransform<?, ?> transform = node.getTransform();
+    if (transform != null) {
+      String urn = PTransformTranslation.urnForTransformOrNull(transform);
+      if (PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN.equals(urn)) {
+        visitedGroupByKeyTransformsCount += 1;
+        if (!isInsideCoGBK) {
+          ctxt.getCandidatesForGroupByKeyAndWindowTranslation()
+              .put((GroupByKey<?, ?>) transform, node.getFullName());
+        }
+      }
+    }
+  }
+
+  @VisibleForTesting
+  long getVisitedGroupByKeyTransformsCount() {
+    return visitedGroupByKeyTransformsCount;
+  }
+}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 99873887ecd1..614f1f96c3e8 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -146,7 +146,8 @@ public void evaluate(GroupByKey<K, V> transform, EvaluationContext context) {
       JavaRDD<WindowedValue<KV<K, Iterable<V>>>> groupedByKey;
       Partitioner partitioner = getPartitioner(context);
 
-      if (GroupNonMergingWindowsFunctions.isEligibleForGroupByWindow(windowingStrategy)) {
+      if (context.isCandidateForGroupByKeyAndWindow(transform)
+          && GroupNonMergingWindowsFunctions.isEligibleForGroupByWindow(windowingStrategy)) {
         // we can have a memory sensitive translation for non-merging windows
         groupedByKey =
             GroupNonMergingWindowsFunctions.groupByKeyAndWindow(
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/GroupByKeyVisitorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/GroupByKeyVisitorTest.java
new file mode 100644
index 000000000000..f3875c7d1cb2
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/GroupByKeyVisitorTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.translation;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.runners.spark.SparkContextRule;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.join.CoGroupByKey;
+import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+/** Tests of {@link GroupByKeyVisitor}. */
+public class GroupByKeyVisitorTest {
+
+  @ClassRule public static SparkContextRule contextRule = new SparkContextRule();
+
+  @Test
+  public void testTraverseShouldPopulateCandidatesIntoEvaluationContext() {
+    SparkPipelineOptions options = contextRule.createPipelineOptions();
+    Pipeline pipeline = Pipeline.create(options);
+    PCollection<KV<Integer, String>> pCollection =
+        pipeline.apply(Create.of(KV.of(3, "foo"), KV.of(3, "bar")));
+
+    pCollection.apply("CandidateGBK_1", Reshuffle.viaRandomKey());
+    pCollection.apply("CandidateGBK_2", GroupByKey.create());
+
+    final TupleTag<String> t1 = new TupleTag<>();
+    final TupleTag<String> t2 = new TupleTag<>();
+
+    KeyedPCollectionTuple.of(t1, pCollection)
+        .and(t2, pCollection)
+        .apply("GBK_inside_CoGBK_ignored", CoGroupByKey.create());
+
+    EvaluationContext ctxt =
+        new EvaluationContext(contextRule.getSparkContext(), pipeline, options);
+    GroupByKeyVisitor visitor = new GroupByKeyVisitor(new TransformTranslator.Translator(), ctxt);
+    pipeline.traverseTopologically(visitor);
+
+    assertEquals(3, visitor.getVisitedGroupByKeyTransformsCount());
+    assertEquals(2, ctxt.getCandidatesForGroupByKeyAndWindowTranslation().size());
+    assertThat(
+        ctxt.getCandidatesForGroupByKeyAndWindowTranslation().values(),
+        containsInAnyOrder("CandidateGBK_1/Reshuffle/GroupByKey", "CandidateGBK_2"));
+  }
+}
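
Note (not part of the series): the visitor introduced in this patch builds on Beam's standard traversal hooks. As a self-contained illustration of that mechanism (a sketch with invented names, not patch code), the following walks a tiny pipeline with Pipeline.PipelineVisitor.Defaults, the same base class GroupByKeyVisitor extends, and prints the full name of every primitive transform, which is exactly the information the visitor stores per candidate GBK.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.values.KV;

public class VisitorSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
    pipeline
        .apply("ReadInput", Create.of(KV.of("k", 1), KV.of("k", 2)))
        .apply("MyGBK", GroupByKey.<String, Integer>create());

    // Traversal only inspects the pipeline graph; nothing is executed.
    pipeline.traverseTopologically(
        new Pipeline.PipelineVisitor.Defaults() {
          @Override
          public void visitPrimitiveTransform(TransformHierarchy.Node node) {
            // For the GBK above this prints a full name like "MyGBK",
            // the key GroupByKeyVisitor records for eligible candidates.
            System.out.println("primitive transform: " + node.getFullName());
          }
        });
  }
}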
From e551b4cfae95b25f0b4fa1d2257fff938d25bda1 Mon Sep 17 00:00:00 2001
From: Jozef Vilcek
Date: Tue, 14 Jan 2025 08:21:26 +0100
Subject: [PATCH 4/5] [spark] spotless

---
 .../beam/runners/spark/translation/EvaluationContext.java  | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index e43af9a7ebdc..1f44a29002ef 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -285,8 +285,8 @@ public Map<PCollection, Long> getCacheCandidates() {
   }
 
   /**
-   * Get the map of GBK transforms to their full names, which are candidates for
-   * group by key and window translation which aims to reduce memory usage.
+   * Get the map of GBK transforms to their full names, which are candidates for group by key and
+   * window translation which aims to reduce memory usage.
    *
    * @return The current {@link Map} of candidates
    */
@@ -297,6 +297,7 @@ public Map<PCollection, Long> getCacheCandidates() {
   /**
    * Returns if given GBK transform can be considered as candidate for group by key and window
    * translation aiming to reduce memory usage.
+   *
    * @param transform to evaluate
    * @return true if given transform is a candidate; false otherwise
    * @param <K> type of GBK key

From 75a880bb40c42808bf5adb9dff3d7950987fb8a3 Mon Sep 17 00:00:00 2001
From: Jozef Vilcek
Date: Sun, 26 Jan 2025 15:29:45 +0100
Subject: [PATCH 5/5] [spark] Add config option to enable GBK translation to
 support huge values

---
 .../batch/GroupByKeyTranslatorBatch.java      | 18 +++++++--------
 .../translation/batch/GroupByKeyTest.java     | 23 +++++++++++++++++--
 .../spark/SparkCommonPipelineOptions.java     |  9 ++++++++
 .../translation/TransformTranslator.java      | 11 +++++++--
 4 files changed, 48 insertions(+), 13 deletions(-)

diff --git a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
index 9e75e22a3087..bbda0f0cfb65 100644
--- a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
+++ b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
@@ -49,6 +49,7 @@
 import org.apache.beam.runners.core.ReduceFnRunner;
 import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.spark.SparkCommonPipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.GroupAlsoByWindowViaOutputBufferFn;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -84,8 +85,10 @@
  *
  * <p>Note: Using {@code collect_list} isn't any worse than using {@link ReduceFnRunner}. In the
  * latter case the entire group (iterator) has to be loaded into memory as well. Either way there's
- * a risk of OOM errors. When disabling {@link #useCollectList}, a more memory sensitive iterable is
- * used that can be traversed just once. Attempting to traverse the iterable again will throw.
+ * a risk of OOM errors. When enabling {@link
+ * SparkCommonPipelineOptions#getPreferGroupByKeyToHandleHugeValues()}, a more memory sensitive
+ * iterable is used that can be traversed just once. Attempting to traverse the iterable again will
+ * throw.
  *
  * <ul>
  *   <li>When using the default global window, window information is dropped and restored after the
@@ -108,17 +111,10 @@ class GroupByKeyTranslatorBatch
   private static final List<Column> GLOBAL_WINDOW_DETAILS =
       windowDetails(lit(new byte[][] {EMPTY_BYTE_ARRAY}));
 
-  private boolean useCollectList = true;
-
   GroupByKeyTranslatorBatch() {
     super(0.2f);
   }
 
-  GroupByKeyTranslatorBatch(boolean useCollectList) {
-    super(0.2f);
-    this.useCollectList = useCollectList;
-  }
-
   @Override
   public void translate(GroupByKey<K, V> transform, Context cxt) {
     WindowingStrategy<?, ?> windowing = cxt.getInput().getWindowingStrategy();
@@ -135,6 +131,10 @@ public void translate(GroupByKey<K, V> transform, Context cxt) {
 
     // In batch we can ignore triggering and allowed lateness parameters
     final Dataset<WindowedValue<KV<K, Iterable<V>>>> result;
+    boolean useCollectList =
+        !cxt.getOptions()
+            .as(SparkCommonPipelineOptions.class)
+            .getPreferGroupByKeyToHandleHugeValues();
 
     if (useCollectList && eligibleForGlobalGroupBy(windowing, false)) {
       // Collects all values per key in memory. This might be problematic if there's few keys only
       // or some highly skewed distribution.
diff --git a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest.java b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest.java
index bf1799bb13b2..6569b7b20cfc 100644
--- a/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest.java
+++ b/runners/spark/3/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTest.java
@@ -26,9 +26,12 @@
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.runners.spark.SparkCommonPipelineOptions;
 import org.apache.beam.runners.spark.structuredstreaming.SparkSessionRule;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.SerializableMatcher;
@@ -48,21 +51,37 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;
 
 /** Test class for beam to spark {@link ParDo} translation. */
-@RunWith(JUnit4.class)
+@RunWith(Parameterized.class)
 public class GroupByKeyTest implements Serializable {
   @ClassRule public static final SparkSessionRule SESSION = new SparkSessionRule();
 
+  @Parameterized.Parameter public boolean preferGroupByKeyToHandleHugeValues;
+
+  @Parameterized.Parameters(name = "Test with preferGroupByKeyToHandleHugeValues={0}")
+  public static Collection<Object[]> preferGroupByKeyToHandleHugeValues() {
+    return Arrays.asList(new Object[][] {{true}, {false}});
+  }
+
   @Rule
   public transient TestPipeline pipeline =
       TestPipeline.fromOptions(SESSION.createPipelineOptions());
 
+  @Before
+  public void updatePipelineOptions() {
+    pipeline
+        .getOptions()
+        .as(SparkCommonPipelineOptions.class)
+        .setPreferGroupByKeyToHandleHugeValues(preferGroupByKeyToHandleHugeValues);
+  }
+
   @Test
   public void testGroupByKeyPreservesWindowing() {
     pipeline
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkCommonPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkCommonPipelineOptions.java
index 3c91b3547b7f..ac081bd2d1c9 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkCommonPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkCommonPipelineOptions.java
@@ -63,6 +63,15 @@ public interface SparkCommonPipelineOptions
 
   void setEnableSparkMetricSinks(Boolean enableSparkMetricSinks);
 
+  @Description(
+      "When set to true, the runner will prefer a GroupByKey translation that can handle huge values and "
+          + "does not require them to fit into memory. This will most likely have a performance impact "
+          + "for pipelines that do not work with huge values, hence it is disabled by default.")
+  @Default.Boolean(false)
+  Boolean getPreferGroupByKeyToHandleHugeValues();
+
+  void setPreferGroupByKeyToHandleHugeValues(Boolean preferGroupByKeyToHandleHugeValues);
+
   /**
    * Returns the default checkpoint directory of /tmp/${job.name}. For testing purposes only.
    * Production applications should use a reliable filesystem such as HDFS/S3/GS.
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 614f1f96c3e8..ebfcecf030b7 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -146,9 +146,16 @@ public void evaluate(GroupByKey<K, V> transform, EvaluationContext context) {
       JavaRDD<WindowedValue<KV<K, Iterable<V>>>> groupedByKey;
       Partitioner partitioner = getPartitioner(context);
 
-      if (context.isCandidateForGroupByKeyAndWindow(transform)
+      boolean enableHugeValuesTranslation =
+          context
+              .getOptions()
+              .as(SparkPipelineOptions.class)
+              .getPreferGroupByKeyToHandleHugeValues();
+      if (enableHugeValuesTranslation
+          && context.isCandidateForGroupByKeyAndWindow(transform)
           && GroupNonMergingWindowsFunctions.isEligibleForGroupByWindow(windowingStrategy)) {
-        // we can have a memory sensitive translation for non-merging windows
+        // we prefer the memory sensitive translation of GBK, which can support large values per
+        // key and does not require them to fit into memory
         groupedByKey =
             GroupNonMergingWindowsFunctions.groupByKeyAndWindow(
                 inRDD, keyCoder, coder.getValueCoder(), windowingStrategy, partitioner);
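
Note (not part of the series): a sketch of how a user would opt in to the new behaviour once this series is applied. Only the option itself comes from the patch; the class name and pipeline content are illustrative, and the command line flag shown assumes Beam's usual convention of deriving flags from getter names with the Spark runner's options registered on the classpath.

import org.apache.beam.runners.spark.SparkCommonPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class EnableHugeValuesGbkSketch {
  public static void main(String[] args) {
    // Equivalent command line usage (assumption, not verified against this series):
    //   --runner=SparkRunner --preferGroupByKeyToHandleHugeValues=true
    SparkCommonPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(SparkCommonPipelineOptions.class);
    options.setPreferGroupByKeyToHandleHugeValues(true);

    Pipeline pipeline = Pipeline.create(options);
    // ... apply transforms; GroupByKey now prefers the memory sensitive
    // translation wherever GroupByKeyVisitor marked it as a candidate ...
    pipeline.run().waitUntilFinish();
  }
}

Leaving the option at its default (false) keeps the previous, usually faster translation, so existing pipelines are unaffected.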