Skip to content

Commit

Permalink
follow telemetry:span/3 spec
Browse files Browse the repository at this point in the history
  • Loading branch information
ananthakumaran committed Jul 19, 2020
1 parent 9e7feaf commit 0fc7e6f
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 51 deletions.
7 changes: 6 additions & 1 deletion config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,10 @@ config :exq,
max_retries: 0,
stats_flush_interval: 5,
stats_batch_size: 1,
middleware: [Exq.Middleware.Stats, Exq.Middleware.Job, Exq.Middleware.Manager],
middleware: [
Exq.Middleware.Stats,
Exq.Middleware.Job,
Exq.Middleware.Manager,
Exq.Middleware.Telemetry
],
queue_adapter: Exq.Adapters.Queue.Mock
136 changes: 89 additions & 47 deletions lib/exq/middleware/telemetry.ex
Original file line number Diff line number Diff line change
@@ -1,27 +1,56 @@
defmodule Exq.Middleware.Telemetry do
@doc """
This allows you to subscribe to the telemetry events and collect metrics about your jobs.
@moduledoc """
This middleware allows you to subscribe to the telemetry events and
collect metrics about your jobs.
## Ecto telemetry events
### Exq telemetry events
The following events are emitted:
The middleware emit three events, same as what `:telemetry.span/3` emits.
- `[:exq, :job, :start]`: Is invoked whenever a job starts. The measurement is a duration in seconds the job ran for.
- `[:exq, :job, :stop]`: Is invoked whenever a job successful completes. The measurement is a duration in seconds the job ran for and a retry_count for the number of times the job has been retried.
- `[:exq, :job, :exception]`: Is invoked whenever a job fails. The measurement is a duration in seconds the job ran for and a retry_count for the number of times the job has been retried.
- `[:exq, :job, :start]` - Is invoked whenever a job starts.
** Measurements **
- `system_time` (integer) - System time when the job started
- `[:exq, :job, :stop]` - Is invoked whenever a job completes successfully.
** Measurements **
- `duration` (integer) - Duration of the job execution in native unit
- `[:exq, :job, :exception]` - Is invoked whenever a job fails.
** Measurements **
- `duration` (integer) - Duration of the job execution in native unit
** Metadata **
In addition to the common metadata, exception event will have the following fields.
- `kind` (exit | error) - either `exit` or `error`
- `reason` (term) - could be an `Exception.t/0` or term
- `stacktrace` (list) - Stacktrace of the error. Will be empty if the kind is `exit`.
** Metadata **
Each event has the following common metadata
- `enqueued_at` (`DateTime.t/0`) - datetime the job was enqueued
- `queue` (`String.t/0`) - the name of the queue the job was executed in
- `class` (`String.t/0`) - the job's class
- `jid` (`String.t/0`) - the job's jid
- `retry_count` (integer) - number of times this job has failed so far
Each event has the following metadata
- `enuqueued_at` datetime the job was enqueued
- `queue` the name of the queue the job was executed in
- `class` the job's class
### Example:
```
defmodule MyApp.Application do
def start(_type, _args) do
children = [
# .....
{:telemetry_poller, measurements: periodic_measurements(), period: 10_000},
{Telemetry.Metrics.ConsoleReporter, metrics: metrics()}
]
Expand All @@ -31,71 +60,84 @@ defmodule Exq.Middleware.Telemetry do
defp metrics do
[
counter("exq.job.started.count"),
counter("exq.job.started.retry_count"),
counter("exq.job.processed.retry_count"),
distribution("exq.job.processed.duration",
buckets: [0.1, 0.2, 0.3, 0.5, 0.75, 1, 2, 3, 5, 10]
counter("exq.job.stop.duration"),
counter("exq.job.exception.duration"),
distribution("exq.job.stop.duration",
buckets: [0.1, 0.2, 0.3, 0.5, 0.75, 1, 2, 3, 5, 10],
unit: {:native, :millisecond}
),
counter("exq.job.failed.retry_count"),
distribution("exq.job.failed.duration",
buckets: [0.1, 0.2, 0.3, 0.5, 0.75, 1, 2, 3, 5, 10]
)
distribution("exq.job.exception.duration",
buckets: [0.1, 0.2, 0.3, 0.5, 0.75, 1, 2, 3, 5, 10],
unit: {:native, :millisecond}
),
summary("exq.job.stop.duration", unit: {:native, :millisecond}),
summary("exq.job.exception.duration", unit: {:native, :millisecond})
]
end
end
```
"""

@behaviour Exq.Middleware.Behaviour
alias Exq.Middleware.Pipeline
import Pipeline

import Exq.Middleware.Pipeline
defguardp is_stacktrace(stacktrace)
when is_list(stacktrace) and length(stacktrace) > 0 and is_tuple(hd(stacktrace)) and
(tuple_size(hd(stacktrace)) == 3 or tuple_size(hd(stacktrace)) == 4)

@impl true
def after_failed_work(pipeline) do
job = pipeline.assigns.job
duration = System.monotonic_time() - pipeline.assigns.telemetry_start_time

error_map =
case pipeline.assigns.error do
{reason, stacktrace} when is_stacktrace(stacktrace) ->
%{kind: :error, reason: reason, stacktrace: stacktrace}

reason ->
%{kind: :exit, reason: reason, stacktrace: []}
end

:telemetry.execute(
[:exq, :job, :failed],
%{
duration: delta(pipeline),
retry_count: job.retry_count || 1
},
tags(job)
[:exq, :job, :exception],
%{duration: duration},
Map.merge(metadata(pipeline.assigns.job), error_map)
)

pipeline
end

@impl true
def after_processed_work(pipeline) do
job = pipeline.assigns.job
duration = System.monotonic_time() - pipeline.assigns.telemetry_start_time

:telemetry.execute(
[:exq, :job, :processed],
%{
duration: delta(pipeline),
retry_count: job.retry_count || 0
},
tags(job)
[:exq, :job, :stop],
%{duration: duration},
metadata(pipeline.assigns.job)
)

pipeline
end

@impl true
def before_work(pipeline) do
job = pipeline.assigns.job

:telemetry.execute(
[:exq, :job, :started],
%{count: 1, retry_count: job.retry_count || 0},
tags(job)
[:exq, :job, :start],
%{system_time: System.system_time()},
metadata(pipeline.assigns.job)
)

assign(pipeline, :started_at, DateTime.utc_now())
assign(pipeline, :telemetry_start_time, System.monotonic_time())
end

defp tags(job),
do: %{enqueued_at: unix_to_datetime(job.enqueued_at), queue: job.queue, class: job.class}

defp delta(%Pipeline{assigns: assigns}),
do: DateTime.diff(DateTime.utc_now(), assigns.started_at, :second)
defp metadata(job),
do: %{
enqueued_at: DateTime.from_unix!(round(job.enqueued_at * 1000), :millisecond),
queue: job.queue,
class: job.class,
jid: job.jid,
retry_count: job.retry_count || 0
}
end
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@
"ranch": {:hex, :ranch, "1.7.0", "9583f47160ca62af7f8d5db11454068eaa32b56eeadf984d4f46e61a076df5f2", [:rebar3], [], "hexpm", "59f7501c3a56125b2fc5684c3048fac9d043c0bf4d173941b12ca927949af189"},
"redix": {:hex, :redix, "0.10.2", "a9eabf47898aa878650df36194aeb63966d74f5bd69d9caa37babb32dbb93c5d", [:mix], [{:telemetry, "~> 0.4.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "251b329893b8f6bb115fc0e30df7f12ee641b1e4547d760cf0909416a209f9bd"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.1", "28a4d65b7f59893bc2c7de786dec1e1555bd742d336043fe644ae956c3497fbe", [:make, :rebar], [], "hexpm", "4f8805eb5c8a939cf2359367cb651a3180b27dfb48444846be2613d79355d65e"},
"telemetry": {:hex, :telemetry, "0.4.0", "8339bee3fa8b91cb84d14c2935f8ecf399ccd87301ad6da6b71c09553834b2ab", [:rebar3], [], "hexpm", "e9e3cacfd37c1531c0ca70ca7c0c30ce2dbb02998a4f7719de180fe63f8d41e4"},
"telemetry": {:hex, :telemetry, "0.4.2", "2808c992455e08d6177322f14d3bdb6b625fbcfd233a73505870d8738a2f4599", [:rebar3], [], "hexpm", "2d1419bd9dda6a206d7b5852179511722e2b18812310d304620c7bd92a13fcef"},
}
3 changes: 2 additions & 1 deletion test/config_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ defmodule Exq.ConfigTest do
assert default_middleware == [
Exq.Middleware.Stats,
Exq.Middleware.Job,
Exq.Middleware.Manager
Exq.Middleware.Manager,
Exq.Middleware.Telemetry
]

assert mode == :default
Expand Down
9 changes: 8 additions & 1 deletion test/middleware_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,14 @@ defmodule MiddlewareTest do

test "restores default middleware after process kill" do
{:ok, _pid} = Exq.start_link()
chain = [Exq.Middleware.Stats, Exq.Middleware.Job, Exq.Middleware.Manager]

chain = [
Exq.Middleware.Stats,
Exq.Middleware.Job,
Exq.Middleware.Manager,
Exq.Middleware.Telemetry
]

assert Middleware.all(Middleware) == chain

pid = Process.whereis(Middleware)
Expand Down

0 comments on commit 0fc7e6f

Please sign in to comment.