Skip to content

Commit

Permalink
Merge pull request #414 from hez/feature/telemetry-middleware
Browse files Browse the repository at this point in the history
middleware to fire telemetry events
  • Loading branch information
ananthakumaran authored Jul 31, 2020
2 parents 9cfcd5c + 0fc7e6f commit 3fcf234
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 4 deletions.
7 changes: 6 additions & 1 deletion config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,10 @@ config :exq,
max_retries: 0,
stats_flush_interval: 5,
stats_batch_size: 1,
middleware: [Exq.Middleware.Stats, Exq.Middleware.Job, Exq.Middleware.Manager],
middleware: [
Exq.Middleware.Stats,
Exq.Middleware.Job,
Exq.Middleware.Manager,
Exq.Middleware.Telemetry
],
queue_adapter: Exq.Adapters.Queue.Mock
143 changes: 143 additions & 0 deletions lib/exq/middleware/telemetry.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
defmodule Exq.Middleware.Telemetry do
  @moduledoc """
  This middleware allows you to subscribe to the telemetry events and
  collect metrics about your jobs.

  ### Exq telemetry events

  The middleware emits three events, the same ones `:telemetry.span/3` emits.

  - `[:exq, :job, :start]` - Is invoked whenever a job starts.

    **Measurements**

    - `system_time` (integer) - System time when the job started, in native unit

  - `[:exq, :job, :stop]` - Is invoked whenever a job completes successfully.

    **Measurements**

    - `duration` (integer) - Duration of the job execution in native unit

  - `[:exq, :job, :exception]` - Is invoked whenever a job fails.

    **Measurements**

    - `duration` (integer) - Duration of the job execution in native unit

    **Metadata**

    In addition to the common metadata, the exception event will have the following fields.

    - `kind` (`:exit` | `:error`) - either `:exit` or `:error`
    - `reason` (term) - could be an `Exception.t/0` or term
    - `stacktrace` (list) - Stacktrace of the error. Will be empty if the kind is `:exit`.

  **Metadata**

  Each event has the following common metadata

  - `enqueued_at` (`DateTime.t/0`) - datetime the job was enqueued
  - `queue` (`String.t/0`) - the name of the queue the job was executed in
  - `class` (`String.t/0`) - the job's class
  - `jid` (`String.t/0`) - the job's jid
  - `retry_count` (integer) - number of times this job has failed so far

  ### Example:

  ```
  defmodule MyApp.Application do
    use Application
    import Telemetry.Metrics

    def start(_type, _args) do
      children = [
        # .....
        {Telemetry.Metrics.ConsoleReporter, metrics: metrics()}
      ]

      opts = [strategy: :one_for_one, name: MyApp.Supervisor]
      Supervisor.start_link(children, opts)
    end

    defp metrics do
      [
        counter("exq.job.stop.duration"),
        counter("exq.job.exception.duration"),
        distribution("exq.job.stop.duration",
          buckets: [0.1, 0.2, 0.3, 0.5, 0.75, 1, 2, 3, 5, 10],
          unit: {:native, :millisecond}
        ),
        distribution("exq.job.exception.duration",
          buckets: [0.1, 0.2, 0.3, 0.5, 0.75, 1, 2, 3, 5, 10],
          unit: {:native, :millisecond}
        ),
        summary("exq.job.stop.duration", unit: {:native, :millisecond}),
        summary("exq.job.exception.duration", unit: {:native, :millisecond})
      ]
    end
  end
  ```
  """

  @behaviour Exq.Middleware.Behaviour
  alias Exq.Middleware.Pipeline

  # A term counts as a stacktrace when it is a non-empty list whose first entry
  # is a 3- or 4-element tuple. Emptiness is checked with `!= []` rather than
  # `length/1` so the guard never walks the whole list.
  defguardp is_stacktrace(stacktrace)
            when is_list(stacktrace) and stacktrace != [] and is_tuple(hd(stacktrace)) and
                   (tuple_size(hd(stacktrace)) == 3 or tuple_size(hd(stacktrace)) == 4)

  @doc """
  Emits `[:exq, :job, :start]` with the current system time and stores the
  monotonic start time under the `:telemetry_start_time` assign so the
  `after_*` callbacks can compute the job duration.

  Returns the pipeline with the new assign.
  """
  @impl true
  def before_work(pipeline) do
    :telemetry.execute(
      [:exq, :job, :start],
      %{system_time: System.system_time()},
      metadata(pipeline.assigns.job)
    )

    Pipeline.assign(pipeline, :telemetry_start_time, System.monotonic_time())
  end

  @doc """
  Emits `[:exq, :job, :stop]` with the elapsed `duration` measurement.

  Returns the pipeline unchanged.
  """
  @impl true
  def after_processed_work(pipeline) do
    :telemetry.execute(
      [:exq, :job, :stop],
      %{duration: elapsed(pipeline)},
      metadata(pipeline.assigns.job)
    )

    pipeline
  end

  @doc """
  Emits `[:exq, :job, :exception]` with the elapsed `duration` measurement and
  the common metadata extended with `:kind`, `:reason` and `:stacktrace`
  derived from `pipeline.assigns.error`.

  Returns the pipeline unchanged.
  """
  @impl true
  def after_failed_work(pipeline) do
    # Take the measurement first so building the metadata is not counted.
    duration = elapsed(pipeline)

    :telemetry.execute(
      [:exq, :job, :exception],
      %{duration: duration},
      Map.merge(metadata(pipeline.assigns.job), error_metadata(pipeline.assigns.error))
    )

    pipeline
  end

  # Monotonic time elapsed since `before_work/1` stored the start time.
  # Raises `KeyError` if `:telemetry_start_time` was never assigned.
  defp elapsed(pipeline),
    do: System.monotonic_time() - pipeline.assigns.telemetry_start_time

  # A `{reason, stacktrace}` pair is reported as an `:error`; any other term is
  # reported as an `:exit` with an empty stacktrace.
  defp error_metadata({reason, stacktrace}) when is_stacktrace(stacktrace),
    do: %{kind: :error, reason: reason, stacktrace: stacktrace}

  defp error_metadata(reason),
    do: %{kind: :exit, reason: reason, stacktrace: []}

  # Common metadata attached to every event.
  # NOTE(review): `job.enqueued_at` is presumably Unix time in (fractional)
  # seconds; it is scaled to integer milliseconds before conversion. Confirm
  # against the job serializer.
  defp metadata(job),
    do: %{
      enqueued_at: DateTime.from_unix!(round(job.enqueued_at * 1000), :millisecond),
      queue: job.queue,
      class: job.class,
      jid: job.jid,
      retry_count: job.retry_count || 0
    }
