Skip to content

Commit

Permalink
feat: improve ockam hub metrics reporting
Browse files Browse the repository at this point in the history
Use registry to track worker module names
Add custom telemetry poller to collect active workers count per module

Report routed messages
Report current workers and tcp connections
Cleanup list of metrics and tags reported to influxdb
  • Loading branch information
hairyhum committed Feb 18, 2022
1 parent faf43e0 commit 2a0d5d4
Show file tree
Hide file tree
Showing 31 changed files with 371 additions and 338 deletions.
2 changes: 1 addition & 1 deletion examples/elixir/get_started/01-worker.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
["setup.exs", "echoer.exs"] |> Enum.map(&Code.require_file/1)

# Register this process as worker address "app".
Ockam.Node.register_address("app", self())
Ockam.Node.register_address("app")

# Create a Echoer type worker at address "echoer".
{:ok, _echoer} = Echoer.create(address: "echoer")
Expand Down
2 changes: 1 addition & 1 deletion examples/elixir/get_started/02-routing.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
["setup.exs", "echoer.exs", "hop.exs"] |> Enum.map(&Code.require_file/1)

# Register this process as worker address "app".
Ockam.Node.register_address("app", self())
Ockam.Node.register_address("app")

# Create a Echoer type worker at address "echoer".
{:ok, _echoer} = Echoer.create(address: "echoer")
Expand Down
2 changes: 1 addition & 1 deletion examples/elixir/get_started/03-routing-many-hops.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
["setup.exs", "echoer.exs", "hop.exs"] |> Enum.map(&Code.require_file/1)

# Register this process as worker address "app".
Ockam.Node.register_address("app", self())
Ockam.Node.register_address("app")

# Create a Echoer type worker at address "echoer".
{:ok, _echoer} = Echoer.create(address: "echoer")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
["setup.exs"] |> Enum.map(&Code.require_file/1)

# Register this process as worker address "app".
Ockam.Node.register_address("app", self())
Ockam.Node.register_address("app")

# Start the TCP Transport Add-on for Ockam Routing.
Ockam.Transport.TCP.start()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
["setup.exs"] |> Enum.map(&Code.require_file/1)

# Register this process as worker address "app".
Ockam.Node.register_address("app", self())
Ockam.Node.register_address("app")

# Start the TCP Transport Add-on for Ockam Routing.
Ockam.Transport.TCP.start()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
["setup.exs", "waiter.exs"] |> Enum.map(&Code.require_file/1)

# Register this process as worker address "app".
Ockam.Node.register_address("app", self())
Ockam.Node.register_address("app")

# Start the TCP Transport Add-on for Ockam Routing.
Ockam.Transport.TCP.start()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
["setup.exs"] |> Enum.map(&Code.require_file/1)

# Register this process as worker address "app".
Ockam.Node.register_address("app", self())
Ockam.Node.register_address("app")

# Start the TCP Transport Add-on for Ockam Routing.
Ockam.Transport.TCP.start()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
["setup.exs", "echoer.exs"] |> Enum.map(&Code.require_file/1)

# Register this process as worker address "app".
Ockam.Node.register_address("app", self())
Ockam.Node.register_address("app")

# Create a Echoer type worker at address "echoer".
{:ok, _echoer} = Echoer.create(address: "echoer")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
["setup.exs", "waiter.exs"] |> Enum.map(&Code.require_file/1)

# Register this process as worker address "app".
Ockam.Node.register_address("app", self())
Ockam.Node.register_address("app")

# Start the TCP Transport Add-on for Ockam Routing.
Ockam.Transport.TCP.start()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,12 @@ defmodule Ockam.AsymmetricWorker do
def register_inner_address(options, state) do
case Keyword.get(options, :inner_address) do
nil ->
Ockam.Node.register_random_address()
Ockam.Node.register_random_address(address_prefix(options), __MODULE__)

inner_address ->
case Ockam.Node.register_address(inner_address) do
:yes -> {:ok, inner_address}
:no -> {:error, :inner_address_already_taken}
case Ockam.Node.register_address(inner_address, __MODULE__) do
:ok -> {:ok, inner_address}
{:error, _reason} -> {:error, :inner_address_already_taken}
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ defmodule Ockam.Examples.Forwarding do
TCP.start()
forwarding_route = [@hub_address, "forwarding_service"]

Ockam.Node.register_address("example_responder", self())
Ockam.Node.register_address("example_responder")

