Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use genstage for consuming publish_mutation broadcast events #6

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
16 changes: 6 additions & 10 deletions lib/absinthe/subscription.ex
Original file line number Diff line number Diff line change
Expand Up @@ -214,16 +214,12 @@ defmodule Absinthe.Subscription do

@doc false
def publish_remote(pubsub, mutation_result, subscribed_fields) do
{:ok, pool_size} =
pubsub
|> registry_name
|> Registry.meta(:pool_size)

shard = :erlang.phash2(mutation_result, pool_size)

proxy_topic = Subscription.Proxy.topic(shard)

:ok = pubsub.publish_mutation(proxy_topic, mutation_result, subscribed_fields)
:ok =
pubsub.publish_mutation(
"__absinthe__:subscription:mutation_published",
mutation_result,
subscribed_fields
)
end

## Middleware callback
Expand Down
106 changes: 106 additions & 0 deletions lib/absinthe/subscription/mutation_publish_listener.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
defmodule Absinthe.Subscription.MutationPublishListener do
@moduledoc """
GenStage producer that listens for `publish_mutation` broadcasts
and buffers events to be processed by MutationPublishProcessor
"""
use GenStage
require Logger

def start_link([_pubsub, _max_queue_length, name] = args) do
GenStage.start_link(__MODULE__, args, name: name)
end

def init([pubsub, max_queue_length, _name]) do
# publish_mutation callback implementation needs to be updated to use
# this topic
:ok = pubsub.subscribe("__absinthe__:subscription:mutation_published")

{:producer,
%{
pubsub: pubsub,
node: pubsub.node_name(),
queue: :queue.new(),
pending_demand: 0,
max_queue_length: max_queue_length
}}
end

@doc """
Callback for the consumer to ask for more
subscriptions to process.
"""
def handle_demand(demand, state) do
do_handle_demand(demand, state)
end

def handle_info(%{node: payload_node}, %{node: current_node} = state)
when payload_node == current_node do
{:noreply, [], state}
end

def handle_info(
%{mutation_result: mutation_result, subscribed_fields: subscribed_fields},
state
) do
queue = :queue.in({state.pubsub, mutation_result, subscribed_fields}, state.queue)
queue = drop_oldest_events(queue, state.max_queue_length)
state = Map.put(state, :queue, queue)

do_handle_demand(0, state)
end

def handle_info(_, state) do
{:noreply, [], state}
end

defp do_handle_demand(demand, state) do
demand = demand + state.pending_demand
queue_length = :queue.len(state.queue)

if queue_length < demand do
# if we don't have enough items to satisfy demand then
# send what we have, and save pending demand
events_to_send = :queue.to_list(state.queue)

pending_demand = demand - length(events_to_send)

state =
state
|> Map.put(:queue, :queue.new())
|> Map.put(:pending_demand, pending_demand)

{:noreply, events_to_send, state}
else
# if we do have enough to satisfy demand, then send what's asked for
{events_to_send_queue, remaining_events_queue} = :queue.split(demand, state.queue)
events_to_send = :queue.to_list(events_to_send_queue)

pending_demand = demand - length(events_to_send)
pending_demand = if pending_demand < 0, do: 0, else: pending_demand

state =
state
|> Map.put(:queue, remaining_events_queue)
|> Map.put(:pending_demand, pending_demand)

{:noreply, events_to_send, state}
end
end

# drop oldest events until we are under the max_queue_size
defp drop_oldest_events(queue, max_queue_length) do
queue_length = :queue.len(queue)

if queue_length > max_queue_length do
Logger.warning(
"[Absinthe.Subscription.MutationPublishListener] Queue length (#{inspect(queue_length)}) exceeds max_queue_length (#{inspect(max_queue_length)}). Dropping oldest events until max_queue_length is reached"
)

events_to_drop = :queue.len(queue) - max_queue_length
{_, new_queue} = :queue.split(events_to_drop, queue)
new_queue
else
queue
end
end
end
42 changes: 42 additions & 0 deletions lib/absinthe/subscription/mutation_publish_processor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
defmodule Absinthe.Subscription.MutationPublishProcessor do
@moduledoc """
Processes the publish_mutation request on the
local node.
"""

def start_link({pubsub, mutation_result, subscribed_fields}) do
Task.start_link(fn ->
id = :erlang.unique_integer()
system_time = System.system_time()
start_time_mono = System.monotonic_time()

:telemetry.execute(
[:absinthe, :subscription, :publish_mutation, :start],
%{system_time: system_time},
%{
id: id,
telemetry_span_context: id,
mutation_result: mutation_result,
subscribed_fields: subscribed_fields
}
)

try do
Absinthe.Subscription.Local.publish_mutation(pubsub, mutation_result, subscribed_fields)
after
end_time_mono = System.monotonic_time()

:telemetry.execute(
[:absinthe, :subscription, :publish_mutation, :stop],
%{duration: end_time_mono - start_time_mono, end_time_mono: end_time_mono},
%{
id: id,
telemetry_span_context: id,
mutation_result: mutation_result,
subscribed_fields: subscribed_fields
}
)
end
end)
end
end
30 changes: 30 additions & 0 deletions lib/absinthe/subscription/mutation_publish_processor_supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
defmodule Absinthe.Subscription.MutationPublishProcessorSupervisor do
@moduledoc """
Supervisor for consuming publish_mutation requests
"""

