Skip to content

Commit

Permalink
Let user start their own storage if needed
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanjos committed Jun 28, 2024
1 parent 12979d7 commit 5aa047a
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 93 deletions.
13 changes: 4 additions & 9 deletions lib/absinthe/subscription.ex
Original file line number Diff line number Diff line change
Expand Up @@ -159,18 +159,13 @@ defmodule Absinthe.Subscription do
Module.concat([pubsub, :Registry])
end

@doc false
def document_storage_name(pubsub) do
Module.concat([pubsub, :Storage])
end

def document_storage(pubsub) do
{:ok, document_storage} =
def storage_module(pubsub) do
{:ok, storage} =
pubsub
|> registry_name
|> Registry.meta(:document_storage)
|> Registry.meta(:storage)

document_storage
storage
end

@doc false
Expand Down
30 changes: 16 additions & 14 deletions lib/absinthe/subscription/default_document_storage.ex
Original file line number Diff line number Diff line change
@@ -1,47 +1,49 @@
defmodule Absinthe.Subscription.DefaultDocumentStorage do
@behaviour Absinthe.Subscription.DocumentStorage

@moduledoc """
Default document storage for Absinthe. Stores subscription
documents and field keys in a Registry.
"""

@impl Absinthe.Subscription.DocumentStorage
def child_spec(opts) do
Registry.child_spec(opts)
end
alias Absinthe.Subscription

@impl Absinthe.Subscription.DocumentStorage
def put(storage_process_name, doc_id, doc_value, field_keys) do
def put(pubsub, doc_id, doc_value, field_keys) do
registry = Subscription.registry_name(pubsub)

pdict_add_fields(doc_id, field_keys)

for field_key <- field_keys do
{:ok, _} = Registry.register(storage_process_name, field_key, doc_id)
{:ok, _} = Registry.register(registry, field_key, doc_id)
end

{:ok, _} = Registry.register(storage_process_name, doc_id, doc_value)
{:ok, _} = Registry.register(registry, doc_id, doc_value)
end

@impl Absinthe.Subscription.DocumentStorage
def delete(storage_process_name, doc_id) do
def delete(pubsub, doc_id) do
registry = Subscription.registry_name(pubsub)

for field_key <- pdict_fields(doc_id) do
Registry.unregister(storage_process_name, field_key)
Registry.unregister(registry, field_key)
end

pdict_delete_fields(doc_id)

Registry.unregister(storage_process_name, doc_id)
Registry.unregister(registry, doc_id)

:ok
end

@impl Absinthe.Subscription.DocumentStorage
def get_docs_by_field_key(storage_process_name, field_key) do
storage_process_name
def get_docs_by_field_key(pubsub, field_key) do
registry = Subscription.registry_name(pubsub)

