From 2a0d5d481040a492155fb06390c33c812e0bf9b6 Mon Sep 17 00:00:00 2001 From: Daniil Fedotov Date: Mon, 14 Feb 2022 18:12:41 -0500 Subject: [PATCH] feat: improve ockam hub metrics reporting Use registry to track worker module names Add custom telemetry poller to collect active workers count per module Report routed messages Report current workers and tcp connections Cleanup list of metrics and tags reported to influxdb --- examples/elixir/get_started/01-worker.exs | 2 +- examples/elixir/get_started/02-routing.exs | 2 +- .../get_started/03-routing-many-hops.exs | 2 +- .../04-routing-over-transport-initiator.exs | 2 +- ...ting-over-transport-two-hops-initiator.exs | 2 +- ...nnel-over-two-transport-hops-initiator.exs | 2 +- ...-routing-via-remote-forwarder-initiator.ex | 2 +- ...routing-via-remote-forwarder-responder.exs | 2 +- ...channel-over-remote-forwarder-initiator.ex | 2 +- .../ockam/lib/ockam/asymmetric_worker.ex | 8 +- .../ockam/examples/forwarding/forwarding.ex | 4 +- .../ockam/lib/ockam/examples/routing/local.ex | 2 +- .../ockam/lib/ockam/examples/routing/tcp.ex | 2 +- .../ockam/examples/secure_channel/local.ex | 2 +- .../lib/ockam/examples/secure_channel/tcp.ex | 2 +- .../elixir/ockam/ockam/lib/ockam/message.ex | 8 +- .../elixir/ockam/ockam/lib/ockam/node.ex | 106 +++++++++++----- .../ockam/ockam/lib/ockam/node/registry.ex | 37 ++++++ .../elixir/ockam/ockam/lib/ockam/router.ex | 1 + .../ockam/lib/ockam/secure_channel/channel.ex | 2 +- .../elixir/ockam/ockam/lib/ockam/telemetry.ex | 11 +- .../ockam/lib/ockam/transport/tcp/handler.ex | 4 +- .../elixir/ockam/ockam/lib/ockam/worker.ex | 81 ++++++++++-- .../ockam/ockam/test/ockam/node_test.exs | 4 +- .../ockam/test/ockam/secure_channel_test.exs | 2 +- .../elixir/ockam/ockam_hub/config/runtime.exs | 42 ------- .../elixir/ockam/ockam_hub/lib/hub.ex | 116 ++--------------- .../lib/hub/metrics/telemetry_influxdb.ex | 87 +++++++++++++ .../ockam_hub/lib/hub/telemetry_forwarder.ex | 119 ------------------ .../ockam_hub/lib/hub/telemetry_poller.ex | 49 ++++++++ .../test/hub/service/tracing_test.exs | 2 +- 31 files changed, 371 insertions(+), 338 deletions(-) create mode 100644 implementations/elixir/ockam/ockam_hub/lib/hub/metrics/telemetry_influxdb.ex delete mode 100644 implementations/elixir/ockam/ockam_hub/lib/hub/telemetry_forwarder.ex create mode 100644 implementations/elixir/ockam/ockam_hub/lib/hub/telemetry_poller.ex diff --git a/examples/elixir/get_started/01-worker.exs b/examples/elixir/get_started/01-worker.exs index 6409c019dfc..cc406417882 100644 --- a/examples/elixir/get_started/01-worker.exs +++ b/examples/elixir/get_started/01-worker.exs @@ -1,7 +1,7 @@ ["setup.exs", "echoer.exs"] |> Enum.map(&Code.require_file/1) # Register this process as worker address "app". -Ockam.Node.register_address("app", self()) +Ockam.Node.register_address("app") # Create a Echoer type worker at address "echoer". {:ok, _echoer} = Echoer.create(address: "echoer") diff --git a/examples/elixir/get_started/02-routing.exs b/examples/elixir/get_started/02-routing.exs index 45c38353ee2..10004c54b3c 100644 --- a/examples/elixir/get_started/02-routing.exs +++ b/examples/elixir/get_started/02-routing.exs @@ -1,7 +1,7 @@ ["setup.exs", "echoer.exs", "hop.exs"] |> Enum.map(&Code.require_file/1) # Register this process as worker address "app". -Ockam.Node.register_address("app", self()) +Ockam.Node.register_address("app") # Create a Echoer type worker at address "echoer". {:ok, _echoer} = Echoer.create(address: "echoer") diff --git a/examples/elixir/get_started/03-routing-many-hops.exs b/examples/elixir/get_started/03-routing-many-hops.exs index e382d196945..43a80ab3966 100644 --- a/examples/elixir/get_started/03-routing-many-hops.exs +++ b/examples/elixir/get_started/03-routing-many-hops.exs @@ -1,7 +1,7 @@ ["setup.exs", "echoer.exs", "hop.exs"] |> Enum.map(&Code.require_file/1) # Register this process as worker address "app". -Ockam.Node.register_address("app", self()) +Ockam.Node.register_address("app") # Create a Echoer type worker at address "echoer". {:ok, _echoer} = Echoer.create(address: "echoer") diff --git a/examples/elixir/get_started/04-routing-over-transport-initiator.exs b/examples/elixir/get_started/04-routing-over-transport-initiator.exs index 6eb69e9e75f..8381707b267 100644 --- a/examples/elixir/get_started/04-routing-over-transport-initiator.exs +++ b/examples/elixir/get_started/04-routing-over-transport-initiator.exs @@ -1,7 +1,7 @@ ["setup.exs"] |> Enum.map(&Code.require_file/1) # Register this process as worker address "app". -Ockam.Node.register_address("app", self()) +Ockam.Node.register_address("app") # Start the TCP Transport Add-on for Ockam Routing. Ockam.Transport.TCP.start() diff --git a/examples/elixir/get_started/05-routing-over-transport-two-hops-initiator.exs b/examples/elixir/get_started/05-routing-over-transport-two-hops-initiator.exs index d7493ae5866..ddfd11615b0 100644 --- a/examples/elixir/get_started/05-routing-over-transport-two-hops-initiator.exs +++ b/examples/elixir/get_started/05-routing-over-transport-two-hops-initiator.exs @@ -1,7 +1,7 @@ ["setup.exs"] |> Enum.map(&Code.require_file/1) # Register this process as worker address "app". -Ockam.Node.register_address("app", self()) +Ockam.Node.register_address("app") # Start the TCP Transport Add-on for Ockam Routing. Ockam.Transport.TCP.start() diff --git a/examples/elixir/get_started/06-secure-channel-over-two-transport-hops-initiator.exs b/examples/elixir/get_started/06-secure-channel-over-two-transport-hops-initiator.exs index 0af2360b40e..ba5e082f79e 100644 --- a/examples/elixir/get_started/06-secure-channel-over-two-transport-hops-initiator.exs +++ b/examples/elixir/get_started/06-secure-channel-over-two-transport-hops-initiator.exs @@ -1,7 +1,7 @@ ["setup.exs", "waiter.exs"] |> Enum.map(&Code.require_file/1) # Register this process as worker address "app". -Ockam.Node.register_address("app", self()) +Ockam.Node.register_address("app") # Start the TCP Transport Add-on for Ockam Routing. Ockam.Transport.TCP.start() diff --git a/examples/elixir/get_started/07-routing-via-remote-forwarder-initiator.ex b/examples/elixir/get_started/07-routing-via-remote-forwarder-initiator.ex index 921374e22b0..b00668be190 100644 --- a/examples/elixir/get_started/07-routing-via-remote-forwarder-initiator.ex +++ b/examples/elixir/get_started/07-routing-via-remote-forwarder-initiator.ex @@ -1,7 +1,7 @@ ["setup.exs"] |> Enum.map(&Code.require_file/1) # Register this process as worker address "app". -Ockam.Node.register_address("app", self()) +Ockam.Node.register_address("app") # Start the TCP Transport Add-on for Ockam Routing. Ockam.Transport.TCP.start() diff --git a/examples/elixir/get_started/07-routing-via-remote-forwarder-responder.exs b/examples/elixir/get_started/07-routing-via-remote-forwarder-responder.exs index ea47d90f942..1fb39a7a6f2 100644 --- a/examples/elixir/get_started/07-routing-via-remote-forwarder-responder.exs +++ b/examples/elixir/get_started/07-routing-via-remote-forwarder-responder.exs @@ -1,7 +1,7 @@ ["setup.exs", "echoer.exs"] |> Enum.map(&Code.require_file/1) # Register this process as worker address "app". -Ockam.Node.register_address("app", self()) +Ockam.Node.register_address("app") # Create a Echoer type worker at address "echoer". {:ok, _echoer} = Echoer.create(address: "echoer") diff --git a/examples/elixir/get_started/08-secure-channel-over-remote-forwarder-initiator.ex b/examples/elixir/get_started/08-secure-channel-over-remote-forwarder-initiator.ex index 909c0e05b19..847b4e8c243 100644 --- a/examples/elixir/get_started/08-secure-channel-over-remote-forwarder-initiator.ex +++ b/examples/elixir/get_started/08-secure-channel-over-remote-forwarder-initiator.ex @@ -1,7 +1,7 @@ ["setup.exs", "waiter.exs"] |> Enum.map(&Code.require_file/1) # Register this process as worker address "app". -Ockam.Node.register_address("app", self()) +Ockam.Node.register_address("app") # Start the TCP Transport Add-on for Ockam Routing. Ockam.Transport.TCP.start() diff --git a/implementations/elixir/ockam/ockam/lib/ockam/asymmetric_worker.ex b/implementations/elixir/ockam/ockam/lib/ockam/asymmetric_worker.ex index 2095811177c..259e3b116df 100644 --- a/implementations/elixir/ockam/ockam/lib/ockam/asymmetric_worker.ex +++ b/implementations/elixir/ockam/ockam/lib/ockam/asymmetric_worker.ex @@ -83,12 +83,12 @@ defmodule Ockam.AsymmetricWorker do def register_inner_address(options, state) do case Keyword.get(options, :inner_address) do nil -> - Ockam.Node.register_random_address() + Ockam.Node.register_random_address(address_prefix(options), __MODULE__) inner_address -> - case Ockam.Node.register_address(inner_address) do - :yes -> {:ok, inner_address} - :no -> {:error, :inner_address_already_taken} + case Ockam.Node.register_address(inner_address, __MODULE__) do + :ok -> {:ok, inner_address} + {:error, _reason} -> {:error, :inner_address_already_taken} end end end diff --git a/implementations/elixir/ockam/ockam/lib/ockam/examples/forwarding/forwarding.ex b/implementations/elixir/ockam/ockam/lib/ockam/examples/forwarding/forwarding.ex index e29ff8848ee..5c3bd535dc9 100644 --- a/implementations/elixir/ockam/ockam/lib/ockam/examples/forwarding/forwarding.ex +++ b/implementations/elixir/ockam/ockam/lib/ockam/examples/forwarding/forwarding.ex @@ -44,7 +44,7 @@ defmodule Ockam.Examples.Forwarding do TCP.start() forwarding_route = [@hub_address, "forwarding_service"] - Ockam.Node.register_address("example_responder", self()) + Ockam.Node.register_address("example_responder") with {:ok, forwarder_address} <- ServiceApi.register_self(forwarding_route, "example_responder") do @@ -63,7 +63,7 @@ defmodule Ockam.Examples.Forwarding do TCP.start() forwarder_route = [@hub_address, forwarder_address] - Ockam.Node.register_address("example_initiator", self()) + Ockam.Node.register_address("example_initiator") Ockam.Router.route(%{ onward_route: forwarder_route, diff --git a/implementations/elixir/ockam/ockam/lib/ockam/examples/routing/local.ex b/implementations/elixir/ockam/ockam/lib/ockam/examples/routing/local.ex index fa6b42cc7ee..5c2942854fe 100644 --- a/implementations/elixir/ockam/ockam/lib/ockam/examples/routing/local.ex +++ b/implementations/elixir/ockam/ockam/lib/ockam/examples/routing/local.ex @@ -16,7 +16,7 @@ defmodule Ockam.Examples.Routing.Local do ## Register this process to receive messages my_address = "example_run" - Ockam.Node.register_address(my_address, self()) + Ockam.Node.register_address(my_address) message = %{ ## Route message through hop to echoer diff --git a/implementations/elixir/ockam/ockam/lib/ockam/examples/routing/tcp.ex b/implementations/elixir/ockam/ockam/lib/ockam/examples/routing/tcp.ex index 10bdfd060bc..ff15f844001 100644 --- a/implementations/elixir/ockam/ockam/lib/ockam/examples/routing/tcp.ex +++ b/implementations/elixir/ockam/ockam/lib/ockam/examples/routing/tcp.ex @@ -30,7 +30,7 @@ defmodule Ockam.Examples.Routing.TCP do ## Register this process to receive messages my_address = "example_run" - Ockam.Node.register_address(my_address, self()) + Ockam.Node.register_address(my_address) Ockam.Router.route(%{ onward_route: [server_host_address, "echoer"], diff --git a/implementations/elixir/ockam/ockam/lib/ockam/examples/secure_channel/local.ex b/implementations/elixir/ockam/ockam/lib/ockam/examples/secure_channel/local.ex index 5254d599662..e1ab63a450f 100644 --- a/implementations/elixir/ockam/ockam/lib/ockam/examples/secure_channel/local.ex +++ b/implementations/elixir/ockam/ockam/lib/ockam/examples/secure_channel/local.ex @@ -34,7 +34,7 @@ defmodule Ockam.Examples.SecureChannel.Local do ## Register this process to receive messages my_address = "example_run" - Ockam.Node.register_address(my_address, self()) + Ockam.Node.register_address(my_address) send_and_wait(channel, "Hello secure channel!", my_address) diff --git a/implementations/elixir/ockam/ockam/lib/ockam/examples/secure_channel/tcp.ex b/implementations/elixir/ockam/ockam/lib/ockam/examples/secure_channel/tcp.ex index 7cc8c3fe9e5..8247b9b9427 100644 --- a/implementations/elixir/ockam/ockam/lib/ockam/examples/secure_channel/tcp.ex +++ b/implementations/elixir/ockam/ockam/lib/ockam/examples/secure_channel/tcp.ex @@ -44,7 +44,7 @@ defmodule Ockam.Examples.SecureChannel.TCP do ## Register this process to receive messages my_address = "example_run" - Ockam.Node.register_address(my_address, self()) + Ockam.Node.register_address(my_address) send_and_wait(channel, "Hello secure channel over TCP!", my_address) diff --git a/implementations/elixir/ockam/ockam/lib/ockam/message.ex b/implementations/elixir/ockam/ockam/lib/ockam/message.ex index a146e3711ef..469111a8cc9 100644 --- a/implementations/elixir/ockam/ockam/lib/ockam/message.ex +++ b/implementations/elixir/ockam/ockam/lib/ockam/message.ex @@ -68,10 +68,14 @@ defmodule Ockam.Message do end @doc "Get onward_route from the message" - def onward_route(%Ockam.Message{onward_route: onward_route}), do: onward_route + def onward_route(%Ockam.Message{onward_route: onward_route}) when is_list(onward_route), + do: onward_route @doc "Get return_route from the message" - def return_route(%Ockam.Message{return_route: return_route}), do: return_route + def return_route(%Ockam.Message{return_route: return_route}) when is_list(return_route), + do: return_route + + def return_route(%Ockam.Message{return_route: nil}), do: [] @doc "Get payload from the message" def payload(%Ockam.Message{payload: payload}), do: payload diff --git a/implementations/elixir/ockam/ockam/lib/ockam/node.ex b/implementations/elixir/ockam/ockam/lib/ockam/node.ex index 31e8cfdd9e1..64b690e4854 100644 --- a/implementations/elixir/ockam/ockam/lib/ockam/node.ex +++ b/implementations/elixir/ockam/ockam/lib/ockam/node.ex @@ -33,46 +33,101 @@ defmodule Ockam.Node do end end - def register_address(address) do - Registry.register_name(address, self()) + @spec register_address(any(), module()) :: :ok | {:error, any()} + @doc """ + Registers the address of the current process with optional module name + """ + def register_address(address, module \\ nil) do + Registry.register(address, module) end + @spec set_address_module(any(), module()) :: :ok | :error @doc """ - Registers the address of a `pid`. + Sets module name for already registered process """ - defdelegate register_address(address, pid), to: Registry, as: :register_name + def set_address_module(address, module) do + Registry.set_module(address, module) + end + @spec unregister_address(any()) :: :ok @doc """ Unregisters an address. """ defdelegate unregister_address(address), to: Registry, as: :unregister_name + @spec list_addresses() :: [address :: any()] @doc """ Lists all registered addresses """ defdelegate list_addresses(), to: Registry, as: :list_names + @spec list_workers() :: [{address :: any(), pid(), module()}] + @doc """ + Lists all workers with their primary address, worker pid and module + """ + ## TODO: currently taking just one random address per pid, make sure it's primary + def list_workers() do + list_addresses() + |> Enum.flat_map(fn address -> + case Registry.lookup(address) do + {:ok, pid, module} -> [{address, pid, module}] + :error -> [] + end + end) + |> Enum.uniq_by(fn {_address, pid, _module} -> pid end) + end + @doc """ Send a message to the process registered with an address. """ def send(address, %Ockam.Message{} = message) do case Registry.whereis_name(address) do # dead letters - :undefined -> :ok - _pid -> Registry.send(address, message) + :undefined -> + report_message(:unsent, address, message) + :ok + + _pid -> + Registry.send(address, message) + report_message(:sent, address, message) + :ok end end - def register_random_address(prefix \\ "", length_in_bytes \\ @default_address_length_in_bytes) do + @spec report_message(:sent | :unsent, any(), Ockam.Message.t()) :: :ok + def report_message(sent_status, address, message) do + from = Enum.at(Message.return_route(message), 0) + + metadata = %{from: from, to: address, message: message} + + Telemetry.emit_event([__MODULE__, :message, sent_status], + measurements: %{count: 1}, + metadata: metadata + ) + end + + @spec register_random_address(prefix :: String.t(), module(), length_in_bytes :: integer()) :: + {:ok, address :: any()} | {:error, any()} + @doc """ + Registers random address of certain length using set prefix and module name + """ + ## TODO: make address actually fit into length in bytes + def register_random_address( + prefix \\ "", + module \\ nil, + length_in_bytes \\ @default_address_length_in_bytes + ) do address = get_random_unregistered_address(prefix, length_in_bytes) - case register_address(address) do - :yes -> {:ok, address} + case register_address(address, module) do + :ok -> {:ok, address} ## TODO: recursion limit - :no -> register_random_address(prefix, length_in_bytes) + {:error, _reason} -> register_random_address(prefix, module, length_in_bytes) end end + @spec get_random_unregistered_address(prefix :: String.t(), length_in_bytes :: integer()) :: + binary() @doc """ Returns a random address that is currently not registed on the node. """ @@ -149,27 +204,16 @@ defmodule Ockam.Node do end def handle_local_message(%Ockam.Message{} = message) do - metadata = %{message: message} - - start_time = - Telemetry.emit_start_event([__MODULE__, :handle_local_message], metadata: metadata) - - return_value = - case Message.onward_route(message) do - [] -> - # Logger.warn("Routing message with no onward_route: #{inspect(message)}") - :ok - - [first | _rest] -> - 0 = Address.type(first) - local_address = Address.value(first) - __MODULE__.send(local_address, message) - end - - metadata = Map.put(metadata, :return_value, return_value) - - Telemetry.emit_stop_event([__MODULE__, :handle_local_message], start_time, metadata: metadata) + case Message.onward_route(message) do + [] -> + report_message(:unsent, nil, message) + # Logger.warn("Routing message with no onward_route: #{inspect(message)}") + :ok - return_value + [first | _rest] -> + 0 = Address.type(first) + local_address = Address.value(first) + __MODULE__.send(local_address, message) + end end end diff --git a/implementations/elixir/ockam/ockam/lib/ockam/node/registry.ex b/implementations/elixir/ockam/ockam/lib/ockam/node/registry.ex index 7c3fb80910b..529c10d9cbe 100644 --- a/implementations/elixir/ockam/ockam/lib/ockam/node/registry.ex +++ b/implementations/elixir/ockam/ockam/lib/ockam/node/registry.ex @@ -44,6 +44,9 @@ defmodule Ockam.Node.Registry do # See the "Name registration" section of the `GenServer` module. def unregister_name(address), do: Registry.unregister_name({__MODULE__, address}) + @doc false + def update_value(address, value), do: Registry.update_value(__MODULE__, address, value) + @spec addresses(pid) :: [any] def addresses(pid), do: Registry.keys(__MODULE__, pid) @@ -59,4 +62,38 @@ defmodule Ockam.Node.Registry do List all registered worker names """ def list_names(), do: Registry.select(__MODULE__, [{{:"$1", :_, :_}, [], [:"$1"]}]) + + @spec register(any(), any()) :: :ok | {:error, reason :: any()} + @doc false + # This function is used in custom process registration + # + # Module should be the worker implementation module + def register(address, module) do + case Registry.register(__MODULE__, address, module) do + {:ok, _owner} -> :ok + {:error, reason} -> {:error, reason} + end + end + + @spec set_module(any(), module()) :: :ok | :error + @doc false + # Set worker module for the current process + # + # This function is called from the worker behaviour + # Module is not set when registering with register_name from `:via` option + # so this function needs to be called to set it after the process is created + def set_module(address, module) do + case Registry.update_value(__MODULE__, address, fn _old -> module end) do + :error -> :error + {_new, _old} -> :ok + end + end + + @spec lookup(address :: any()) :: {:ok, pid, module} | :error + def lookup(address) do + case Registry.lookup(__MODULE__, address) do + [{pid, module}] -> {:ok, pid, module} + [] -> :error + end + end end diff --git a/implementations/elixir/ockam/ockam/lib/ockam/router.ex b/implementations/elixir/ockam/ockam/lib/ockam/router.ex index 40cb6faf051..e5ac0e68c1a 100644 --- a/implementations/elixir/ockam/ockam/lib/ockam/router.ex +++ b/implementations/elixir/ockam/ockam/lib/ockam/router.ex @@ -84,6 +84,7 @@ defmodule Ockam.Router do defp invoke_handler(handler, message) do case apply_handler(handler, message) do {:error, error} -> {:error, {:handler_error, error, message, handler}} + ## TODO: require and match :ok result _anything_else -> :ok end end diff --git a/implementations/elixir/ockam/ockam/lib/ockam/secure_channel/channel.ex b/implementations/elixir/ockam/ockam/lib/ockam/secure_channel/channel.ex index 6c851597cfd..67a55a680d4 100644 --- a/implementations/elixir/ockam/ockam/lib/ockam/secure_channel/channel.ex +++ b/implementations/elixir/ockam/ockam/lib/ockam/secure_channel/channel.ex @@ -113,7 +113,7 @@ defmodule Ockam.SecureChannel.Channel do address_prefix = Keyword.get(options, :address_prefix, "") ciphertext_address = Node.get_random_unregistered_address(address_prefix) - with :yes <- Node.register_address(ciphertext_address, self()) do + with :ok <- Node.register_address(ciphertext_address, __MODULE__) do {:ok, Map.put(data, :ciphertext_address, ciphertext_address)} end end diff --git a/implementations/elixir/ockam/ockam/lib/ockam/telemetry.ex b/implementations/elixir/ockam/ockam/lib/ockam/telemetry.ex index eefcbd0497e..1f97de4220a 100644 --- a/implementations/elixir/ockam/ockam/lib/ockam/telemetry.ex +++ b/implementations/elixir/ockam/ockam/lib/ockam/telemetry.ex @@ -49,6 +49,11 @@ if Code.ensure_loaded?(:telemetry) do measurements = Keyword.get(options, :measurements, %{}) metadata = Keyword.get(options, :metadata, %{}) + emit_event(event_name, measurements, metadata) + end + + def emit_event([first | _rest] = event_name, measurements, metadata) + when is_list(event_name) and is_atom(first) and is_map(measurements) and is_map(metadata) do :ok = :telemetry.execute([:ockam] ++ event_name, measurements, metadata) end @@ -89,7 +94,7 @@ if Code.ensure_loaded?(:telemetry) do measurements = Map.merge(measurements, %{system_time: System.system_time()}) event_name = Enum.reverse([:start | Enum.reverse(event_name)]) - :ok = :telemetry.execute([:ockam] ++ event_name, measurements, metadata) + :ok = emit_event(event_name, measurements, metadata) start_time end @@ -133,7 +138,7 @@ if Code.ensure_loaded?(:telemetry) do measurements = Map.merge(measurements, %{duration: end_time - start_time}) event_name = Enum.reverse([:stop | Enum.reverse(event_name)]) - :ok = :telemetry.execute([:ockam] ++ event_name, measurements, metadata) + emit_event(event_name, measurements, metadata) end @doc """ @@ -200,7 +205,7 @@ if Code.ensure_loaded?(:telemetry) do measurements = Map.merge(measurements, %{duration: end_time - start_time}) event_name = Enum.reverse([:exception | Enum.reverse(event_name)]) - :ok = :telemetry.execute([:ockam] ++ event_name, measurements, metadata) + emit_event(event_name, measurements, metadata) end end else diff --git a/implementations/elixir/ockam/ockam/lib/ockam/transport/tcp/handler.ex b/implementations/elixir/ockam/ockam/lib/ockam/transport/tcp/handler.ex index b197004ed7d..b045175ede2 100644 --- a/implementations/elixir/ockam/ockam/lib/ockam/transport/tcp/handler.ex +++ b/implementations/elixir/ockam/ockam/lib/ockam/transport/tcp/handler.ex @@ -24,9 +24,7 @@ defmodule Ockam.Transport.TCP.Handler do {:ok, socket} = :ranch.handshake(ref, opts) :ok = :inet.setopts(socket, [{:active, true}, {:packet, 2}, {:nodelay, true}]) - address = Ockam.Node.get_random_unregistered_address(@address_prefix) - - Ockam.Node.Registry.register_name(address, self()) + {:ok, address} = Ockam.Node.register_random_address(@address_prefix, __MODULE__) {function_name, _} = __ENV__.function Telemetry.emit_event(function_name) diff --git a/implementations/elixir/ockam/ockam/lib/ockam/worker.ex b/implementations/elixir/ockam/ockam/lib/ockam/worker.ex index cefaef3f17d..4c6334164d8 100644 --- a/implementations/elixir/ockam/ockam/lib/ockam/worker.ex +++ b/implementations/elixir/ockam/ockam/lib/ockam/worker.ex @@ -1,6 +1,8 @@ defmodule Ockam.Worker do @moduledoc false + alias Ockam.Telemetry + @callback setup(options :: Keyword.t(), initial_state :: map()) :: {:ok, state :: map()} | {:error, reason :: any()} @@ -51,7 +53,6 @@ defmodule Ockam.Worker do alias Ockam.Node alias Ockam.Router - alias Ockam.Telemetry def create(options \\ []) when is_list(options) do address_prefix = Keyword.get(options, :address_prefix, address_prefix(options)) @@ -97,14 +98,15 @@ defmodule Ockam.Worker do @doc false @impl true def handle_info(%Ockam.Message{} = message, state) do - metadata = %{message: message} - start_time = Telemetry.emit_start_event([__MODULE__, :handle_message], metadata: metadata) + ## TODO: improve metadata + metadata = %{message: message, address: Map.get(state, :address), module: __MODULE__} + + start_time = Ockam.Worker.emit_handle_message_start(metadata) ## TODO: error handling return_value = handle_message(message, state) - metadata = Map.put(metadata, :return_value, return_value) - Telemetry.emit_stop_event([__MODULE__, :handle_message], start_time, metadata: metadata) + Ockam.Worker.emit_handle_message_stop(metadata, start_time, return_value) last_message_ts = System.os_time(:millisecond) @@ -124,15 +126,21 @@ defmodule Ockam.Worker do @doc false @impl true def handle_continue(:post_init, options) do - metadata = %{address: Keyword.get(options, :address)} - start_time = Telemetry.emit_start_event([__MODULE__, :init], metadata: metadata) + Node.set_address_module(Keyword.fetch!(options, :address), __MODULE__) with {:ok, address} <- get_from_options(:address, options) do + metadata = %{ + address: Keyword.get(options, :address), + options: options, + module: __MODULE__ + } + + start_time = Ockam.Worker.emit_init_start(metadata) + base_state = %{address: address, module: __MODULE__, last_message_ts: nil} return_value = setup(options, base_state) - metadata = Map.put(metadata, :return_value, return_value) - Telemetry.emit_stop_event([__MODULE__, :init], start_time, metadata: metadata) + Ockam.Worker.emit_init_stop(metadata, start_time, return_value) case return_value do {:ok, state} -> @@ -161,4 +169,59 @@ defmodule Ockam.Worker do defoverridable setup: 2, address_prefix: 1 end end + + ## Metrics functions + + def emit_handle_message_start(metadata) do + start_time = + Telemetry.emit_start_event([Map.get(metadata, :module), :handle_message], metadata: metadata) + + Telemetry.emit_event([Ockam.Worker, :handle_message, :start], + metadata: metadata, + measurements: %{system_time: System.system_time()} + ) + + start_time + end + + def emit_handle_message_stop(metadata, start_time, return_value) do + result = + case return_value do + {:ok, _result} -> :ok + {:stop, _reason, _state} -> :stop + {:error, _reason} -> :error + end + + metadata = Map.merge(metadata, %{result: result, return_value: return_value}) + + Telemetry.emit_stop_event([Map.get(metadata, :module), :handle_message], start_time, + metadata: metadata + ) + + Telemetry.emit_stop_event([Ockam.Worker, :handle_message], start_time, metadata: metadata) + end + + def emit_init_start(metadata) do + start_time = + Telemetry.emit_start_event([Map.get(metadata, :module), :init], metadata: metadata) + + Telemetry.emit_event([Ockam.Worker, :init, :start], + metadata: metadata, + measurements: %{system_time: System.system_time()} + ) + + start_time + end + + def emit_init_stop(metadata, start_time, return_value) do + result = + case return_value do + {:ok, _state} -> :ok + {:error, _reason} -> :error + end + + metadata = Map.merge(metadata, %{result: result, return_value: return_value}) + Telemetry.emit_stop_event([Map.get(metadata, :module), :init], start_time, metadata: metadata) + Telemetry.emit_stop_event([Ockam.Worker, :init], start_time, metadata: metadata) + end end diff --git a/implementations/elixir/ockam/ockam/test/ockam/node_test.exs b/implementations/elixir/ockam/ockam/test/ockam/node_test.exs index 3dfc5217d82..7ee0bc5b235 100644 --- a/implementations/elixir/ockam/ockam/test/ockam/node_test.exs +++ b/implementations/elixir/ockam/ockam/test/ockam/node_test.exs @@ -5,7 +5,7 @@ defmodule Ockam.Node.Tests do describe "#{Node}.register/2" do test "can register, send, unregister" do - Node.register_address("A", self()) + Node.register_address("A") Node.send("A", %Ockam.Message{payload: "hello"}) assert_receive %Ockam.Message{payload: "hello"} Node.unregister_address("A") @@ -16,7 +16,7 @@ defmodule Ockam.Node.Tests do test "keeps trying" do Enum.each(0..254, fn x -> x = Base.encode16(<>, case: :lower) - Node.register_address(x, self()) + Node.register_address(x) end) assert "ff" === Node.get_random_unregistered_address("", 1) diff --git a/implementations/elixir/ockam/ockam/test/ockam/secure_channel_test.exs b/implementations/elixir/ockam/ockam/test/ockam/secure_channel_test.exs index 1ba1591ec09..ae968ddf2b3 100644 --- a/implementations/elixir/ockam/ockam/test/ockam/secure_channel_test.exs +++ b/implementations/elixir/ockam/ockam/test/ockam/secure_channel_test.exs @@ -32,7 +32,7 @@ defmodule Ockam.SecureChannel.Tests do alias Ockam.Vault.Software, as: SoftwareVault setup do - Node.register_address("test", self()) + Node.register_address("test") on_exit(fn -> Node.unregister_address("test") end) end diff --git a/implementations/elixir/ockam/ockam_hub/config/runtime.exs b/implementations/elixir/ockam/ockam_hub/config/runtime.exs index d50f451d654..7a43584e3fc 100644 --- a/implementations/elixir/ockam/ockam_hub/config/runtime.exs +++ b/implementations/elixir/ockam/ockam_hub/config/runtime.exs @@ -46,49 +46,7 @@ config :ockam_hub, :postgres, password: System.get_env("POSTGRES_PASSWORD"), database: System.get_env("POSTGRES_DATABASE") -ui_auth_message = - with true <- File.exists?("/mnt/secrets/auth/message"), - {:ok, contents} <- File.read("/mnt/secrets/auth/message"), - client_secret <- String.trim(contents) do - client_secret - else - false -> - System.get_env("AUTH_MESSAGE") || "devsecret" - - {:error, :enoent} -> - System.get_env("AUTH_MESSAGE") || "devsecret" - end - -ui_auth_host = - with true <- File.exists?("/mnt/secrets/auth/host"), - {:ok, contents} <- File.read("/mnt/secrets/auth/host"), - client_secret <- String.trim(contents) do - client_secret - else - false -> - System.get_env("AUTH_HOST") || "http://localhost:4002" - - {:error, :enoent} -> - System.get_env("AUTH_HOST") || "http://localhost:4002" - end - -node_fqdn = - case System.get_env("NODE_FQDN") do - fqdn when is_binary(fqdn) and fqdn != "" -> - fqdn - - _ -> - case config_env() do - :dev -> "localhost" - :test -> "localhost" - _ -> nil - end - end - config :ockam_hub, - auth_message: ui_auth_message, - auth_host: ui_auth_host, - node_fqdn: node_fqdn, tcp_transport_port: 4000, udp_transport_port: 7000 diff --git a/implementations/elixir/ockam/ockam_hub/lib/hub.ex b/implementations/elixir/ockam/ockam_hub/lib/hub.ex index 1b10ad3d0db..6435bf61ba8 100644 --- a/implementations/elixir/ockam/ockam_hub/lib/hub.ex +++ b/implementations/elixir/ockam/ockam_hub/lib/hub.ex @@ -35,7 +35,11 @@ defmodule Ockam.Hub do { :telemetry_poller, [ - period: :timer.seconds(5) + period: :timer.seconds(30), + measurements: [ + {Ockam.Hub.TelemetryPoller, :dispatch_worker_count, []}, + {Ockam.Hub.TelemetryPoller, :dispatch_tcp_connections, []} + ] ] }, # Add a TCP listener @@ -47,14 +51,12 @@ defmodule Ockam.Hub do ]} ] ++ services_specs ++ - schedule_specs - - children = - if Application.get_env(:telemetry_influxdb, :host, nil) do - [influxdb_telemetry_config() | children] - else - children - end + schedule_specs ++ + if Application.get_env(:telemetry_influxdb, :host, nil) do + [Ockam.Hub.Metrics.TelemetryInfluxDB.child_spec()] + else + [] + end # Start a supervisor with the given children. The supervisor will inturn # start the given children. @@ -98,100 +100,4 @@ defmodule Ockam.Hub do [] end end - - defp influxdb_telemetry_config() do - %{ - id: TelemetryInfluxDB, - start: { - TelemetryInfluxDB, - :start_link, - [ - [ - version: :v2, - protocol: :http, - reporter_name: "Ockam Hub", - host: Application.get_env(:telemetry_influxdb, :host) || "http://127.0.0.1", - port: String.to_integer(Application.get_env(:telemetry_influxdb, :port) || "8086"), - bucket: Application.get_env(:telemetry_influxdb, :bucket) || "ockam_hub", - org: Application.get_env(:telemetry_influxdb, :org) || "ockam", - token: Application.get_env(:telemetry_influxdb, :token) || "TOKEN NOT CONFIGURED", - tags: %{hostname: System.get_env("HOSTNAME", "none")}, - events: [ - %{ - name: [:vm, :memory], - metadata_tag_keys: [ - :total, - :processes, - :processes_used, - :system, - :atom, - :atom_used, - :binary, - :code, - :ets, - :maximum - ] - }, - %{ - name: [:vm, :total_run_queue_lengths], - metadata_tag_keys: [:total, :cpu, :io] - }, - %{ - name: [:vm, :system_counts], - metadata_tag_keys: [:process_count, :atom_count, :port_count] - }, - %{ - name: [:ockam, Ockam.Transport.TCP.Listener, :init, :start], - metadata_tag_keys: [:options, :return_value] - }, - %{ - name: [:ockam, Ockam.Hub.Service.Echo, :init, :start], - metadata_tag_keys: [:options, :return_value] - }, - %{ - name: [:ockam, Ockam.Hub.Service.Alias, :init, :start], - metadata_tag_keys: [:options, :return_value] - }, - %{ - name: [:ockam, Ockam.Hub.Service.Alias.Forwarder, :init, :start], - metadata_tag_keys: [:options, :return_value] - }, - %{ - name: [:ockam, Ockam.Router, :route, :start], - metadata_tag_keys: [:message, :return_value] - }, - %{ - name: [:ockam, Ockam.Router, :route, :start_link], - metadata_tag_keys: [:options, :return_value] - }, - %{ - name: [:ockam, Ockam.Transport.TCP.Listener, :handle_message, :start], - metadata_tag_keys: [:message, :return_value] - }, - %{ - name: [:ockam, Ockam.Transport.UDP.Listener, :handle_message, :start], - metadata_tag_keys: [:message, :return_value] - }, - %{ - name: [:ockam, Ockam.Node, :handle_local_message, :start], - metadata_tag_keys: [:message, :return_value] - }, - %{ - name: [:ockam, Ockam.Hub.Service.Echo, :handle_message, :start], - metadata_tag_keys: [:message, :onward_route, :return_route, :version] - }, - %{ - name: [:ockam, Ockam.Hub.Service.Alias, :handle_message, :start], - metadata_tag_keys: [:message, :return_value] - }, - %{ - name: [:ockam, Ockam.Hub.Service.Alias.Forwarder, :handle_message, :start], - metadata_tag_keys: [:message, :return_value] - } - ] - ] - ] - } - } - end end diff --git a/implementations/elixir/ockam/ockam_hub/lib/hub/metrics/telemetry_influxdb.ex b/implementations/elixir/ockam/ockam_hub/lib/hub/metrics/telemetry_influxdb.ex new file mode 100644 index 00000000000..86f8e3cf5b9 --- /dev/null +++ b/implementations/elixir/ockam/ockam_hub/lib/hub/metrics/telemetry_influxdb.ex @@ -0,0 +1,87 @@ +defmodule Ockam.Hub.Metrics.TelemetryInfluxDB do + @moduledoc """ + Telemetry InfluxDB metric reporting configuration + """ + def child_spec() do + %{ + id: TelemetryInfluxDB, + start: { + TelemetryInfluxDB, + :start_link, + [ + [ + version: :v2, + protocol: :http, + reporter_name: "Ockam Hub", + host: Application.get_env(:telemetry_influxdb, :host) || "http://127.0.0.1", + port: String.to_integer(Application.get_env(:telemetry_influxdb, :port) || "8086"), + bucket: Application.get_env(:telemetry_influxdb, :bucket) || "ockam_hub", + org: Application.get_env(:telemetry_influxdb, :org) || "ockam", + token: Application.get_env(:telemetry_influxdb, :token) || "TOKEN NOT CONFIGURED", + tags: %{ + hostname: System.get_env("HOSTNAME", "none"), + namespace: System.get_env("METRICS_NAMESPACE", "none") + }, + events: [ + %{ + name: [:vm, :memory], + metadata_tag_keys: [ + :total, + :processes, + :processes_used, + :system, + :atom, + :atom_used, + :binary, + :code, + :ets, + :maximum + ] + }, + %{ + name: [:ockam, :workers, :type], + metadata_tag_keys: [:type] + }, + %{ + name: [:ockam, :tcp, :connections], + metadata_tag_keys: [:port] + }, + %{ + name: [:vm, :total_run_queue_lengths], + metadata_tag_keys: [:total, :cpu, :io] + }, + %{ + name: [:vm, :system_counts], + metadata_tag_keys: [:process_count, :atom_count, :port_count] + }, + %{ + name: [:ockam, Ockam.Node, :message, :sent], + metadata_tag_keys: [:from, :to] + }, + %{ + name: [:ockam, Ockam.Node, :message, :unsent], + metadata_tag_keys: [:from, :to] + }, + %{ + name: [:ockam, Ockam.Worker, :handle_message, :start], + metadata_tag_keys: [:address, :module] + }, + %{ + name: [:ockam, Ockam.Worker, :handle_message, :stop], + metadata_tag_keys: [:address, :module, :result] + }, + %{ + name: [:ockam, Ockam.Worker, :init, :start], + metadata_tag_keys: [:address, :module] + }, + %{ + name: [:ockam, Ockam.Worker, :init, :stop], + metadata_tag_keys: [:address, :module, :result] + } + ] + ] + ] + } + } + end +end diff --git a/implementations/elixir/ockam/ockam_hub/lib/hub/telemetry_forwarder.ex b/implementations/elixir/ockam/ockam_hub/lib/hub/telemetry_forwarder.ex deleted file mode 100644 index 2e2a02fddcc..00000000000 --- a/implementations/elixir/ockam/ockam_hub/lib/hub/telemetry_forwarder.ex +++ /dev/null @@ -1,119 +0,0 @@ -defmodule Ockam.Hub.TelemetryForwarder do - @moduledoc false - - require Logger - - @spec forward(any, [atom, ...], any, any) :: :ok | {:error, :already_exists} - def forward(handler_name, event_name, node_name, process_name) do - handler = fn ev, mes, met, opt -> - send({process_name, node_name}, {:telemetry, {ev, mes, met, opt}}) - end - - :telemetry.attach(handler_name, event_name, handler, nil) - end - - def init() do - settings = get_settings() - - create_node(settings.host, settings.token, settings.node_fqdn) - - attach_send_to_ui(settings.host, settings.token, settings.node_fqdn) - end - - def get_settings() do - token = Application.get_env(:ockam_hub, :auth_message) - host = Application.get_env(:ockam_hub, :auth_host) - - node_fqdn = Application.get_env(:ockam_hub, :node_fqdn) - - %{ - host: host, - token: token, - node_fqdn: node_fqdn - } - end - - def attach_send_to_ui() do - settings = get_settings() - - attach_send_to_ui(settings.host, settings.token, settings.node_fqdn) - end - - @spec attach_send_to_ui(any, any, any) :: :ok | {:error, :already_exists} - def attach_send_to_ui(host, token, _node_fqdn) do - event_name = [:ockam, Ockam.Node, :handle_local_message, :start] - - handler = fn _event, _message, metadata, _options -> - # 2. get message from metadata - # 3. format message for JSON - # 4. set hostname and query string from secrets - # payload = """ - # {"message": {"version": 1,"onward_route": ["a","b","c"],"return_route": ["1","2","3"],"payload": "asdf"}} - # """ - metadata = - case metadata do - %{message: %{payload: payload} = message} -> - %{metadata | message: %{message | payload: Base.encode64(payload)}} - - other -> - other - end - - json_payload = Jason.encode!(metadata) - token = URI.encode_www_form(token) - - HTTPoison.post("#{host}/messages?token=#{token}", json_payload, [ - {"Content-Type", "application/json"} - ]) - end - - :telemetry.detach(:send_to_ui) - :telemetry.attach(:send_to_ui, event_name, handler, nil) - end - - def create_node() do - settings = get_settings() - - create_node(settings.host, settings.token, settings.node_fqdn) - end - - @spec create_node(any, any, any) :: :ok - def create_node(host, token, node_fqdn) do - payload = - Jason.encode!(%{ - node: %{ - hostname: node_fqdn, - port: 4000 - } - }) - - token = URI.encode_www_form(token) - - case HTTPoison.post("#{host}/nodes?token=#{token}", payload, [ - {"Content-Type", "application/json"} - ]) do - {:ok, %{status_code: 204}} -> - :ok - - {:ok, %{status_code: 422}} -> - Logger.info("Node already created in UI") - :ok - - {:ok, %{status_code: code}} -> - Logger.info("UI responds with code #{inspect(code)}") - :ok - - {:error, %HTTPoison.Error{reason: :econnrefused}} -> - Logger.error("connection refused trying to create a node") - :ok - - {:error, %HTTPoison.Error{reason: reason}} -> - Logger.error("UI request error #{inspect(reason)}") - :ok - end - - # we don't care if this fails, - # we'll see it in the nginx log - :ok - end -end diff --git a/implementations/elixir/ockam/ockam_hub/lib/hub/telemetry_poller.ex b/implementations/elixir/ockam/ockam_hub/lib/hub/telemetry_poller.ex new file mode 100644 index 00000000000..44be33aa5a2 --- /dev/null +++ b/implementations/elixir/ockam/ockam_hub/lib/hub/telemetry_poller.ex @@ -0,0 +1,49 @@ +defmodule Ockam.Hub.TelemetryPoller do + @moduledoc """ + Telemetry poller callbacks to collect information about workers and transports + """ + + ## TODO: maybe move this to `ockam` app? + + alias Ockam.Telemetry + + def dispatch_worker_count() do + Enum.map(workers_by_type(), fn {type, workers} -> + type_str = format_worker_type(type) + + Telemetry.emit_event([:workers, :type], + measurements: %{count: Enum.count(workers)}, + metadata: %{type: type_str} + ) + end) + end + + def dispatch_tcp_connections() do + Enum.map(:ranch.info(), fn {_ref, info} -> + connections = Map.get(info, :all_connections, []) + port = Map.get(info, :port, 0) + + Telemetry.emit_event([:tcp, :connections], + measurements: %{count: connections}, + metadata: %{port: port} + ) + end) + end + + defp format_worker_type(nil) do + "Other" + end + + defp format_worker_type(module) do + to_string(module) + end + + @spec workers_by_type() :: [{module(), address :: String.t()}] + def workers_by_type() do + Ockam.Node.list_workers() + |> Enum.group_by(fn {_address, _pid, module} -> module end, fn {address, _pid, _modules} -> + address + end) + |> Map.new() + end +end diff --git a/implementations/elixir/ockam/ockam_hub/test/hub/service/tracing_test.exs b/implementations/elixir/ockam/ockam_hub/test/hub/service/tracing_test.exs index f9a299ca374..8018b4a3325 100644 --- a/implementations/elixir/ockam/ockam_hub/test/hub/service/tracing_test.exs +++ b/implementations/elixir/ockam/ockam_hub/test/hub/service/tracing_test.exs @@ -4,7 +4,7 @@ defmodule Test.Hub.Service.TracingTest do test "trace payloads" do Ockam.Hub.Service.Tracing.create(address: "tracing_service") Ockam.Hub.Service.Echo.create(address: "echo_service") - Ockam.Node.register_address("TEST", self()) + Ockam.Node.register_address("TEST") Ockam.Router.route(%{ onward_route: ["tracing_service"],