Skip to content

Commit

Permalink
feat: use iodata for metrics export
Browse files Browse the repository at this point in the history
This prevents memory bloat form creating all intermediate binaries that
are later dropped. Instead we try to reuse as much of binaries as
possible and then let the VM do the concatenation if needed.
  • Loading branch information
hauleth committed Feb 3, 2025
1 parent 795f357 commit 2e5f303
Show file tree
Hide file tree
Showing 8 changed files with 192 additions and 73 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.1.6
2.1.7
36 changes: 7 additions & 29 deletions lib/supavisor/monitoring/prom_ex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ defmodule Supavisor.Monitoring.PromEx do
end
end

@spec get_metrics() :: String.t()
@spec get_metrics() :: iodata()
def get_metrics do
metrics_tags =
case Application.fetch_env(:supavisor, :metrics_tags) do
Expand All @@ -71,7 +71,7 @@ defmodule Supavisor.Monitoring.PromEx do
metrics =
PromEx.get_metrics(__MODULE__)
|> String.split("\n")
|> Enum.map_join("\n", &parse_and_add_tags(&1, def_tags))
|> Enum.map(&parse_and_add_tags(&1, def_tags))

Supavisor.Monitoring.PromEx.ETSCronFlusher
|> PromEx.ETSCronFlusher.defer_ets_flush()
Expand All @@ -81,7 +81,7 @@ defmodule Supavisor.Monitoring.PromEx do

@spec do_cache_tenants_metrics() :: list
def do_cache_tenants_metrics do
metrics = get_metrics() |> String.split("\n")
metrics = get_metrics() |> IO.iodata_to_binary() |> String.split("\n")

pools =
Registry.select(Supavisor.Registry.TenantClients, [{{:"$1", :_, :_}, [], [:"$1"]}])
Expand Down Expand Up @@ -109,43 +109,21 @@ defmodule Supavisor.Monitoring.PromEx do
end
end

@spec parse_and_add_tags(String.t(), String.t()) :: String.t()
@spec parse_and_add_tags(String.t(), String.t()) :: iodata()
defp parse_and_add_tags(line, def_tags) do
case Regex.run(~r/(?!\#)^(\w+)(?:{(.*?)})?\s*(.+)$/, line) do
nil ->
line
[line, "\n"]

[_, key, tags, value] ->
tags = clean_string(tags)

tags =
if tags == "" do
def_tags
else
"#{tags},#{def_tags}"
[tags, ",", def_tags]
end

"#{key}{#{tags}} #{value}"
[key, "{", tags, "}", value, "\n"]
end
end

@spec clean_string(String.t()) :: String.t()
def clean_string(metric_string) do
regex = ~r/=\s*"([^"]*?)"/

String.replace(metric_string, regex, fn match ->
[_, value] = Regex.run(regex, match)

cleaned =
value
|> String.replace(~r/\n+/, "")
|> String.trim()

if value != cleaned do
Logger.warning("Tag validation: #{inspect(value)} / #{inspect(cleaned)}")
end

"=\"#{cleaned}\""
end)
end
end
2 changes: 1 addition & 1 deletion lib/supavisor_web/controllers/metrics_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,6 @@ defmodule SupavisorWeb.MetricsController do
end

def merge_node_metrics({_, {_node, metrics}}, acc) do
[metrics <> "\n" | acc]
[metrics, "\n" | acc]
end
end
6 changes: 3 additions & 3 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ defmodule Supavisor.MixProject do
{:phoenix_live_view, "~> 1.0"},
{:phoenix_live_dashboard, "~> 0.7"},
{:telemetry_poller, "~> 1.0"},
{:peep, "~> 3.1",
github: "hauleth/peep", ref: "94dd35d2c98a858ef20983f2b3da2ab112a07238", override: true},
{:peep, "~> 3.4"},
{:jason, "~> 1.2"},
{:plug_cowboy, "~> 2.5"},
{:joken, "~> 2.6.0"},
Expand Down Expand Up @@ -83,7 +82,8 @@ defmodule Supavisor.MixProject do

# Test utilities
{:excoveralls, ">= 0.0.0", only: [:dev, :test]},
{:stream_data, "~> 1.0", only: [:dev, :test]}
{:stream_data, "~> 1.0", only: [:dev, :test]},
{:req, "~> 0.5"}
]
end

