Document compact lifecycle method (#29466)
* Update programming-guide.md

* Update python example

* Update go compact example

* Add note

* Wording/link

* Fix function name
damccorm authored Nov 17, 2023
1 parent 65ddc34 commit bb310e0
Showing 3 changed files with 26 additions and 6 deletions.
5 changes: 5 additions & 0 deletions sdks/go/examples/snippets/04transforms.go
@@ -332,6 +332,11 @@ func (fn *averageFn) ExtractOutput(a averageAccum) float64 {
 	return float64(a.Sum) / float64(a.Count)
 }

+func (fn *averageFn) Compact(a averageAccum) averageAccum {
+	// No-op
+	return a
+}
+
 func init() {
 	register.Combiner3[averageAccum, int, float64](&averageFn{})
 }
4 changes: 4 additions & 0 deletions sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -1261,6 +1261,10 @@ def extract_output(self, sum_count):
     (sum, count) = sum_count
     return sum / count if count else float('NaN')

+  def compact(self, accumulator):
+    # No-op
+    return accumulator
+
 # [END combine_custom_average_define]
 # [START combine_custom_average_execute]
 average = pc | beam.CombineGlobally(AverageFn())
23 changes: 17 additions & 6 deletions website/www/site/content/en/documentation/programming-guide.md
@@ -1598,10 +1598,12 @@ You should use a `CombineFn` if the combine function requires a more sophisticat
accumulator, must perform additional pre- or post-processing, might change the
output type, or takes the key into account.

-A general combining operation consists of four operations. When you create a
+A general combining operation consists of five operations. When you create a
 <span class="language-java language-py">subclass of</span>
-`CombineFn`, you must provide four operations by overriding the
-corresponding methods:
+`CombineFn`, you must provide five operations by overriding the
+corresponding methods. Only `MergeAccumulators` is a required method. The
+others will have a default interpretation based on the accumulator type. The
+lifecycle methods are:

1. **Create Accumulator** creates a new "local" accumulator. In the example
case, taking a mean average, a local accumulator tracks the running sum of
@@ -1623,6 +1625,14 @@ corresponding methods:
mean average, this means dividing the combined sum of all the values by the
number of values summed. It is called once on the final, merged accumulator.

+5. **Compact** returns a more compact representation of the accumulator. This is
+   called before an accumulator is sent across the wire, and can be useful in
+   cases where values are buffered or otherwise lazily kept unprocessed when
+   added to the accumulator. Compact should return an equivalent, though
+   possibly modified, accumulator. In most cases, Compact is not necessary. For
+   a real-world example of using Compact, see the Python SDK implementation of
+   [TopCombineFn](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/combiners.py#L523).
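
The examples in this commit all implement Compact as a no-op, so a non-trivial case may help reviewers picture the "buffered values" scenario the new text describes. The following is a minimal sketch, not Beam code: `BufferedMeanFn` and `simulate` are hypothetical names, the class deliberately does not subclass `apache_beam.CombineFn` so that it runs without Beam installed, and `simulate` is only a stand-in for how a runner might call the lifecycle methods. It assumes an accumulator of the form `(sum, count, pending_values)`.

```python
class BufferedMeanFn:
    """Mean combiner that buffers raw inputs and folds them in compact().

    Sketch only: in a real pipeline this would presumably subclass
    apache_beam.CombineFn; here it is a plain class with the same
    lifecycle method names.
    """

    def create_accumulator(self):
        # (running sum, running count, values not yet folded in)
        return (0.0, 0, [])

    def add_input(self, accumulator, element):
        total, count, pending = accumulator
        # Defer the arithmetic: just remember the value.
        return (total, count, pending + [element])

    def merge_accumulators(self, accumulators):
        total, count, pending = 0.0, 0, []
        for t, c, p in accumulators:
            total += t
            count += c
            pending.extend(p)
        return (total, count, pending)

    def compact(self, accumulator):
        total, count, pending = accumulator
        # Fold the buffer so only two numbers need to be serialized.
        return (total + sum(pending), count + len(pending), [])

    def extract_output(self, accumulator):
        # Fold any values still buffered, then take the mean.
        total, count, _ = self.compact(accumulator)
        return total / count if count else float('NaN')


def simulate(fn, bundles):
    """Hypothetical stand-in for a runner: one accumulator per bundle,
    compacted before it is 'sent' to the merge step."""
    compacted = []
    for bundle in bundles:
        acc = fn.create_accumulator()
        for element in bundle:
            acc = fn.add_input(acc, element)
        compacted.append(fn.compact(acc))
    return fn.extract_output(fn.merge_accumulators(compacted))
```

The compacted accumulator is equivalent to the buffered one (it yields the same mean) but carries an empty buffer, which matches the "equivalent, though possibly modified" wording above. For instance, `simulate(BufferedMeanFn(), [[1, 2, 3], [4, 5]])` merges two compacted accumulators and extracts the mean of all five values.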

The following example code shows how to define a `CombineFn` that computes a
mean average:

@@ -1657,6 +1667,10 @@ public class AverageFn extends CombineFn<Integer, AverageFn.Accum, Double> {
   public Double extractOutput(Accum accum) {
     return ((double) accum.sum) / accum.count;
   }
+
+  // No-op
+  @Override
+  public Accum compact(Accum accum) { return accum; }
 }
 {{< /highlight >}}

@@ -1675,9 +1689,6 @@ pc = ...

<span class="language-go">

-> **Note**: Only `MergeAccumulators` is a required method. The others will have a default interpretation
-> based on the accumulator type.
</span>

##### 4.2.4.3. Combining a PCollection into a single value {#combining-pcollection}