use ConsumerSupervisor

alias Absinthe.Subscription.MutationPublishProcessor

def start_link(args) do
ConsumerSupervisor.start_link(__MODULE__, args)
end

def init([max_demand, producer_name]) do
children = [
%{
id: MutationPublishProcessor,
start: {MutationPublishProcessor, :start_link, []},
restart: :transient
}
]

opts = [
strategy: :one_for_one,
subscribe_to: [{producer_name, max_demand: max_demand}]
]

ConsumerSupervisor.init(children, opts)
end
end
23 changes: 23 additions & 0 deletions lib/absinthe/subscription/mutation_publish_supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
defmodule Absinthe.Subscription.MutationPublishSupervisor do
@moduledoc false

use Supervisor

def start_link([pubsub, max_demand, max_queue_length]) do
Supervisor.start_link(__MODULE__, {pubsub, max_demand, max_queue_length})
end

def init({pubsub, max_demand, max_queue_length}) do
unique_producer_name =
:"#{Absinthe.Subscription.MutationPublishListener}.#{:erlang.unique_integer([:monotonic])}"

children = [
{Absinthe.Subscription.MutationPublishListener,
[pubsub, max_queue_length, unique_producer_name]},
{Absinthe.Subscription.MutationPublishProcessorSupervisor,
[max_demand, unique_producer_name]}
]

Supervisor.init(children, strategy: :one_for_one)
end
end
12 changes: 6 additions & 6 deletions lib/absinthe/subscription/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,26 @@ defmodule Absinthe.Subscription.Supervisor do
module
end

pool_size = Keyword.get(opts, :pool_size, System.schedulers_online() * 2)
max_demand = Keyword.get(opts, :max_demand, System.schedulers_online() * 2)
max_queue_length = Keyword.get(opts, :max_queue_length, 10_000)
compress_registry? = Keyword.get(opts, :compress_registry?, true)

Supervisor.start_link(__MODULE__, {pubsub, pool_size, compress_registry?})
Supervisor.start_link(__MODULE__, {pubsub, max_demand, max_queue_length, compress_registry?})
end

def init({pubsub, pool_size, compress_registry?}) do
def init({pubsub, max_demand, max_queue_length, compress_registry?}) do
registry_name = Absinthe.Subscription.registry_name(pubsub)
meta = [pool_size: pool_size]

children = [
{Registry,
[
keys: :duplicate,
name: registry_name,
partitions: System.schedulers_online(),
meta: meta,
meta: [],
compressed: compress_registry?
]},
{Absinthe.Subscription.ProxySupervisor, [pubsub, registry_name, pool_size]}
{Absinthe.Subscription.MutationPublishSupervisor, [pubsub, max_demand, max_queue_length]}
]

Supervisor.init(children, strategy: :one_for_one)
Expand Down
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ defmodule Absinthe.Mixfile do

defp deps do
[
{:gen_stage, "~> 1.2"},
{:nimble_parsec, "~> 1.2.2 or ~> 1.3"},
{:telemetry, "~> 1.0 or ~> 0.4"},
{:dataloader, "~> 1.0.0 or ~> 2.0", optional: true},
Expand Down
1 change: 1 addition & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ex_doc": {:hex, :ex_doc, "0.30.9", "d691453495c47434c0f2052b08dd91cc32bc4e1a218f86884563448ee2502dd2", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "d7aaaf21e95dc5cddabf89063327e96867d00013963eadf2c6ad135506a8bc10"},
"file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"},
"gen_stage": {:hex, :gen_stage, "1.2.1", "19d8b5e9a5996d813b8245338a28246307fd8b9c99d1237de199d21efc4c76a1", [:mix], [], "hexpm", "83e8be657fa05b992ffa6ac1e3af6d57aa50aace8f691fcf696ff02f8335b001"},
"makeup": {:hex, :makeup, "1.1.1", "fa0bc768698053b2b3869fa8a62616501ff9d11a562f3ce39580d60860c3a55e", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5dc62fbdd0de44de194898b6710692490be74baa02d9d108bc29f007783b0b48"},
"makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.2", "ad87296a092a46e03b7e9b0be7631ddcf64c790fa68a9ef5323b6cbb36affc72", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f3f5a1ca93ce6e092d92b6d9c049bcda58a3b617a8d888f8e7231c85630e8108"},
Expand Down
Loading