From 402712a859a70f971e63389928157ce85be2dd64 Mon Sep 17 00:00:00 2001 From: Bryan Joseph Date: Thu, 20 Jun 2024 12:11:45 -0500 Subject: [PATCH] Fix bad value from publish listener --- lib/absinthe/subscription.ex | 16 ++++++---------- .../subscription/mutation_publish_listener.ex | 4 ++-- .../subscription/mutation_publish_supervisor.ex | 2 +- 3 files changed, 9 insertions(+), 13 deletions(-) diff --git a/lib/absinthe/subscription.ex b/lib/absinthe/subscription.ex index 6eed4613..ce69beb3 100644 --- a/lib/absinthe/subscription.ex +++ b/lib/absinthe/subscription.ex @@ -214,16 +214,12 @@ defmodule Absinthe.Subscription do @doc false def publish_remote(pubsub, mutation_result, subscribed_fields) do - {:ok, pool_size} = - pubsub - |> registry_name - |> Registry.meta(:pool_size) - - shard = :erlang.phash2(mutation_result, pool_size) - - proxy_topic = Subscription.Proxy.topic(shard) - - :ok = pubsub.publish_mutation(proxy_topic, mutation_result, subscribed_fields) + :ok = + pubsub.publish_mutation( + "__absinthe__:subscription:mutation_published", + mutation_result, + subscribed_fields + ) end ## Middleware callback diff --git a/lib/absinthe/subscription/mutation_publish_listener.ex b/lib/absinthe/subscription/mutation_publish_listener.ex index 32fec62c..e8cd924b 100644 --- a/lib/absinthe/subscription/mutation_publish_listener.ex +++ b/lib/absinthe/subscription/mutation_publish_listener.ex @@ -69,7 +69,7 @@ defmodule Absinthe.Subscription.MutationPublishListener do |> Map.put(:queue, :queue.new()) |> Map.put(:pending_demand, pending_demand) - {:no_reply, events_to_send, state} + {:noreply, events_to_send, state} else # if we do have enough to satisfy demand, then send what's asked for {events_to_send_queue, remaining_events_queue} = :queue.split(demand, state.queue) @@ -83,7 +83,7 @@ defmodule Absinthe.Subscription.MutationPublishListener do |> Map.put(:queue, remaining_events_queue) |> Map.put(:pending_demand, pending_demand) - {:no_reply, events_to_send, state} + {:noreply, events_to_send, state} end end diff --git a/lib/absinthe/subscription/mutation_publish_supervisor.ex b/lib/absinthe/subscription/mutation_publish_supervisor.ex index fd32c875..08fbe2b4 100644 --- a/lib/absinthe/subscription/mutation_publish_supervisor.ex +++ b/lib/absinthe/subscription/mutation_publish_supervisor.ex @@ -9,7 +9,7 @@ defmodule Absinthe.Subscription.MutationPublishSupervisor do def init({pubsub, max_demand, max_queue_length}) do unique_producer_name = - :"#{Absinthe.Subscription.LocalProducer}.#{:erlang.unique_integer([:monotonic])}" + :"#{Absinthe.Subscription.MutationPublishListener}.#{:erlang.unique_integer([:monotonic])}" children = [ {Absinthe.Subscription.MutationPublishListener,