-
Notifications
You must be signed in to change notification settings - Fork 191
/
Copy pathproducer_consumer.exs
66 lines (54 loc) · 1.54 KB
/
producer_consumer.exs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# Usage: mix run examples/producer_consumer.exs
#
# Hit Ctrl+C twice to stop it.
#
# This is a basic example where a producer A emits items,
# which are amplified by a producer-consumer B and printed
# by a consumer C.
defmodule A do
  @moduledoc """
  Producer stage that emits consecutive integers on demand.

  The state is the next integer to emit; its initial value is the
  argument received by `init/1`.
  """

  use GenStage

  # Declares this stage as a producer and uses `counter` as its state.
  @impl true
  def init(counter) do
    {:producer, counter}
  end

  # Emits exactly `demand` consecutive integers starting at `counter`
  # and advances the state past the last emitted integer.
  @impl true
  def handle_demand(demand, counter) when demand > 0 do
    # If the counter is 3 and we ask for 2 items, we will
    # emit the items 3 and 4, and set the state to 5.
    events = Enum.to_list(counter..(counter + demand - 1))
    {:noreply, events, counter + demand}
  end
end
defmodule B do
  @moduledoc """
  Producer-consumer stage that amplifies every incoming integer.

  The state is `number`; each received event is replaced by the
  integers of the range `event..(event + number)`.
  """

  use GenStage

  # Declares this stage as a producer_consumer and uses `number` as its state.
  @impl true
  def init(number) do
    {:producer_consumer, number}
  end

  # Expands each event into a run of integers and emits the flattened
  # result, leaving the state unchanged.
  @impl true
  def handle_events(events, _from, number) do
    # If we receive [0, 1, 2], the number is 2, this will transform
    # it into [0, 1, 2, 1, 2, 3, 2, 3, 4].
    events =
      for event <- events,
          entry <- event..(event + number),
          do: entry

    {:noreply, events, number}
  end
end
defmodule C do
  @moduledoc """
  Consumer stage that prints each batch of events it receives.

  It pauses for one second before printing each batch and never
  emits events of its own.
  """

  use GenStage

  # Declares this stage as a consumer; the state is a placeholder atom.
  @impl true
  def init(:ok) do
    {:consumer, :the_state_does_not_matter}
  end

  # Sleeps, prints the received batch, and returns without emitting events.
  @impl true
  def handle_events(events, _from, state) do
    # Wait for a second. Process.sleep/1 is used to match the
    # Process.sleep/1 call at the end of this script.
    Process.sleep(1000)

    # Inspect the events.
    IO.inspect(events)

    # We are a consumer, so we would never emit items.
    {:noreply, [], state}
  end
end
# Start the three stages; each call fails the match (and the script)
# unless the stage starts successfully.
{:ok, a} = GenStage.start_link(A, 0)   # starting from zero
{:ok, b} = GenStage.start_link(B, 2)   # expand by 2
{:ok, c} = GenStage.start_link(C, :ok) # state does not matter

# Wire the pipeline A -> B -> C. Match on the result so that a failed
# subscription crashes the script instead of leaving it sleeping
# forever with a disconnected pipeline.
{:ok, _} = GenStage.sync_subscribe(b, to: a)
{:ok, _} = GenStage.sync_subscribe(c, to: b)

# Block the script process indefinitely so the stages keep running
# until the VM is stopped (Ctrl+C twice).
Process.sleep(:infinity)