registry
|> Registry.lookup(field_key)
|> MapSet.new(fn {_pid, doc_id} -> doc_id end)
|> Enum.reduce(%{}, fn doc_id, acc ->
case Registry.lookup(storage_process_name, doc_id) do
case Registry.lookup(registry, doc_id) do
[] ->
acc

Expand Down
53 changes: 11 additions & 42 deletions lib/absinthe/subscription/document_storage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,47 +8,22 @@ defmodule Absinthe.Subscription.DocumentStorage do
the storage for subscription documents. This behaviour can be implemented to
allow for a custom storage solution if needed.
The `child_spec` is used so that Absinthe can start your process when starting `Absinthe.Subscription`.
To tell `Absinthe.Subscription` to use your custom storage, make sure to pass in `document_storage` and `storage_opts`
when adding `Absinthe.Subscription` to your application supervisor.
```elixir
{Absinthe.Subscription, pubsub: MyApp.Pubsub, document_storage: MyApp.DocumentStorage, storage_opts: [key1: value1, key2: value2]}
```
Absinthe.Subscription will update `storage_opts` to include a `name` key. This will be the name `Absinthe.Subscription` uses to
reference the process.
When starting `Absinthe.Subscription`, include `storage`. Defaults to `Absinthe.Subscription.DefaultDocumentStorage`
```elixir
@impl Absinthe.Subscription.DocumentStorage
def child_spec(opts) do
# opts is the `storage_opts` with the `name` key added
{
id: __MODULE__,
start: {__MODULE__, :start_link, [opts]}
}
end
{Absinthe.Subscription, pubsub: MyApp.Pubsub, storage: MyApp.DocumentStorage}
```
"""

alias Absinthe.Subscription
alias Absinthe.Subscription.PipelineSerializer

@doc """
Child spec to determine how to start the
Document storage process. This will be supervised. Absinthe will give
the process a name and that name will be passed in the other callbacks
in order to reference it there.
"""
@callback child_spec(opts :: Keyword.t()) :: Supervisor.child_spec()

@doc """
Adds `doc` to storage with `doc_id` as the key. Associates the given
`field_keys` with `doc_id`.
"""
@callback put(
storage_process_name :: atom,
pubsub :: atom,
doc_id :: term,
doc :: %{
initial_phases: Absinthe.Subscription.PipelineSerializer.packed_pipeline(),
Expand All @@ -61,20 +36,20 @@ defmodule Absinthe.Subscription.DocumentStorage do
@doc """
Removes the document. Along with any field_keys associated with it
"""
@callback delete(storage_process_name :: atom, doc_id :: term) :: :ok
@callback delete(pubsub :: atom, doc_id :: term) :: :ok

@doc """
Get all docs associated with `field_key`
"""
@callback get_docs_by_field_key(
storage_process_name :: atom,
pubsub :: atom,
field_key :: {field :: term, key :: term}
) ::
map()

@doc false
def put(pubsub, doc_id, doc, field_keys) do
{storage_module, storage_process_name} = storage_info(pubsub)
storage_module = Subscription.storage_module(pubsub)

:telemetry.span(
[:absinthe, :subscription, :storage, :put],
Expand All @@ -92,7 +67,7 @@ defmodule Absinthe.Subscription.DocumentStorage do
source: doc.source
}

result = storage_module.put(storage_process_name, doc_id, doc_value, field_keys)
result = storage_module.put(pubsub, doc_id, doc_value, field_keys)

{result,
%{
Expand All @@ -107,7 +82,7 @@ defmodule Absinthe.Subscription.DocumentStorage do

@doc false
def delete(pubsub, doc_id) do
{storage_module, storage_process_name} = storage_info(pubsub)
storage_module = Subscription.storage_module(pubsub)

:telemetry.span(
[:absinthe, :subscription, :storage, :delete],
Expand All @@ -116,7 +91,7 @@ defmodule Absinthe.Subscription.DocumentStorage do
storage_module: storage_module
},
fn ->
result = storage_module.delete(storage_process_name, doc_id)
result = storage_module.delete(pubsub, doc_id)

{result,
%{
Expand All @@ -129,7 +104,7 @@ defmodule Absinthe.Subscription.DocumentStorage do

@doc false
def get_docs_by_field_key(pubsub, field_key) do
{storage_module, storage_process_name} = storage_info(pubsub)
storage_module = Subscription.storage_module(pubsub)

:telemetry.span(
[:absinthe, :subscription, :storage, :get_docs_by_field_key],
Expand All @@ -139,7 +114,7 @@ defmodule Absinthe.Subscription.DocumentStorage do
},
fn ->
result =
storage_process_name
pubsub
|> storage_module.get_docs_by_field_key(field_key)
|> Enum.map(fn {doc_id, %{initial_phases: initial_phases} = doc} ->
initial_phases = PipelineSerializer.unpack(initial_phases)
Expand All @@ -155,10 +130,4 @@ defmodule Absinthe.Subscription.DocumentStorage do
end
)
end

defp storage_info(pubsub) do
storage_module = Subscription.document_storage(pubsub)
storage_process_name = Subscription.document_storage_name(pubsub)
{storage_module, storage_process_name}
end
end
36 changes: 8 additions & 28 deletions lib/absinthe/subscription/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,44 +23,24 @@ defmodule Absinthe.Subscription.Supervisor do

pool_size = Keyword.get(opts, :pool_size, System.schedulers_online() * 2)
compress_registry? = Keyword.get(opts, :compress_registry?, true)
storage = Keyword.get(opts, :storage, Absinthe.Subscription.DefaultDocumentStorage)

document_storage =
Keyword.get(opts, :document_storage, Absinthe.Subscription.DefaultDocumentStorage)

storage_opts =
case document_storage do
Absinthe.Subscription.DefaultDocumentStorage ->
[
keys: :duplicate,
partitions: System.schedulers_online(),
compressed: compress_registry?
]

_ ->
Keyword.get(opts, :storage_opts, Keyword.new())
end

Supervisor.start_link(
__MODULE__,
{pubsub, pool_size, document_storage, storage_opts}
)
Supervisor.start_link(__MODULE__, {pubsub, pool_size, compress_registry?, storage})
end

def init({pubsub, pool_size, document_storage, storage_opts}) do
def init({pubsub, pool_size, compress_registry?, storage}) do
registry_name = Absinthe.Subscription.registry_name(pubsub)
meta = [pool_size: pool_size, document_storage: document_storage]

storage_opts =
Keyword.put(storage_opts, :name, Absinthe.Subscription.document_storage_name(pubsub))
meta = [pool_size: pool_size, storage: storage]

children = [
{Registry,
[
keys: :unique,
keys: :duplicate,
name: registry_name,
meta: meta
partitions: System.schedulers_online(),
meta: meta,
compressed: compress_registry?
]},
document_storage.child_spec(storage_opts),
{Absinthe.Subscription.ProxySupervisor, [pubsub, registry_name, pool_size]}
]

Expand Down

0 comments on commit 5aa047a

Please sign in to comment.