Skip to content
This repository has been archived by the owner on Jun 30, 2022. It is now read-only.

Commit

Permalink
Implement and use WindowedValue.with_value
Browse files Browse the repository at this point in the history
This allows fewer operations to care about the internal
implementation details of WindowedValue (which will get
more complex over time when we add details like PaneInfo
and retractions).

Also, we spend a significant amount of time creating
WindowedValue objects; refactoring in this way will
allow us to nearly eliminate this overhead with a fast
Cython implementation.
----Release Notes----
[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=118702433
  • Loading branch information
robertwb authored and silviulica committed Apr 1, 2016
1 parent 345228a commit 06dc6bf
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 58 deletions.
3 changes: 1 addition & 2 deletions google/cloud/dataflow/runners/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,7 @@ def _process_outputs(self, element, results):
result.value, result.timestamp,
self.window_fn.assign(assign_context))
else:
windowed_value = WindowedValue(
result, element.timestamp, element.windows)
windowed_value = element.with_value(result)
if tag is None:
self.main_receivers.output(windowed_value)
else:
Expand Down
10 changes: 4 additions & 6 deletions google/cloud/dataflow/transforms/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -972,16 +972,14 @@ def process(self, context):
driver = create_trigger_driver(self.windowing, True)
state = InMemoryUnmergedState()
# TODO(robertwb): Conditionally process in smaller chunks.
for out_window, values, timestamp in (
driver.process_elements(state, vs, MIN_TIMESTAMP)):
yield window.WindowedValue((k, values), timestamp, [out_window])
for wvalue in driver.process_elements(state, vs, MIN_TIMESTAMP):
yield wvalue.with_value((k, wvalue.value))
while state.timers:
fired = state.get_and_clear_timers()
for timer_window, (name, time_domain, fire_time) in fired:
for out_window, values, timestamp in driver.process_timer(
for wvalue in driver.process_timer(
timer_window, name, time_domain, fire_time, state):
yield window.WindowedValue(
(k, values), out_window.end, [out_window])
yield wvalue.with_value((k, wvalue.value))

def apply(self, pcoll):
# This code path is only used in the local direct runner. For Dataflow
Expand Down
14 changes: 8 additions & 6 deletions google/cloud/dataflow/transforms/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from google.cloud.dataflow.transforms.timeutil import TimeDomain
from google.cloud.dataflow.transforms.window import GlobalWindow
from google.cloud.dataflow.transforms.window import OutputTimeFn
from google.cloud.dataflow.transforms.window import WindowedValue
from google.cloud.dataflow.transforms.window import WindowFn


Expand Down Expand Up @@ -711,6 +712,7 @@ def process_timer(self, window_id, name, time_domain, timestamp, state):
class DefaultGlobalBatchTriggerDriver(TriggerDriver):
"""Breaks a bundles into window (pane)s according to the default triggering.
"""
GLOBAL_WINDOW_TUPLE = (GlobalWindow(),)

def __init__(self):
pass
Expand All @@ -725,7 +727,7 @@ def __iter__(self):
def __repr__(self):
return '<UnwindowedValues of %s>' % windowed_values
unwindowed = UnwindowedValues()
yield GlobalWindow(), unwindowed, MIN_TIMESTAMP
yield WindowedValue(unwindowed, MIN_TIMESTAMP, self.GLOBAL_WINDOW_TUPLE)

def process_timer(self, window_id, name, time_domain, timestamp, state):
raise TypeError('Triggers never set or called for batch default windowing.')
Expand All @@ -741,14 +743,14 @@ def __init__(self, phased_combine_fn, underlying):
def process_elements(self, state, windowed_values, output_watermark):
uncombined = self.underlying.process_elements(state, windowed_values,
output_watermark)
for window, unwindowed, timestamp in uncombined:
yield window, self.phased_combine_fn.apply(unwindowed), timestamp
for output in uncombined:
yield output.with_value(self.phased_combine_fn.apply(output.value))

def process_timer(self, window_id, name, time_domain, timestamp, state):
uncombined = self.underlying.process_timer(window_id, name, time_domain,
timestamp, state)
for window, unwindowed in uncombined:
yield window, self.phased_combine_fn.apply(unwindowed)
for output in uncombined:
yield output.with_value(self.phased_combine_fn.apply(output.value))


class GeneralTriggerDriver(TriggerDriver):
Expand Down Expand Up @@ -870,7 +872,7 @@ def _output(self, window, finished, state):
else:
state.clear_state(window, self.WATERMARK_HOLD)

return window, values, timestamp
return WindowedValue(values, timestamp, (window,))


class InMemoryUnmergedState(UnmergedState):
Expand Down
38 changes: 21 additions & 17 deletions google/cloud/dataflow/transforms/trigger_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,28 +95,30 @@ def run_trigger(self, window_fn, trigger_fn, accumulation_mode,
state = InMemoryUnmergedState()

for bundle in bundles:
output = driver.process_elements(state, bundle, MIN_TIMESTAMP)
for out_window, values, unused_timestamp in output:
actual_panes[out_window].append(set(values))
for wvalue in driver.process_elements(state, bundle, MIN_TIMESTAMP):
window, = wvalue.windows
actual_panes[window].append(set(wvalue.value))

while state.timers:
for timer_window, (name, time_domain, timestamp) in (
state.get_and_clear_timers()):
for out_window, values, unused_timestamp in driver.process_timer(
for wvalue in driver.process_timer(
timer_window, name, time_domain, timestamp, state):
actual_panes[out_window].append(set(values))
window, = wvalue.windows
actual_panes[window].append(set(wvalue.value))

for bundle in late_bundles:
output = driver.process_elements(state, bundle, MIN_TIMESTAMP)
for out_window, values, unused_timestamp in output:
actual_panes[out_window].append(set(values))
for wvalue in driver.process_elements(state, bundle, MIN_TIMESTAMP):
window, = wvalue.windows
actual_panes[window].append(set(wvalue.value))

while state.timers:
for timer_window, (name, time_domain, timestamp) in (
state.get_and_clear_timers()):
for out_window, values, unused_timestamp in driver.process_timer(
for wvalue in driver.process_timer(
timer_window, name, time_domain, timestamp, state):
actual_panes[out_window].append(set(values))
window, = wvalue.windows
actual_panes[window].append(set(wvalue.value))

self.assertEqual(expected_panes, actual_panes)

Expand Down Expand Up @@ -500,11 +502,12 @@ def fire_timers():
to_fire = state.get_and_clear_timers(watermark)
while to_fire:
for timer_window, (name, time_domain, t_timestamp) in to_fire:
for window, values, timestamp in driver.process_timer(
for wvalue in driver.process_timer(
timer_window, name, time_domain, t_timestamp, state):
window, = wvalue.windows
output.append({'window': [window.start, window.end - 1],
'values': sorted(values),
'timestamp': timestamp})
'values': sorted(wvalue.value),
'timestamp': wvalue.timestamp})
to_fire = state.get_and_clear_timers(watermark)

for line in spec['transcript']:
Expand All @@ -520,10 +523,11 @@ def fire_timers():
bundle = [
WindowedValue(t, t, window_fn.assign(WindowFn.AssignContext(t, t)))
for t in params]
output = [{'window': [window.start, window.end - 1],
'values': sorted(values),
'timestamp': timestamp}
for window, values, timestamp
output = [{'window': [wvalue.windows[0].start,
wvalue.windows[0].end - 1],
'values': sorted(wvalue.value),
'timestamp': wvalue.timestamp}
for wvalue
in driver.process_elements(state, bundle, watermark)]
fire_timers()

Expand Down
3 changes: 3 additions & 0 deletions google/cloud/dataflow/transforms/window.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,9 @@ def __eq__(self, other):
and self.timestamp == other.timestamp
and self.windows == other.windows)

def with_value(self, new_value):
  """Returns a new WindowedValue wrapping new_value.

  The timestamp and windows of this instance are passed through to the
  new object as-is; only the value differs.

  Args:
    new_value: the value the returned WindowedValue should hold.

  Returns:
    A WindowedValue built from new_value, self.timestamp and self.windows.
  """
  return WindowedValue(new_value, self.timestamp, self.windows)


class TimestampedValue(object):
"""A timestamped value having a value and a timestamp.
Expand Down
42 changes: 15 additions & 27 deletions google/cloud/dataflow/worker/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import google.cloud.dataflow.transforms as ptransform
from google.cloud.dataflow.transforms import combiners
from google.cloud.dataflow.transforms import trigger
from google.cloud.dataflow.transforms import window
from google.cloud.dataflow.transforms.combiners import curry_combine_fn
from google.cloud.dataflow.transforms.combiners import PhasedCombineFnExecutor
from google.cloud.dataflow.transforms.trigger import InMemoryUnmergedState
Expand Down Expand Up @@ -501,9 +500,8 @@ def process(self, o):
logging.debug('Processing [%s] in %s', o, self)
assert isinstance(o, WindowedValue)
key, values = o.value
windowed_value = WindowedValue(
(key, self.phased_combine_fn.apply(values)), o.timestamp, o.windows)
self.output(windowed_value)
self.output(
o.with_value((key, self.phased_combine_fn.apply(values))))


def create_pgbk_op(spec):
Expand Down Expand Up @@ -633,10 +631,7 @@ def process(self, o):
logging.debug('Processing [%s] in %s', o, self)
assert isinstance(o, WindowedValue)
k, v = o.value
self.output(
window.WindowedValue(
(k, window.WindowedValue(v, o.timestamp, o.windows)),
o.timestamp, o.windows))
self.output(o.with_value((k, o.with_value(v))))


class BatchGroupAlsoByWindowsOperation(Operation):
Expand Down Expand Up @@ -669,19 +664,15 @@ def process(self, o):
state = InMemoryUnmergedState()

# TODO(robertwb): Process in smaller chunks.
for out_window, values, timestamp in (
driver.process_elements(state, vs, MIN_TIMESTAMP)):
self.output(
window.WindowedValue((k, values), timestamp, [out_window]))
for wvalue in driver.process_elements(state, vs, MIN_TIMESTAMP):
self.output(wvalue.with_value((k, wvalue.value)))

while state.timers:
timers = state.get_and_clear_timers()
for timer_window, (name, time_domain, timestamp) in timers:
for out_window, values, timestamp in (
driver.process_timer(timer_window, name, time_domain, timestamp,
state)):
self.output(
window.WindowedValue((k, values), timestamp, [out_window]))
for wvalue in driver.process_timer(
timer_window, name, time_domain, timestamp, state):
self.output(wvalue.with_value((k, wvalue.value)))


class StreamingGroupAlsoByWindowsOperation(Operation):
Expand All @@ -703,19 +694,16 @@ def process(self, o):
state = self.spec.context.state
output_watermark = self.spec.context.output_data_watermark

for out_window, values, timestamp in (
driver.process_elements(state, keyed_work.elements(),
output_watermark)):
self.output(window.WindowedValue((keyed_work.key, values), timestamp,
[out_window]))
key = keyed_work.key
for wvalue in driver.process_elements(
state, keyed_work.elements(), output_watermark):
self.output(wvalue.with_value((key, wvalue.value)))

for timer in keyed_work.timers():
timer_window = int(timer.namespace)
for out_window, values, timestamp in (
driver.process_timer(timer_window, timer.name, timer.time_domain,
timer.timestamp, state)):
self.output(window.WindowedValue((keyed_work.key, values), timestamp,
[out_window]))
for wvalue in driver.process_timer(
timer_window, timer.name, timer.time_domain, timer.timestamp, state):
self.output(wvalue.with_value((key, wvalue.value)))


class MapTaskExecutor(object):
Expand Down

0 comments on commit 06dc6bf

Please sign in to comment.