Skip to content

Commit

Permalink
Add wrapper functions in DocumentStorage to add telemetry
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanjos committed Jun 24, 2024
1 parent 10a17ae commit 6f8dbdd
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 24 deletions.
28 changes: 4 additions & 24 deletions lib/absinthe/subscription.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ defmodule Absinthe.Subscription do

alias __MODULE__

alias Absinthe.Subscription.PipelineSerializer
alias Absinthe.Subscription.DocumentStorage

@doc """
Add Absinthe.Subscription to your process tree.
Expand Down Expand Up @@ -141,37 +141,17 @@ defmodule Absinthe.Subscription do

@doc false
def subscribe(pubsub, field_keys, doc_id, doc) do
field_keys = List.wrap(field_keys)

doc_value = %{
initial_phases: PipelineSerializer.pack(doc.initial_phases),
source: doc.source
}

storage_module = document_storage(pubsub)
storage_process_name = document_storage_name(pubsub)
storage_module.put(storage_process_name, doc_id, doc_value, field_keys)
DocumentStorage.put(pubsub, doc_id, doc, field_keys)
end

@doc false
def unsubscribe(pubsub, doc_id) do
storage_module = document_storage(pubsub)
storage_process_name = document_storage_name(pubsub)
storage_module.delete(storage_process_name, doc_id)
DocumentStorage.delete(pubsub, doc_id)
end

@doc false
def get(pubsub, key) do
storage_module = document_storage(pubsub)
storage_process_name = document_storage_name(pubsub)

storage_process_name
|> storage_module.get_docs_by_field_key(key)
|> Enum.map(fn {doc_id, %{initial_phases: initial_phases} = doc} ->
initial_phases = PipelineSerializer.unpack(initial_phases)
{doc_id, Map.put(doc, :initial_phases, initial_phases)}
end)
|> Map.new()
DocumentStorage.get_docs_by_field_key(pubsub, key)
end

@doc false
Expand Down
93 changes: 93 additions & 0 deletions lib/absinthe/subscription/document_storage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ defmodule Absinthe.Subscription.DocumentStorage do
documents.
"""

alias Absinthe.Subscription
alias Absinthe.Subscription.PipelineSerializer

@doc """
Child spec to determine how to start the
Document storage process. This will be supervised. Absinthe will give
Expand Down Expand Up @@ -41,4 +44,94 @@ defmodule Absinthe.Subscription.DocumentStorage do
field_key :: {field :: term, key :: term}
) ::
map()

@doc false
def put(pubsub, doc_id, doc, field_keys) do
{storage_module, storage_process_name} = storage_info(pubsub)

:telemetry.span(
[:absinthe, :subscription, :storage, :put],
%{
doc_id: doc_id,
doc: doc,
field_keys: field_keys,
storage_module: storage_module
},
fn ->
field_keys = List.wrap(field_keys)

doc_value = %{
initial_phases: PipelineSerializer.pack(doc.initial_phases),
source: doc.source
}

result = storage_module.put(storage_process_name, doc_id, doc_value, field_keys)

{result,
%{
doc_id: doc_id,
doc: doc,
field_keys: field_keys,
storage_module: storage_module
}}
end
)
end

@doc false
def delete(pubsub, doc_id) do
{storage_module, storage_process_name} = storage_info(pubsub)

:telemetry.span(
[:absinthe, :subscription, :storage, :delete],
%{
doc_id: doc_id,
storage_module: storage_module
},
fn ->
result = storage_module.delete(storage_process_name, doc_id)

{result,
%{
doc_id: doc_id,
storage_module: storage_module
}}
end
)
end

@doc false
def get_docs_by_field_key(pubsub, field_key) do
{storage_module, storage_process_name} = storage_info(pubsub)

:telemetry.span(
[:absinthe, :subscription, :storage, :get_docs_by_field_key],
%{
field_key: field_key,
storage_module: storage_module
},
fn ->
result =
storage_process_name
|> storage_module.get_docs_by_field_key(field_key)
|> Enum.map(fn {doc_id, %{initial_phases: initial_phases} = doc} ->
initial_phases = PipelineSerializer.unpack(initial_phases)
{doc_id, Map.put(doc, :initial_phases, initial_phases)}
end)
|> Map.new()

{result,
%{
field_key: field_key,
storage_module: storage_module
}}
end
)
end

defp storage_info(pubsub) do
storage_module = Subscription.document_storage(pubsub)
storage_process_name = Subscription.document_storage_name(pubsub)
{storage_module, storage_process_name}
end
end

0 comments on commit 6f8dbdd

Please sign in to comment.