From 5aa047a27a37ad049035a7222aa880d6eebf5baa Mon Sep 17 00:00:00 2001 From: Bryan Joseph Date: Fri, 28 Jun 2024 11:14:22 -0500 Subject: [PATCH] Let user start their own storage if needed --- lib/absinthe/subscription.ex | 13 ++--- .../subscription/default_document_storage.ex | 30 ++++++----- lib/absinthe/subscription/document_storage.ex | 53 ++++--------------- lib/absinthe/subscription/supervisor.ex | 36 +++---------- 4 files changed, 39 insertions(+), 93 deletions(-) diff --git a/lib/absinthe/subscription.ex b/lib/absinthe/subscription.ex index c1ec45e6..c360a5d6 100644 --- a/lib/absinthe/subscription.ex +++ b/lib/absinthe/subscription.ex @@ -159,18 +159,13 @@ defmodule Absinthe.Subscription do Module.concat([pubsub, :Registry]) end - @doc false - def document_storage_name(pubsub) do - Module.concat([pubsub, :Storage]) - end - - def document_storage(pubsub) do - {:ok, document_storage} = + def storage_module(pubsub) do + {:ok, storage} = pubsub |> registry_name - |> Registry.meta(:document_storage) + |> Registry.meta(:storage) - document_storage + storage end @doc false diff --git a/lib/absinthe/subscription/default_document_storage.ex b/lib/absinthe/subscription/default_document_storage.ex index d6ffea10..ee48af54 100644 --- a/lib/absinthe/subscription/default_document_storage.ex +++ b/lib/absinthe/subscription/default_document_storage.ex @@ -1,47 +1,49 @@ defmodule Absinthe.Subscription.DefaultDocumentStorage do @behaviour Absinthe.Subscription.DocumentStorage - @moduledoc """ Default document storage for Absinthe. Stores subscription documents and field keys in a Registry. """ - @impl Absinthe.Subscription.DocumentStorage - def child_spec(opts) do - Registry.child_spec(opts) - end + alias Absinthe.Subscription @impl Absinthe.Subscription.DocumentStorage - def put(storage_process_name, doc_id, doc_value, field_keys) do + def put(pubsub, doc_id, doc_value, field_keys) do + registry = Subscription.registry_name(pubsub) + pdict_add_fields(doc_id, field_keys) for field_key <- field_keys do - {:ok, _} = Registry.register(storage_process_name, field_key, doc_id) + {:ok, _} = Registry.register(registry, field_key, doc_id) end - {:ok, _} = Registry.register(storage_process_name, doc_id, doc_value) + {:ok, _} = Registry.register(registry, doc_id, doc_value) end @impl Absinthe.Subscription.DocumentStorage - def delete(storage_process_name, doc_id) do + def delete(pubsub, doc_id) do + registry = Subscription.registry_name(pubsub) + for field_key <- pdict_fields(doc_id) do - Registry.unregister(storage_process_name, field_key) + Registry.unregister(registry, field_key) end pdict_delete_fields(doc_id) - Registry.unregister(storage_process_name, doc_id) + Registry.unregister(registry, doc_id) :ok end @impl Absinthe.Subscription.DocumentStorage - def get_docs_by_field_key(storage_process_name, field_key) do - storage_process_name + def get_docs_by_field_key(pubsub, field_key) do + registry = Subscription.registry_name(pubsub) + + registry |> Registry.lookup(field_key) |> MapSet.new(fn {_pid, doc_id} -> doc_id end) |> Enum.reduce(%{}, fn doc_id, acc -> - case Registry.lookup(storage_process_name, doc_id) do + case Registry.lookup(registry, doc_id) do [] -> acc diff --git a/lib/absinthe/subscription/document_storage.ex b/lib/absinthe/subscription/document_storage.ex index 17821a3b..7f20f7b1 100644 --- a/lib/absinthe/subscription/document_storage.ex +++ b/lib/absinthe/subscription/document_storage.ex @@ -8,47 +8,22 @@ defmodule Absinthe.Subscription.DocumentStorage do the storage for subscription documents. This behaviour can be implemented to allow for a custom storage solution if needed. - The `child_spec` is used so that Absinthe can start your process when starting `Absinthe.Subscription`. - - To tell `Absinthe.Subscription` to use your custom storage, make sure to pass in `document_storage` and `storage_opts` - when adding `Absinthe.Subscription` to your application supervisor. - - ```elixir - {Absinthe.Subscription, pubsub: MyApp.Pubsub, document_storage: MyApp.DocumentStorage, storage_opts: [key1: value1, key2: value2]} - ``` - - Absinthe.Subscription will update `storage_opts` to include a `name` key. This will be the name `Absinthe.Subscription` uses to - reference the process. + When starting `Absinthe.Subscription`, include `storage`. Defaults to `Absinthe.Subscription.DefaultDocumentStorage` ```elixir - @impl Absinthe.Subscription.DocumentStorage - def child_spec(opts) do - # opts is the `storage_opts` with the `name` key added - { - id: __MODULE__, - start: {__MODULE__, :start_link, [opts]} - } - end + {Absinthe.Subscription, pubsub: MyApp.Pubsub, storage: MyApp.DocumentStorage} ``` """ alias Absinthe.Subscription alias Absinthe.Subscription.PipelineSerializer - @doc """ - Child spec to determine how to start the - Document storage process. This will be supervised. Absinthe will give - the process a name and that name will be passed in the other callbacks - in order to reference it there. - """ - @callback child_spec(opts :: Keyword.t()) :: Supervisor.child_spec() - @doc """ Adds `doc` to storage with `doc_id` as the key. Associates the given `field_keys` with `doc_id`. """ @callback put( - storage_process_name :: atom, + pubsub :: atom, doc_id :: term, doc :: %{ initial_phases: Absinthe.Subscription.PipelineSerializer.packed_pipeline(), @@ -61,20 +36,20 @@ defmodule Absinthe.Subscription.DocumentStorage do @doc """ Removes the document. Along with any field_keys associated with it """ - @callback delete(storage_process_name :: atom, doc_id :: term) :: :ok + @callback delete(pubsub :: atom, doc_id :: term) :: :ok @doc """ Get all docs associated with `field_key` """ @callback get_docs_by_field_key( - storage_process_name :: atom, + pubsub :: atom, field_key :: {field :: term, key :: term} ) :: map() @doc false def put(pubsub, doc_id, doc, field_keys) do - {storage_module, storage_process_name} = storage_info(pubsub) + storage_module = Subscription.storage_module(pubsub) :telemetry.span( [:absinthe, :subscription, :storage, :put], @@ -92,7 +67,7 @@ defmodule Absinthe.Subscription.DocumentStorage do source: doc.source } - result = storage_module.put(storage_process_name, doc_id, doc_value, field_keys) + result = storage_module.put(pubsub, doc_id, doc_value, field_keys) {result, %{ @@ -107,7 +82,7 @@ defmodule Absinthe.Subscription.DocumentStorage do @doc false def delete(pubsub, doc_id) do - {storage_module, storage_process_name} = storage_info(pubsub) + storage_module = Subscription.storage_module(pubsub) :telemetry.span( [:absinthe, :subscription, :storage, :delete], @@ -116,7 +91,7 @@ defmodule Absinthe.Subscription.DocumentStorage do storage_module: storage_module }, fn -> - result = storage_module.delete(storage_process_name, doc_id) + result = storage_module.delete(pubsub, doc_id) {result, %{ @@ -129,7 +104,7 @@ defmodule Absinthe.Subscription.DocumentStorage do @doc false def get_docs_by_field_key(pubsub, field_key) do - {storage_module, storage_process_name} = storage_info(pubsub) + storage_module = Subscription.storage_module(pubsub) :telemetry.span( [:absinthe, :subscription, :storage, :get_docs_by_field_key], @@ -139,7 +114,7 @@ defmodule Absinthe.Subscription.DocumentStorage do }, fn -> result = - storage_process_name + pubsub |> storage_module.get_docs_by_field_key(field_key) |> Enum.map(fn {doc_id, %{initial_phases: initial_phases} = doc} -> initial_phases = PipelineSerializer.unpack(initial_phases) @@ -155,10 +130,4 @@ defmodule Absinthe.Subscription.DocumentStorage do end ) end - - defp storage_info(pubsub) do - storage_module = Subscription.document_storage(pubsub) - storage_process_name = Subscription.document_storage_name(pubsub) - {storage_module, storage_process_name} - end end diff --git a/lib/absinthe/subscription/supervisor.ex b/lib/absinthe/subscription/supervisor.ex index 1d85cd7a..d56ae97a 100644 --- a/lib/absinthe/subscription/supervisor.ex +++ b/lib/absinthe/subscription/supervisor.ex @@ -23,44 +23,24 @@ defmodule Absinthe.Subscription.Supervisor do pool_size = Keyword.get(opts, :pool_size, System.schedulers_online() * 2) compress_registry? = Keyword.get(opts, :compress_registry?, true) + storage = Keyword.get(opts, :storage, Absinthe.Subscription.DefaultDocumentStorage) - document_storage = - Keyword.get(opts, :document_storage, Absinthe.Subscription.DefaultDocumentStorage) - - storage_opts = - case document_storage do - Absinthe.Subscription.DefaultDocumentStorage -> - [ - keys: :duplicate, - partitions: System.schedulers_online(), - compressed: compress_registry? - ] - - _ -> - Keyword.get(opts, :storage_opts, Keyword.new()) - end - - Supervisor.start_link( - __MODULE__, - {pubsub, pool_size, document_storage, storage_opts} - ) + Supervisor.start_link(__MODULE__, {pubsub, pool_size, compress_registry?, storage}) end - def init({pubsub, pool_size, document_storage, storage_opts}) do + def init({pubsub, pool_size, compress_registry?, storage}) do registry_name = Absinthe.Subscription.registry_name(pubsub) - meta = [pool_size: pool_size, document_storage: document_storage] - - storage_opts = - Keyword.put(storage_opts, :name, Absinthe.Subscription.document_storage_name(pubsub)) + meta = [pool_size: pool_size, storage: storage] children = [ {Registry, [ - keys: :unique, + keys: :duplicate, name: registry_name, - meta: meta + partitions: System.schedulers_online(), + meta: meta, + compressed: compress_registry? ]}, - document_storage.child_spec(storage_opts), {Absinthe.Subscription.ProxySupervisor, [pubsub, registry_name, pool_size]} ]