Skip to content

Commit

Permalink
Add unique job feature
Browse files Browse the repository at this point in the history
  • Loading branch information
ananthakumaran committed Jul 23, 2022
1 parent 7e16e5f commit e365be1
Show file tree
Hide file tree
Showing 18 changed files with 529 additions and 58 deletions.
61 changes: 59 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,7 @@ To unsubscribe from all queues:
If you'd like to customize worker execution and/or create plugins like Sidekiq/Resque have, Exq supports custom middleware. The first step would be to define the middleware in ```config.exs``` and add your middleware into the chain:

```elixir
middleware: [Exq.Middleware.Stats, Exq.Middleware.Job, Exq.Middleware.Manager,
Exq.Middleware.Logger]
middleware: [Exq.Middleware.Stats, Exq.Middleware.Job, Exq.Middleware.Manager, Exq.Middleware.Unique, Exq.Middleware.Logger]
```

You can then create a module that implements the middleware behavior and defines `before_work`, `after_processed_work` and `after_failed_work` functions. You can also halt execution of the chain as well. For a simple example of middleware implementation, see the [Exq Logger Middleware](https://github.com/akira/exq/blob/master/lib/exq/middleware/logger.ex).
Expand Down Expand Up @@ -468,6 +467,64 @@ config :exq,
missed_heartbeats_allowed: 5
```

## Unique Jobs

There are many use cases where we want to avoid duplicate jobs. Exq
provides a few job level options to handle these cases.

This feature is implemented using lock abstraction. When you enqueue a
job for the first time, a unique lock is created. The lock token is
derived from the job queue, class and args or from the `unique_token`
value if provided. If you try to enqueue another job with same args
and the lock has not expired yet, you will get back `{:conflict,
jid}`, here jid refers the first successful job.

The lock expiration is controlled by two options.

* `unique_for` (seconds), controls the maximum duration a lock can be
active. This option is mandatory to create a unique job and the lock
never outlives the expiration duration. In cases of scheduled job, the
expiration time is calculated as `scheduled_time + unique_for`

* `unique_until` allows you to clear the lock based on job
lifecycle. Using `:success` will clear the lock on successful
completion of job, `:start` will clear the lock when the job is picked
for execution for the first time. `:expiry` specifies the lock should
be cleared based on the expiration time set via
`unique_for`.

```elixir
{:ok, jid} = Exq.enqueue(Exq, "default", MyWorker, ["arg1", "arg2"], unique_for: 60 * 60)
{:conflict, ^jid} = Exq.enqueue(Exq, "default", MyWorker, ["arg1", "arg2"], unique_for: 60 * 60)
```

### Example usages

* Idempotency - Let's say you want to send a welcome email and want to
make sure it's never sent more than once, even when the enqueue part
might get retried due to timeout etc. Use a reasonable expiration
duration (unique_for) that covers the retry period along with
`unique_until: :expiry`.

* Debounce - Let's say for any change to user data, you want to sync
it to another system. If you just enqueue a job for each change, you
might end up with unnecessary duplicate sync calls. Use
`unique_until: :start` along with expiration time based on queue
load. This will make sure you never have more than one job pending
for a user in the queue.

* Batch - Let's you want to send a notification to user, but want to
wait for an hour and batch them together. Schedule a job one hour in
the future using `enqueue_in` and set `unique_until: :success`. This
will make sure no other job get enqueued till the scheduled job
completes successfully.

Although Exq provides unique jobs feature, try to make your worker
idempotent as much as possible. Unique jobs doesn't prevent your job
from getting retried on failure etc. So, unique jobs is **best
effort**, not a guarantee to avoid duplicate execution.


## Web UI

Exq has a separate repo, exq_ui which provides with a Web UI to monitor your workers:
Expand Down
1 change: 1 addition & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ config :exq,
Exq.Middleware.Stats,
Exq.Middleware.Job,
Exq.Middleware.Manager,
Exq.Middleware.Unique,
Exq.Middleware.Logger
]

Expand Down
1 change: 1 addition & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ config :exq,
Exq.Middleware.Stats,
Exq.Middleware.Job,
Exq.Middleware.Manager,
Exq.Middleware.Unique,
Exq.Middleware.Telemetry
],
queue_adapter: Exq.Adapters.Queue.Mock
18 changes: 15 additions & 3 deletions lib/exq/enqueue_api.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,18 @@ defmodule Exq.Enqueuer.EnqueueApi do
quote location: :keep do
alias Exq.Support.Config

@options_doc """
* `options`: Following job options are supported
* `max_retries` (integer) - max retry count
* `jid` (string) - user supplied jid value
* `unique_for` (integer) - lock expiration duration in seconds
* `unique_token` (string) - unique lock token. By default the token is computed based on the queue, class and args.
* `unique_until` (atom) - defaults to `:success`. Supported values are
* `:success` - unlock on job success
* `:start` - unlock on job first execution
* `:expiry` - unlock when the lock is expired. Depends on `unique_for` value.
"""

@default_options []
@doc """
Enqueue a job immediately.
Expand All @@ -19,7 +31,7 @@ defmodule Exq.Enqueuer.EnqueueApi do
* `queue` - Name of queue to use
* `worker` - Worker module to target
* `args` - Array of args to send to worker
* `options` - job options, for example [max_retries: `Integer`, jid: `String`]
#{@options_doc}
Returns:
* `{:ok, jid}` if the job was enqueued successfully, with `jid` = Job ID.
Expand All @@ -43,7 +55,7 @@ defmodule Exq.Enqueuer.EnqueueApi do
* `time` - Time to enqueue
* `worker` - Worker module to target
* `args` - Array of args to send to worker
* `options` - job options, for example [max_retries: `Integer`, jid: `String`]
#{@options_doc}
If Exq is running in `mode: [:enqueuer]`, then you will need to use the Enqueuer
to schedule jobs, for example:
Expand All @@ -70,7 +82,7 @@ defmodule Exq.Enqueuer.EnqueueApi do
* `offset` - Offset in seconds in the future to enqueue
* `worker` - Worker module to target
* `args` - Array of args to send to worker
* `options` - job options, for example [max_retries: `Integer`]
#{@options_doc}
If Exq is running in `mode: [:enqueuer]`, then you will need to use the Enqueuer
to schedule jobs, for example:
Expand Down
1 change: 1 addition & 0 deletions lib/exq/middleware/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ defmodule Exq.Middleware.Pipeline do
- Exq.Middleware.Job: Will NOT remove the backup from job queue
- Exq.Middleware.Logger: Will NOT record job as done or failed with timestamp
- Exq.Middleware.Manager: Will NOT update worker counter
- Exq.Middleware.Unique: Will NOT clear unique lock
- Exq.Middleware.Stats: Will NOT remove job from processes queue
"""
Expand Down
44 changes: 44 additions & 0 deletions lib/exq/middleware/unique.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
defmodule Exq.Middleware.Unique do
@behaviour Exq.Middleware.Behaviour
alias Exq.Redis.JobQueue
alias Exq.Middleware.Pipeline

def before_work(
%Pipeline{assigns: %{job_serialized: job_serialized, redis: redis, namespace: namespace}} =
pipeline
) do
job = Exq.Support.Job.decode(job_serialized)

case job do
%{unique_until: "start", unique_token: unique_token, retry_count: retry_count}
when retry_count in [0, nil] ->
{:ok, _} = JobQueue.unlock(redis, namespace, unique_token)

_ ->
:ok
end

pipeline
end

def after_processed_work(
%Pipeline{assigns: %{job_serialized: job_serialized, redis: redis, namespace: namespace}} =
pipeline
) do
job = Exq.Support.Job.decode(job_serialized)

case job do
%{unique_until: "success", unique_token: unique_token} ->
{:ok, _} = JobQueue.unlock(redis, namespace, unique_token)

_ ->
:ok
end

pipeline
end

def after_failed_work(pipeline) do
pipeline
end
end
4 changes: 2 additions & 2 deletions lib/exq/redis/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ defmodule Exq.Redis.Connection do
q(redis, ["SET", key, val])
end

def del!(redis, key) do
q(redis, ["DEL", key])
def del!(redis, key, options \\ []) do
q(redis, ["DEL", key], options)
end

def expire!(redis, key, time \\ 10) do
Expand Down
Loading

0 comments on commit e365be1

Please sign in to comment.