with {:ok, forwarder_address} <-
ServiceApi.register_self(forwarding_route, "example_responder") do
Expand All @@ -63,7 +63,7 @@ defmodule Ockam.Examples.Forwarding do
TCP.start()
forwarder_route = [@hub_address, forwarder_address]

Ockam.Node.register_address("example_initiator", self())
Ockam.Node.register_address("example_initiator")

Ockam.Router.route(%{
onward_route: forwarder_route,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ defmodule Ockam.Examples.Routing.Local do

## Register this process to receive messages
my_address = "example_run"
Ockam.Node.register_address(my_address, self())
Ockam.Node.register_address(my_address)

message = %{
## Route message through hop to echoer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ defmodule Ockam.Examples.Routing.TCP do

## Register this process to receive messages
my_address = "example_run"
Ockam.Node.register_address(my_address, self())
Ockam.Node.register_address(my_address)

Ockam.Router.route(%{
onward_route: [server_host_address, "echoer"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ defmodule Ockam.Examples.SecureChannel.Local do

## Register this process to receive messages
my_address = "example_run"
Ockam.Node.register_address(my_address, self())
Ockam.Node.register_address(my_address)

send_and_wait(channel, "Hello secure channel!", my_address)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ defmodule Ockam.Examples.SecureChannel.TCP do

## Register this process to receive messages
my_address = "example_run"
Ockam.Node.register_address(my_address, self())
Ockam.Node.register_address(my_address)

send_and_wait(channel, "Hello secure channel over TCP!", my_address)

Expand Down
8 changes: 6 additions & 2 deletions implementations/elixir/ockam/ockam/lib/ockam/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,14 @@ defmodule Ockam.Message do
end

@doc "Get onward_route from the message"
def onward_route(%Ockam.Message{onward_route: onward_route}), do: onward_route
def onward_route(%Ockam.Message{onward_route: onward_route}) when is_list(onward_route),
do: onward_route

@doc "Get return_route from the message"
def return_route(%Ockam.Message{return_route: return_route}), do: return_route
def return_route(%Ockam.Message{return_route: return_route}) when is_list(return_route),
do: return_route

def return_route(%Ockam.Message{return_route: nil}), do: []

@doc "Get payload from the message"
def payload(%Ockam.Message{payload: payload}), do: payload
Expand Down
106 changes: 75 additions & 31 deletions implementations/elixir/ockam/ockam/lib/ockam/node.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,46 +33,101 @@ defmodule Ockam.Node do
end
end

def register_address(address) do
Registry.register_name(address, self())
@spec register_address(any(), module()) :: :ok | {:error, any()}
@doc """
Registers the address of the current process with optional module name
"""
def register_address(address, module \\ nil) do
Registry.register(address, module)
end

@spec set_address_module(any(), module()) :: :ok | :error
@doc """
Registers the address of a `pid`.
Sets module name for already registered process
"""
defdelegate register_address(address, pid), to: Registry, as: :register_name
def set_address_module(address, module) do
Registry.set_module(address, module)
end

@spec unregister_address(any()) :: :ok
@doc """
Unregisters an address.
"""
defdelegate unregister_address(address), to: Registry, as: :unregister_name

@spec list_addresses() :: [address :: any()]
@doc """
Lists all registered addresses
"""
defdelegate list_addresses(), to: Registry, as: :list_names

@spec list_workers() :: [{address :: any(), pid(), module()}]
@doc """
Lists all workers with their primary address, worker pid and module
"""
## TODO: currently taking just one random address per pid, make sure it's primary
def list_workers() do
list_addresses()
|> Enum.flat_map(fn address ->
case Registry.lookup(address) do
{:ok, pid, module} -> [{address, pid, module}]
:error -> []
end
end)
|> Enum.uniq_by(fn {_address, pid, _module} -> pid end)
end

@doc """
Send a message to the process registered with an address.
"""
def send(address, %Ockam.Message{} = message) do
case Registry.whereis_name(address) do
# dead letters
:undefined -> :ok
_pid -> Registry.send(address, message)
:undefined ->
report_message(:unsent, address, message)
:ok

_pid ->
Registry.send(address, message)
report_message(:sent, address, message)
:ok
end
end

def register_random_address(prefix \\ "", length_in_bytes \\ @default_address_length_in_bytes) do
@spec report_message(:sent | :unsent, any(), Ockam.Message.t()) :: :ok
def report_message(sent_status, address, message) do
from = Enum.at(Message.return_route(message), 0)

metadata = %{from: from, to: address, message: message}

Telemetry.emit_event([__MODULE__, :message, sent_status],
measurements: %{count: 1},
metadata: metadata
)
end

@spec register_random_address(prefix :: String.t(), module(), length_in_bytes :: integer()) ::
{:ok, address :: any()} | {:error, any()}
@doc """
Registers random address of certain length using set prefix and module name
"""
## TODO: make address actually fit into length in bytes
def register_random_address(
prefix \\ "",
module \\ nil,
length_in_bytes \\ @default_address_length_in_bytes
) do
address = get_random_unregistered_address(prefix, length_in_bytes)

case register_address(address) do
:yes -> {:ok, address}
case register_address(address, module) do
:ok -> {:ok, address}
## TODO: recursion limit
:no -> register_random_address(prefix, length_in_bytes)
{:error, _reason} -> register_random_address(prefix, module, length_in_bytes)
end
end

@spec get_random_unregistered_address(prefix :: String.t(), length_in_bytes :: integer()) ::
binary()
@doc """
Returns a random address that is currently not registed on the node.
"""
Expand Down Expand Up @@ -149,27 +204,16 @@ defmodule Ockam.Node do
end

def handle_local_message(%Ockam.Message{} = message) do
metadata = %{message: message}

start_time =
Telemetry.emit_start_event([__MODULE__, :handle_local_message], metadata: metadata)

return_value =
case Message.onward_route(message) do
[] ->
# Logger.warn("Routing message with no onward_route: #{inspect(message)}")
:ok

[first | _rest] ->
0 = Address.type(first)
local_address = Address.value(first)
__MODULE__.send(local_address, message)
end

metadata = Map.put(metadata, :return_value, return_value)

Telemetry.emit_stop_event([__MODULE__, :handle_local_message], start_time, metadata: metadata)
case Message.onward_route(message) do
[] ->
report_message(:unsent, nil, message)
# Logger.warn("Routing message with no onward_route: #{inspect(message)}")
:ok

return_value
[first | _rest] ->
0 = Address.type(first)
local_address = Address.value(first)
__MODULE__.send(local_address, message)
end
end
end
37 changes: 37 additions & 0 deletions implementations/elixir/ockam/ockam/lib/ockam/node/registry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ defmodule Ockam.Node.Registry do
# See the "Name registration" section of the `GenServer` module.
def unregister_name(address), do: Registry.unregister_name({__MODULE__, address})

@doc false
def update_value(address, value), do: Registry.update_value(__MODULE__, address, value)

@spec addresses(pid) :: [any]

def addresses(pid), do: Registry.keys(__MODULE__, pid)
Expand All @@ -59,4 +62,38 @@ defmodule Ockam.Node.Registry do
List all registered worker names
"""
def list_names(), do: Registry.select(__MODULE__, [{{:"$1", :_, :_}, [], [:"$1"]}])

@spec register(any(), any()) :: :ok | {:error, reason :: any()}
@doc false
# This function is used in custom process registration
#
# Module should be the worker implementation module
def register(address, module) do
case Registry.register(__MODULE__, address, module) do
{:ok, _owner} -> :ok
{:error, reason} -> {:error, reason}
end
end

@spec set_module(any(), module()) :: :ok | :error
@doc false
# Set worker module for the current process
#
# This function is called from the worker behaviour
# Module is not set when registering with register_name from `:via` option
# so this function needs to be called to set it after the process is created
def set_module(address, module) do
case Registry.update_value(__MODULE__, address, fn _old -> module end) do
:error -> :error
{_new, _old} -> :ok
end
end

@spec lookup(address :: any()) :: {:ok, pid, module} | :error
def lookup(address) do
case Registry.lookup(__MODULE__, address) do
[{pid, module}] -> {:ok, pid, module}
[] -> :error
end
end
end
1 change: 1 addition & 0 deletions implementations/elixir/ockam/ockam/lib/ockam/router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ defmodule Ockam.Router do
defp invoke_handler(handler, message) do
case apply_handler(handler, message) do
{:error, error} -> {:error, {:handler_error, error, message, handler}}
## TODO: require and match :ok result
_anything_else -> :ok
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ defmodule Ockam.SecureChannel.Channel do
address_prefix = Keyword.get(options, :address_prefix, "")
ciphertext_address = Node.get_random_unregistered_address(address_prefix)

with :yes <- Node.register_address(ciphertext_address, self()) do
with :ok <- Node.register_address(ciphertext_address, __MODULE__) do
{:ok, Map.put(data, :ciphertext_address, ciphertext_address)}
end
end
Expand Down
Loading

0 comments on commit 2a0d5d4

Please sign in to comment.