Expand Down
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
"octo_fetch": {:hex, :octo_fetch, "0.4.0", "074b5ecbc08be10b05b27e9db08bc20a3060142769436242702931c418695b19", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm", "cf8be6f40cd519d7000bb4e84adcf661c32e59369ca2827c4e20042eda7a7fc6"},
"open_api_spex": {:hex, :open_api_spex, "3.21.2", "6a704f3777761feeb5657340250d6d7332c545755116ca98f33d4b875777e1e5", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:poison, "~> 3.0 or ~> 4.0 or ~> 5.0 or ~> 6.0", [hex: :poison, repo: "hexpm", optional: true]}, {:ymlr, "~> 2.0 or ~> 3.0 or ~> 4.0 or ~> 5.0", [hex: :ymlr, repo: "hexpm", optional: true]}], "hexpm", "f42ae6ed668b895ebba3e02773cfb4b41050df26f803f2ef634c72a7687dc387"},
"opentelemetry_api": {:hex, :opentelemetry_api, "1.4.0", "63ca1742f92f00059298f478048dfb826f4b20d49534493d6919a0db39b6db04", [:mix, :rebar3], [], "hexpm", "3dfbbfaa2c2ed3121c5c483162836c4f9027def469c41578af5ef32589fcfc58"},
"peep": {:git, "https://github.com/hauleth/peep.git", "94dd35d2c98a858ef20983f2b3da2ab112a07238", [ref: "94dd35d2c98a858ef20983f2b3da2ab112a07238"]},
"peep": {:hex, :peep, "3.4.2", "49d4ca116d994779351959dfee971fb2c7d6506f1821374f3cc4fd39a3d3fadb", [:mix], [{:nimble_options, "~> 1.1", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:plug, "~> 1.16", [hex: :plug, repo: "hexpm", optional: true]}, {:telemetry_metrics, "~> 1.0", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "cae224b09e224bf5584f5375108becf71e2288572a099a122e66735289cd33f4"},
"pg_types": {:hex, :pg_types, "0.4.0", "3ce365c92903c5bb59c0d56382d842c8c610c1b6f165e20c4b652c96fa7e9c14", [:rebar3], [], "hexpm", "b02efa785caececf9702c681c80a9ca12a39f9161a846ce17b01fb20aeeed7eb"},
"pgo": {:hex, :pgo, "0.14.0", "f53711d103d7565db6fc6061fcf4ff1007ab39892439be1bb02d9f686d7e6663", [:rebar3], [{:backoff, "~> 1.1.6", [hex: :backoff, repo: "hexpm", optional: false]}, {:opentelemetry_api, "~> 1.0", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}, {:pg_types, "~> 0.4.0", [hex: :pg_types, repo: "hexpm", optional: false]}], "hexpm", "71016c22599936e042dc0012ee4589d24c71427d266292f775ebf201d97df9c9"},
"phoenix": {:hex, :phoenix, "1.7.18", "5310c21443514be44ed93c422e15870aef254cf1b3619e4f91538e7529d2b2e4", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:phoenix_template, "~> 1.0", [hex: :phoenix_template, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 2.0", [hex: :phoenix_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.7", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:websock_adapter, "~> 0.5.3", [hex: :websock_adapter, repo: "hexpm", optional: false]}], "hexpm", "1797fcc82108442a66f2c77a643a62980f342bfeb63d6c9a515ab8294870004e"},
Expand Down
126 changes: 126 additions & 0 deletions test/supavisor/monitoring/prom_ex_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
defmodule Supavisor.Monitoring.PromExTest do
use Supavisor.DataCase, async: true
use ExUnitProperties

@subject Supavisor.Monitoring.PromEx

@tenant "prom_tenant"

setup do
db_conf = Application.get_env(:supavisor, Repo)

{:ok, proxy} =
Postgrex.start_link(
hostname: db_conf[:hostname],
port: Application.get_env(:supavisor, :proxy_port_transaction),
database: db_conf[:database],
password: db_conf[:password],
username: db_conf[:username] <> "." <> @tenant,
socket_dir: nil,
show_sensitive_data_on_connection_error: true
)

assert :idle == DBConnection.status(proxy)

%{proxy: proxy, user: db_conf[:username], db_name: db_conf[:database]}
end

describe "get_metrics/1" do
@sources %{
{:darwin, :aarch64} => {
"https://github.com/prometheus/prom2json/releases/download/v1.4.1/prom2json-1.4.1.darwin-arm64.tar.gz",
"prom2json-1.4.1.darwin-arm64/prom2json"
}
}
setup do
{:ok, prom2json: Supavisor.Downloader.ensure("prom2json", @sources)}
end

@tag :tmp_dir
test "returned metrics are parseable", %{tmp_dir: dir, prom2json: exe} do

Check failure on line 40 in test/supavisor/monitoring/prom_ex_test.exs

View workflow job for this annotation

GitHub Actions / Run tests

test get_metrics/1 returned metrics are parseable (Supavisor.Monitoring.PromExTest)
metrics = @subject.get_metrics()
file = Path.join(dir, "prom.out")
File.write!(file, metrics)

assert {_, 0} = System.cmd(exe, [file])
end

@tag :tmp_dir
property "non-standard DB names do not cause parsing issues", %{tmp_dir: dir, prom2json: exe} do

Check failure on line 49 in test/supavisor/monitoring/prom_ex_test.exs

View workflow job for this annotation

GitHub Actions / Run tests

property get_metrics/1 non-standard DB names do not cause parsing issues (Supavisor.Monitoring.PromExTest)
tenant = "tenant"
user = "user"

check all db_name <- string(:printable, min_length: 1, max_length: 63) do
Supavisor.Monitoring.Telem.client_join(

Check warning on line 54 in test/supavisor/monitoring/prom_ex_test.exs

View workflow job for this annotation

GitHub Actions / Code style

Nested modules could be aliased at the top of the invoking module.
:ok,
{{:single, tenant}, user, :session, db_name, nil}
)

metrics = @subject.get_metrics()
file = Path.join(dir, "prom.out")
File.write!(file, metrics)

assert {out, 0} = System.cmd(exe, [file])
assert {:ok, measurements} = Jason.decode(out)

assert %{"metrics" => metrics} =
Enum.find(measurements, &(&1["name"] == "supavisor_client_joins_ok"))

assert Enum.find(metrics, &(&1["labels"]["db_name"] == db_name))
end
end

@tag :tmp_dir
property "non-standard user names do not cause parsing issues", %{

Check failure on line 74 in test/supavisor/monitoring/prom_ex_test.exs

View workflow job for this annotation

GitHub Actions / Run tests

property get_metrics/1 non-standard user names do not cause parsing issues (Supavisor.Monitoring.PromExTest)
tmp_dir: dir,
prom2json: exe
} do
tenant = "tenant"
db_name = "db_name"

check all user <- string(:printable, min_length: 1, max_length: 63) do
Supavisor.Monitoring.Telem.client_join(

Check warning on line 82 in test/supavisor/monitoring/prom_ex_test.exs

View workflow job for this annotation

GitHub Actions / Code style

Nested modules could be aliased at the top of the invoking module.
:ok,
{{:single, tenant}, user, :session, db_name, nil}
)

metrics = @subject.get_metrics()
file = Path.join(dir, "prom.out")
File.write!(file, metrics)

assert {out, 0} = System.cmd(exe, [file])
assert {:ok, measurements} = Jason.decode(out)

assert %{"metrics" => metrics} =
Enum.find(measurements, &(&1["name"] == "supavisor_client_joins_ok"))

assert Enum.find(metrics, &(&1["labels"]["db_name"] == db_name))
end
end

@tag :tmp_dir
property "non-standard tenant names do not cause parsing issues", %{tmp_dir: dir} do

Check failure on line 102 in test/supavisor/monitoring/prom_ex_test.exs

View workflow job for this annotation

GitHub Actions / Run tests

property get_metrics/1 non-standard tenant names do not cause parsing issues (Supavisor.Monitoring.PromExTest)
db_name = "db_name"
user = "user"

check all tenant <- string(:printable, min_length: 1) do
Supavisor.Monitoring.Telem.client_join(

Check warning on line 107 in test/supavisor/monitoring/prom_ex_test.exs

View workflow job for this annotation

GitHub Actions / Code style

Nested modules could be aliased at the top of the invoking module.
:ok,
{{:single, tenant}, user, :session, db_name, nil}
)

metrics = @subject.get_metrics()
file = Path.join(dir, "prom.out")
File.write!(file, metrics)

assert {out, 0} = System.cmd("prom2json", [file])
assert {:ok, measurements} = Jason.decode(out)

assert %{"metrics" => metrics} =
Enum.find(measurements, &(&1["name"] == "supavisor_client_joins_ok"))

assert Enum.find(metrics, &(&1["labels"]["db_name"] == db_name))
end
end
end
end
38 changes: 0 additions & 38 deletions test/supavisor/prom_ex_test.exs

This file was deleted.

53 changes: 53 additions & 0 deletions test/support/downloader.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
defmodule Supavisor.Downloader do

Check warning on line 1 in test/support/downloader.ex

View workflow job for this annotation

GitHub Actions / Code style

Modules should have a @moduledoc tag.
def ensure(name, sources) do
case System.find_executable(name) do
nil -> do_download(name, sources)
path -> path
end
end

def do_download(name, sources) do
path = System.tmp_dir!()
arch = os_arch()

source =
case Map.fetch(sources, arch) do
{:ok, src} -> src
:error -> raise "Cannot find source for #{inspect(arch)}"
end

{url, file} =
case source do
{url, file} -> {url, file}
url when is_binary(url) -> {url, name}
end

out = Path.join(path, file)

if not File.exists?(out) do
%Req.Response{status: 200, body: body} = Req.get!(url)

:ok =
:erl_tar.extract({:binary, body}, [
:compressed,
cwd: to_charlist(path),
files: [to_charlist(file)]
])
end

out
end

defp os_arch do
{_, name} = :os.type()

arch =
:erlang.system_info(:system_architecture)
|> List.to_string()
|> String.split("-", parts: 2)
|> hd()
|> String.to_atom()

{name, arch}
end
end

0 comments on commit 2e5f303

Please sign in to comment.