end
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@
"ranch": {:hex, :ranch, "1.7.0", "9583f47160ca62af7f8d5db11454068eaa32b56eeadf984d4f46e61a076df5f2", [:rebar3], [], "hexpm", "59f7501c3a56125b2fc5684c3048fac9d043c0bf4d173941b12ca927949af189"},
"redix": {:hex, :redix, "0.10.2", "a9eabf47898aa878650df36194aeb63966d74f5bd69d9caa37babb32dbb93c5d", [:mix], [{:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "251b329893b8f6bb115fc0e30df7f12ee641b1e4547d760cf0909416a209f9bd"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.1", "28a4d65b7f59893bc2c7de786dec1e1555bd742d336043fe644ae956c3497fbe", [:make, :rebar], [], "hexpm", "4f8805eb5c8a939cf2359367cb651a3180b27dfb48444846be2613d79355d65e"},
"telemetry": {:hex, :telemetry, "0.4.0", "8339bee3fa8b91cb84d14c2935f8ecf399ccd87301ad6da6b71c09553834b2ab", [:rebar3], [], "hexpm", "e9e3cacfd37c1531c0ca70ca7c0c30ce2dbb02998a4f7719de180fe63f8d41e4"},
"telemetry": {:hex, :telemetry, "0.4.2", "2808c992455e08d6177322f14d3bdb6b625fbcfd233a73505870d8738a2f4599", [:rebar3], [], "hexpm", "2d1419bd9dda6a206d7b5852179511722e2b18812310d304620c7bd92a13fcef"},
}
3 changes: 2 additions & 1 deletion test/config_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ defmodule Exq.ConfigTest do
assert default_middleware == [
Exq.Middleware.Stats,
Exq.Middleware.Job,
Exq.Middleware.Manager
Exq.Middleware.Manager,
Exq.Middleware.Telemetry
]

assert mode == :default
Expand Down
9 changes: 8 additions & 1 deletion test/middleware_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,14 @@ defmodule MiddlewareTest do

test "restores default middleware after process kill" do
{:ok, _pid} = Exq.start_link()
chain = [Exq.Middleware.Stats, Exq.Middleware.Job, Exq.Middleware.Manager]

chain = [
Exq.Middleware.Stats,
Exq.Middleware.Job,
Exq.Middleware.Manager,
Exq.Middleware.Telemetry
]

assert Middleware.all(Middleware) == chain

pid = Process.whereis(Middleware)
Expand Down

0 comments on commit 3fcf234

Please sign in to comment.