Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ex/feat: include :on_prepare_payload option #28

Merged
merged 1 commit into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion ex/lib/logflare_ex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,22 @@ defmodule LogflareEx do
def send_events(%Client{source_token: nil, source_name: nil}, _batch), do: {:error, :no_source}

def send_events(client, [%{} | _] = batch) do
body = Bertex.encode(%{"batch" => batch, "source" => client.source_token})
on_prepare_payload = Map.get(client, :on_prepare_payload)

prepared_batch =
if on_prepare_payload do
Enum.map(batch, fn event ->
case on_prepare_payload do
{m, f, 1} -> apply(m, f, [event])
cb when is_function(cb) -> cb.(event)
_ -> event
end
end)
else
batch
end

body = Bertex.encode(%{"batch" => prepared_batch, "source" => client.source_token})

case Tesla.post(client.tesla_client, "/api/logs", body) do
{:ok, %Tesla.Env{status: status, body: body}} when status < 300 ->
Expand Down
3 changes: 3 additions & 0 deletions ex/lib/logflare_ex/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ defmodule LogflareEx.Client do
- `:source_token`: Source UUID. Mutually exclusive with `:source_name`
- `:source_name`: Source name. Mutually exclusive with `:source_token`
- `:on_error`: mfa callback for handling API errors. Must be 1 arity.
- `:on_prepare_payload`: mfa callback or anonymous function for preparing the final payload before sending to API. Must be 1 arity.
- `:auto_flush`: Used for batching. Enables automatic flushing. If disabled, `LogflareEx.flush/1` must be called.
- `:flush_interval`: Used for batching. Flushes cached events at the provided interval.
- `:batch_size`: Used for batching. It is the maximum number of events send per API request.
Expand All @@ -61,6 +62,7 @@ defmodule LogflareEx.Client do
field(:source_token, String.t())
field(:source_name, String.t())
field(:on_error, list() | mfa(), default: nil)
field(:on_prepare_payload, list() | mfa(), default: nil)
# batching
field(:auto_flush, :boolean, default: true)
field(:flush_interval, non_neg_integer(), default: @default_flush_interval)
Expand All @@ -79,6 +81,7 @@ defmodule LogflareEx.Client do
source_name: get_config_value(:source_name),
tesla_client: nil,
on_error: get_config_value(:on_error),
on_prepare_payload: get_config_value(:on_prepare_payload),
flush_interval: get_config_value(:flush_interval) || @default_flush_interval,
batch_size: get_config_value(:batch_size) || @default_batch_size
})
Expand Down
6 changes: 4 additions & 2 deletions ex/lib/logflare_ex/telemetry_reporter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,16 @@ defmodule LogflareEx.TelemetryReporter do
def handle_attach(event, measurements, metadata, config) when is_list(config) do
# merge configuration
config_file_opts = (Application.get_env(:logflare_ex, __MODULE__) || []) |> Map.new()
opts = Enum.into(config, config_file_opts)

opts =
Enum.into(config, config_file_opts)

payload = %{metadata: metadata, measurements: measurements}
to_include = Map.get(opts, :include, [])

filtered_payload =
for path <- to_include,
String.starts_with?(path, "measurements.") or String.starts_with?(path, "metadata."),
String.starts_with?(path, "measurements") or String.starts_with?(path, "metadata"),
reduce: %{} do
acc -> put_path(acc, path, get_path(payload, path))
end
Expand Down
32 changes: 32 additions & 0 deletions ex/test/logflare_ex_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,38 @@ defmodule LogflareExTest do
end
end

describe "on_prepare_payload" do
test "triggered before payload is sent" do
pid = self()

Tesla
|> expect(:post, 3, fn _client, _path, body ->
%{"batch" => [event]} = Bertex.decode(body)
send(pid, {event.ref, event})
{:ok, %Tesla.Env{status: 500, body: "some server error"}}
end)

LogflareEx.TestUtils
|> expect(:stub_function, 2, fn data ->
%{different: "value", ref: data.ref}
end)

for cb <- [
{LogflareEx.TestUtils, :stub_function, 1},
&LogflareEx.TestUtils.stub_function/1,
fn data -> %{different: "value", ref: data.ref} end
] do
client = LogflareEx.client(api_key: "123", source_token: "123", on_prepare_payload: cb)
ref = make_ref()

assert {:error, %Tesla.Env{}} =
LogflareEx.send_events(client, [%{some: "event", ref: ref}])

assert_receive {^ref, %{different: "value", ref: _}}
end
end
end

describe "batching" do
setup do
pid = start_supervised!(BatcherSup)
Expand Down
36 changes: 36 additions & 0 deletions ex/test/telemetry_reporter_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,42 @@ defmodule LogflareEx.TelemetryReporterTest do
assert event[:measurements][:latency] == [123, 223]
refute event[:measurements][:other]
end

test "handle_attach/4 with :on_prepare_payload with anonymous function" do
pid = self()
ref = make_ref()

Tesla
|> expect(:post, fn _client, _path, body ->
decoded = Bertex.decode(body)
send(pid, {ref, decoded})
{:ok, %Tesla.Env{status: 201, body: Jason.encode!(%{"message" => "server msg"})}}
end)

:telemetry.attach("my-id", [:some, :event], &TelemetryReporter.handle_attach/4,
auto_flush: true,
flush_interval: 50,
include: ["measurements"],
on_prepare_payload: fn payload ->
payload
|> Map.put(:message, "hello!")
|> Map.put(:test, payload.measurements.other)
end
)

:telemetry.execute([:some, :event], %{latency: [123, 223], other: "value"}, %{
some: "metadata",
to_exclude: "this field"
})

Process.sleep(300)

assert_received {^ref, %{"batch" => [event]}}
# other fields will be included
assert event[:message] == "hello!"
assert event[:test] == "value"
assert event[:measurements][:latency] == [123, 223]
end
end

# reporter
Expand Down
Loading