diff --git a/README.md b/README.md
index a294626a..1c9317e6 100644
--- a/README.md
+++ b/README.md
@@ -372,8 +372,7 @@ To unsubscribe from all queues:
 
 If you'd like to customize worker execution and/or create plugins like Sidekiq/Resque have, Exq supports custom middleware. The first step would be to define the middleware in ```config.exs``` and add your middleware into the chain:
 ```elixir
-middleware: [Exq.Middleware.Stats, Exq.Middleware.Job, Exq.Middleware.Manager,
-  Exq.Middleware.Logger]
+middleware: [Exq.Middleware.Stats, Exq.Middleware.Job, Exq.Middleware.Manager, Exq.Middleware.Unique, Exq.Middleware.Logger]
 ```
 
 You can then create a module that implements the middleware behavior and defines `before_work`, `after_processed_work` and `after_failed_work` functions. You can also halt execution of the chain as well. For a simple example of middleware implementation, see the [Exq Logger Middleware](https://github.com/akira/exq/blob/master/lib/exq/middleware/logger.ex).
@@ -468,6 +467,64 @@ config :exq,
   missed_heartbeats_allowed: 5
 ```
 
+## Unique Jobs
+
+There are many use cases where we want to avoid duplicate jobs. Exq
+provides a few job-level options to handle these cases.
+
+This feature is implemented using a lock abstraction. When you enqueue a
+job for the first time, a unique lock is created. The lock token is
+derived from the job's queue, class and args, or from the `unique_token`
+value if provided. If you try to enqueue another job with the same args
+while the lock has not yet expired, you will get back `{:conflict,
+jid}`, where `jid` refers to the first successfully enqueued job.
+
+The lock expiration is controlled by two options:
+
+* `unique_for` (seconds) controls the maximum duration a lock can be
+active. This option is mandatory to create a unique job, and the lock
+never outlives this duration. For a scheduled job, the expiration
+time is calculated as `scheduled_time + unique_for`.
+
+* `unique_until` allows you to clear the lock based on the job
+lifecycle. `:success` clears the lock on successful completion of the
+job, `:start` clears the lock when the job is picked up for execution
+for the first time, and `:expiry` keeps the lock until the expiration
+time set via `unique_for`.
+
+```elixir
+{:ok, jid} = Exq.enqueue(Exq, "default", MyWorker, ["arg1", "arg2"], unique_for: 60 * 60)
+{:conflict, ^jid} = Exq.enqueue(Exq, "default", MyWorker, ["arg1", "arg2"], unique_for: 60 * 60)
+```
+
+### Example usages
+
+* Idempotency - Let's say you want to send a welcome email and want to
+  make sure it's never sent more than once, even when the enqueue call
+  might get retried due to a timeout or a similar failure. Use a
+  reasonable expiration duration (`unique_for`) that covers the retry
+  period, along with `unique_until: :expiry`.
+
+* Debounce - Let's say that for any change to user data, you want to
+  sync it to another system. If you just enqueue a job for each change,
+  you might end up with unnecessary duplicate sync calls. Use
+  `unique_until: :start` along with an expiration time based on queue
+  load. This will make sure you never have more than one job pending
+  for a user in the queue.
+
+* Batch - Let's say you want to send notifications to a user, but want
+  to wait for an hour and batch them together. Schedule a job one hour
+  in the future using `enqueue_in` and set `unique_until: :success`.
+  This will make sure no other job gets enqueued until the scheduled
+  job completes successfully; see the sketch below.
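+
+A minimal sketch of the batch case (`NotificationWorker` and `user_id`
+are placeholders for your own worker and arguments):
+
+```elixir
+# The first call schedules the batched job an hour out and takes the unique
+# lock; later calls before the job completes (or the lock expires) return a
+# conflict carrying the original jid.
+{:ok, jid} =
+  Exq.enqueue_in(Exq, "default", 60 * 60, NotificationWorker, [user_id],
+    unique_for: 2 * 60 * 60,
+    unique_until: :success
+  )
+
+{:conflict, ^jid} =
+  Exq.enqueue_in(Exq, "default", 60 * 60, NotificationWorker, [user_id],
+    unique_for: 2 * 60 * 60,
+    unique_until: :success
+  )
+```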
+
+Although Exq provides a unique jobs feature, try to make your workers
+as idempotent as possible. Unique jobs do not prevent a job from being
+retried on failure, so treat unique jobs as **best effort**, not a
+guarantee against duplicate execution.
+
+
 ## Web UI
 
 Exq has a separate repo, exq_ui which provides with a Web UI to monitor your workers:
diff --git a/config/config.exs b/config/config.exs
index d7c95233..60361895 100644
--- a/config/config.exs
+++ b/config/config.exs
@@ -25,6 +25,7 @@ config :exq,
     Exq.Middleware.Stats,
     Exq.Middleware.Job,
     Exq.Middleware.Manager,
+    Exq.Middleware.Unique,
     Exq.Middleware.Logger
   ]
diff --git a/config/test.exs b/config/test.exs
index b70f70b8..b4dd42cb 100644
--- a/config/test.exs
+++ b/config/test.exs
@@ -26,6 +26,7 @@ config :exq,
     Exq.Middleware.Stats,
     Exq.Middleware.Job,
     Exq.Middleware.Manager,
+    Exq.Middleware.Unique,
     Exq.Middleware.Telemetry
   ],
   queue_adapter: Exq.Adapters.Queue.Mock
diff --git a/lib/exq/enqueue_api.ex b/lib/exq/enqueue_api.ex
index 83055c1d..df4b7724 100644
--- a/lib/exq/enqueue_api.ex
+++ b/lib/exq/enqueue_api.ex
@@ -10,6 +10,18 @@ defmodule Exq.Enqueuer.EnqueueApi do
     quote location: :keep do
       alias Exq.Support.Config
 
+      @options_doc """
+      * `options`: The following job options are supported
+        * `max_retries` (integer) - max retry count
+        * `jid` (string) - user-supplied jid value
+        * `unique_for` (integer) - lock expiration duration in seconds
+        * `unique_token` (string) - unique lock token. By default the token is computed based on the queue, class and args.
+        * `unique_until` (atom) - defaults to `:success`. Supported values are
+          * `:success` - unlock on job success
+          * `:start` - unlock on the job's first execution
+          * `:expiry` - unlock when the lock expires. Depends on the `unique_for` value.
+      """
+
       @default_options []
       @doc """
      Enqueue a job immediately.
@@ -19,7 +31,7 @@ defmodule Exq.Enqueuer.EnqueueApi do
       * `queue` - Name of queue to use
       * `worker` - Worker module to target
       * `args` - Array of args to send to worker
-      * `options` - job options, for example [max_retries: `Integer`, jid: `String`]
+      #{@options_doc}
 
       Returns:
      * `{:ok, jid}` if the job was enqueued successfully, with `jid` = Job ID.
@@ -43,7 +55,7 @@ defmodule Exq.Enqueuer.EnqueueApi do * `time` - Time to enqueue * `worker` - Worker module to target * `args` - Array of args to send to worker - * `options` - job options, for example [max_retries: `Integer`, jid: `String`] + #{@options_doc} If Exq is running in `mode: [:enqueuer]`, then you will need to use the Enqueuer to schedule jobs, for example: @@ -70,7 +82,7 @@ defmodule Exq.Enqueuer.EnqueueApi do * `offset` - Offset in seconds in the future to enqueue * `worker` - Worker module to target * `args` - Array of args to send to worker - * `options` - job options, for example [max_retries: `Integer`] + #{@options_doc} If Exq is running in `mode: [:enqueuer]`, then you will need to use the Enqueuer to schedule jobs, for example: diff --git a/lib/exq/middleware/pipeline.ex b/lib/exq/middleware/pipeline.ex index 6119326e..cc53b151 100644 --- a/lib/exq/middleware/pipeline.ex +++ b/lib/exq/middleware/pipeline.ex @@ -19,6 +19,7 @@ defmodule Exq.Middleware.Pipeline do - Exq.Middleware.Job: Will NOT remove the backup from job queue - Exq.Middleware.Logger: Will NOT record job as done or failed with timestamp - Exq.Middleware.Manager: Will NOT update worker counter + - Exq.Middleware.Unique: Will NOT clear unique lock - Exq.Middleware.Stats: Will NOT remove job from processes queue """ diff --git a/lib/exq/middleware/unique.ex b/lib/exq/middleware/unique.ex new file mode 100644 index 00000000..96f45e83 --- /dev/null +++ b/lib/exq/middleware/unique.ex @@ -0,0 +1,44 @@ +defmodule Exq.Middleware.Unique do + @behaviour Exq.Middleware.Behaviour + alias Exq.Redis.JobQueue + alias Exq.Middleware.Pipeline + + def before_work( + %Pipeline{assigns: %{job_serialized: job_serialized, redis: redis, namespace: namespace}} = + pipeline + ) do + job = Exq.Support.Job.decode(job_serialized) + + case job do + %{unique_until: "start", unique_token: unique_token, retry_count: retry_count} + when retry_count in [0, nil] -> + {:ok, _} = JobQueue.unlock(redis, namespace, unique_token) + + _ -> + :ok + end + + pipeline + end + + def after_processed_work( + %Pipeline{assigns: %{job_serialized: job_serialized, redis: redis, namespace: namespace}} = + pipeline + ) do + job = Exq.Support.Job.decode(job_serialized) + + case job do + %{unique_until: "success", unique_token: unique_token} -> + {:ok, _} = JobQueue.unlock(redis, namespace, unique_token) + + _ -> + :ok + end + + pipeline + end + + def after_failed_work(pipeline) do + pipeline + end +end diff --git a/lib/exq/redis/connection.ex b/lib/exq/redis/connection.ex index 729be077..c7138abe 100644 --- a/lib/exq/redis/connection.ex +++ b/lib/exq/redis/connection.ex @@ -32,8 +32,8 @@ defmodule Exq.Redis.Connection do q(redis, ["SET", key, val]) end - def del!(redis, key) do - q(redis, ["DEL", key]) + def del!(redis, key, options \\ []) do + q(redis, ["DEL", key], options) end def expire!(redis, key, time \\ 10) do diff --git a/lib/exq/redis/job_queue.ex b/lib/exq/redis/job_queue.ex index 5147d740..a4aa926f 100644 --- a/lib/exq/redis/job_queue.ex +++ b/lib/exq/redis/job_queue.ex @@ -23,37 +23,30 @@ defmodule Exq.Redis.JobQueue do @default_size 100 def enqueue(redis, namespace, queue, worker, args, options) do - {jid, job_serialized} = to_job_serialized(queue, worker, args, options) + {jid, job, job_serialized} = to_job_serialized(queue, worker, args, options) - case enqueue(redis, namespace, queue, job_serialized) do + case do_enqueue(redis, namespace, queue, job, job_serialized, unique_check: true) do :ok -> {:ok, jid} other -> other end end - def 
enqueue(redis, namespace, job_serialized) do - job = Config.serializer().decode_job(job_serialized) - - case enqueue(redis, namespace, job.queue, job_serialized) do - :ok -> {:ok, job.jid} - error -> error - end - end - - def enqueue(redis, namespace, queue, job_serialized) do + defp do_enqueue(redis, namespace, queue, job, job_serialized, options \\ []) do try do + [unlocks_in, unique_key] = unique_args(namespace, job, options) + response = - Connection.qp(redis, [ - ["SADD", full_key(namespace, "queues"), queue], - ["LPUSH", queue_key(namespace, queue), job_serialized] - ]) + Script.eval!( + redis, + :enqueue, + [queue, full_key(namespace, ""), unique_key], + [job_serialized, job.jid, unlocks_in] + ) case response do - {:ok, [%Redix.Error{}, %Redix.Error{}]} = error -> error - {:ok, [%Redix.Error{}, _]} = error -> error - {:ok, [_, %Redix.Error{}]} = error -> error - {:ok, [_, _]} -> :ok - other -> other + {:ok, 0} -> :ok + {:ok, [1, old_jid]} -> {:conflict, old_jid} + error -> error end catch :exit, e -> @@ -62,6 +55,10 @@ defmodule Exq.Redis.JobQueue do end end + def unlock(redis, namespace, unique_token) do + Connection.del!(redis, unique_key(namespace, unique_token), retry_on_connection_error: 3) + end + def enqueue_in(redis, namespace, queue, offset, worker, args, options) when is_integer(offset) do time = Time.offset_from_now(offset) @@ -69,19 +66,48 @@ defmodule Exq.Redis.JobQueue do end def enqueue_at(redis, namespace, queue, time, worker, args, options) do - {jid, job_serialized} = to_job_serialized(queue, worker, args, options) - enqueue_job_at(redis, namespace, job_serialized, jid, time, scheduled_queue_key(namespace)) + {jid, job, job_serialized} = + to_job_serialized(queue, worker, args, options, Time.unix_seconds(time)) + + do_enqueue_job_at( + redis, + namespace, + job, + job_serialized, + jid, + time, + scheduled_queue_key(namespace), + unique_check: true + ) end - def enqueue_job_at(redis, _namespace, job_serialized, jid, time, scheduled_queue) do + def do_enqueue_job_at( + redis, + namespace, + job, + job_serialized, + jid, + time, + scheduled_queue, + options \\ [] + ) do score = Time.time_to_score(time) try do - case Connection.zadd(redis, scheduled_queue, score, job_serialized, - retry_on_connection_error: 3 - ) do - {:ok, _} -> {:ok, jid} - other -> other + [unlocks_in, unique_key] = unique_args(namespace, job, options) + + response = + Script.eval!(redis, :enqueue_at, [scheduled_queue, unique_key], [ + job_serialized, + score, + jid, + unlocks_in + ]) + + case response do + {:ok, 0} -> {:ok, jid} + {:ok, [1, old_jid]} -> {:conflict, old_jid} + error -> error end catch :exit, e -> @@ -204,6 +230,10 @@ defmodule Exq.Redis.JobQueue do full_key(namespace, "queue:#{queue}") end + def unique_key(namespace, unique_token) do + full_key(namespace, "unique:#{unique_token}") + end + def backup_queue_key(namespace, node_id, queue) do full_key(namespace, "queue:backup::#{node_id}::#{queue}") end @@ -258,12 +288,20 @@ defmodule Exq.Redis.JobQueue do Logger.info("Queueing job #{job.jid} to retry in #{offset} seconds") {:ok, _jid} = - enqueue_job_at(redis, namespace, Job.encode(job), job.jid, time, retry_queue_key(namespace)) + do_enqueue_job_at( + redis, + namespace, + job, + Job.encode(job), + job.jid, + time, + retry_queue_key(namespace) + ) end def retry_job(redis, namespace, job) do remove_retry(redis, namespace, job.jid) - {:ok, _jid} = enqueue(redis, namespace, Job.encode(job)) + :ok = do_enqueue(redis, namespace, job.queue, job, Job.encode(job)) end def fail_job(redis, 
namespace, job, error) do @@ -492,16 +530,18 @@ defmodule Exq.Redis.JobQueue do jid = Keyword.get_lazy(options, :jid, fn -> UUID.uuid4() end) retry = Keyword.get_lazy(options, :max_retries, fn -> get_max_retries() end) - job = %{ - queue: queue, - retry: retry, - class: worker, - args: args, - jid: jid, - enqueued_at: enqueued_at - } + job = + %{ + queue: queue, + retry: retry, + class: worker, + args: args, + jid: jid, + enqueued_at: enqueued_at + } + |> add_unique_attributes(options) - {jid, Config.serializer().encode!(job)} + {jid, job, Config.serializer().encode!(job)} end defp dequeue_scheduled_jobs(redis, namespace, queue_key, raw_jobs) do @@ -547,4 +587,40 @@ defmodule Exq.Redis.JobQueue do Enum.map(list, &Job.decode/1) end end + + defp add_unique_attributes(job, options) do + unique_for = Keyword.get(options, :unique_for, nil) + + if unique_for do + unique_token = + Keyword.get_lazy(options, :unique_token, fn -> + string = + Enum.join([job.queue, job.class] ++ Enum.map(job.args, &:erlang.phash2(&1)), ":") + + :crypto.hash(:sha256, string) |> Base.encode64() + end) + + Map.put(job, :unique_for, unique_for) + |> Map.put(:unique_until, to_string(Keyword.get(options, :unique_until, :success))) + |> Map.put(:unique_token, unique_token) + |> Map.put(:unlocks_at, job.enqueued_at + unique_for) + else + job + end + end + + defp unique_args(namespace, job, options) do + if Keyword.get(options, :unique_check, false) do + case job do + %{unlocks_at: unlocks_at, unique_token: unique_token} -> + unlocks_in = Enum.max([trunc((unlocks_at - Time.unix_seconds()) * 1000), 0]) + [unlocks_in, unique_key(namespace, unique_token)] + + _ -> + [nil, nil] + end + else + [nil, nil] + end + end end diff --git a/lib/exq/redis/script.ex b/lib/exq/redis/script.ex index dd0346c0..3db7a504 100644 --- a/lib/exq/redis/script.ex +++ b/lib/exq/redis/script.ex @@ -12,6 +12,45 @@ defmodule Exq.Redis.Script do end @scripts %{ + enqueue: + Prepare.script(""" + local job_queue, namespace_prefix, unique_key = KEYS[1], KEYS[2], KEYS[3] + local job, jid, unlocks_in = ARGV[1], ARGV[2], tonumber(ARGV[3]) + local unlocked = true + local conflict_jid = nil + + if unlocks_in then + unlocked = redis.call("set", unique_key, jid, "px", unlocks_in, "nx") + end + + if unlocked then + redis.call('SADD', namespace_prefix .. 'queues', job_queue) + redis.call('LPUSH', namespace_prefix .. 'queue:' .. 
job_queue, job) + return 0 + else + conflict_jid = redis.call("get", unique_key) + return {1, conflict_jid} + end + """), + enqueue_at: + Prepare.script(""" + local schedule_queue, unique_key = KEYS[1], KEYS[2] + local job, score, jid, unlocks_in = ARGV[1], tonumber(ARGV[2]), ARGV[3], tonumber(ARGV[4]) + local unlocked = true + local conflict_jid = nil + + if unlocks_in then + unlocked = redis.call("set", unique_key, jid, "px", unlocks_in, "nx") + end + + if unlocked then + redis.call('ZADD', schedule_queue, score, job) + return 0 + else + conflict_jid = redis.call("get", unique_key) + return {1, conflict_jid} + end + """), scheduler_dequeue_jobs: Prepare.script(""" local schedule_queue, namespace_prefix = KEYS[1], KEYS[2] diff --git a/lib/exq/serializers/json_serializer.ex b/lib/exq/serializers/json_serializer.ex index 67722e79..988fcb60 100644 --- a/lib/exq/serializers/json_serializer.ex +++ b/lib/exq/serializers/json_serializer.ex @@ -38,7 +38,11 @@ defmodule Exq.Serializers.JsonSerializer do processor: Map.get(deserialized, "processor"), queue: Map.get(deserialized, "queue"), retry: Map.get(deserialized, "retry"), - retry_count: Map.get(deserialized, "retry_count") + retry_count: Map.get(deserialized, "retry_count"), + unique_for: Map.get(deserialized, "unique_for"), + unique_until: Map.get(deserialized, "unique_until"), + unique_token: Map.get(deserialized, "unique_token"), + unlocks_at: Map.get(deserialized, "unlocks_at") } end @@ -59,6 +63,18 @@ defmodule Exq.Serializers.JsonSerializer do retry_count: job.retry_count } + deserialized = + if job.unique_for do + Map.merge(deserialized, %{ + unique_for: job.unique_for, + unique_until: job.unique_until, + unique_token: job.unique_token, + unlocks_at: job.unlocks_at + }) + else + deserialized + end + encode!(deserialized) end diff --git a/lib/exq/support/config.ex b/lib/exq/support/config.ex index 745de5e3..8a16b3e9 100644 --- a/lib/exq/support/config.ex +++ b/lib/exq/support/config.ex @@ -33,6 +33,7 @@ defmodule Exq.Support.Config do Exq.Middleware.Stats, Exq.Middleware.Job, Exq.Middleware.Manager, + Exq.Middleware.Unique, Exq.Middleware.Logger ], queue_adapter: Exq.Adapters.Queue.Redis diff --git a/lib/exq/support/job.ex b/lib/exq/support/job.ex index ad41627c..e3337171 100644 --- a/lib/exq/support/job.ex +++ b/lib/exq/support/job.ex @@ -15,7 +15,11 @@ defmodule Exq.Support.Job do args: nil, jid: nil, finished_at: nil, - enqueued_at: nil + enqueued_at: nil, + unique_for: nil, + unique_until: nil, + unique_token: nil, + unlocks_at: nil alias Exq.Support.Config @@ -39,7 +43,11 @@ defmodule Exq.Support.Job do args: job.args, jid: job.jid, finished_at: job.finished_at, - enqueued_at: job.enqueued_at + enqueued_at: job.enqueued_at, + unique_for: job.unique_for, + unique_until: job.unique_until, + unique_token: job.unique_token, + unlocks_at: job.unlocks_at }) end diff --git a/test/config_test.exs b/test/config_test.exs index 8e8f9c24..7fb0be18 100644 --- a/test/config_test.exs +++ b/test/config_test.exs @@ -282,6 +282,7 @@ defmodule Exq.ConfigTest do Exq.Middleware.Stats, Exq.Middleware.Job, Exq.Middleware.Manager, + Exq.Middleware.Unique, Exq.Middleware.Telemetry ] diff --git a/test/exq_test.exs b/test/exq_test.exs index ffb6dda3..d0ed740d 100644 --- a/test/exq_test.exs +++ b/test/exq_test.exs @@ -489,6 +489,209 @@ defmodule ExqTest do stop_process(sup) end + test "prevent duplicate job" do + Process.register(self(), :exqtest) + {:ok, sup} = Exq.start_link(concurrency: 1, queues: ["q1"]) + {:ok, j1} = Exq.enqueue(Exq, "q1", 
ExqTest.SleepWorker, [50, :worked], unique_for: 60) + {:conflict, ^j1} = Exq.enqueue(Exq, "q1", ExqTest.SleepWorker, [50, :worked], unique_for: 60) + + :timer.sleep(150) + assert_received {"worked"} + + {:ok, _} = Exq.enqueue(Exq, "q1", ExqTest.SleepWorker, [50, :worked], unique_for: 60) + :timer.sleep(150) + assert_received {"worked"} + stop_process(sup) + end + + test "prevent duplicate with custom token" do + Process.register(self(), :exqtest) + {:ok, sup} = Exq.start_link(concurrency: 1, queues: ["q1", "q2"]) + + {:ok, j1} = + Exq.enqueue(Exq, "q1", ExqTest.SleepWorker, [50, :worked], + unique_for: 60, + unique_token: "t1" + ) + + {:conflict, ^j1} = + Exq.enqueue(Exq, "q2", ExqTest.SleepWorker, [50, :worked], + unique_for: 60, + unique_token: "t1" + ) + + :timer.sleep(150) + assert_received {"worked"} + + {:ok, _} = + Exq.enqueue(Exq, "q2", ExqTest.SleepWorker, [50, :worked], + unique_for: 60, + unique_token: "t1" + ) + + :timer.sleep(150) + assert_received {"worked"} + stop_process(sup) + end + + test "prevent duplicate until job picked for execution" do + Process.register(self(), :exqtest) + {:ok, sup} = Exq.start_link(concurrency: 1, queues: ["q1"]) + + args = [200, :worked] + + {:ok, _} = + Exq.enqueue(Exq, "q1", ExqTest.SleepWorker, args, + unique_for: 60, + unique_until: :start + ) + + :timer.sleep(100) + + {:ok, j2} = + Exq.enqueue(Exq, "q1", ExqTest.SleepWorker, args, + unique_for: 60, + unique_until: :start + ) + + {:conflict, ^j2} = + Exq.enqueue(Exq, "q1", ExqTest.SleepWorker, args, + unique_for: 60, + unique_until: :start + ) + + :timer.sleep(400) + assert_received {"worked"} + assert_received {"worked"} + + {:ok, _} = + Exq.enqueue(Exq, "q1", ExqTest.SleepWorker, args, + unique_for: 60, + unique_until: :start + ) + + :timer.sleep(400) + assert_received {"worked"} + stop_process(sup) + end + + defmodule ConstantBackoff do + @behaviour Exq.Backoff.Behaviour + + def offset(_job) do + 1 + end + end + + test "second execution should not clear lock" do + Process.register(self(), :exqtest) + {:ok, sup} = Exq.start_link(concurrency: 1, queues: ["q1"], scheduler_enable: true) + + with_application_env(:exq, :backoff, ConstantBackoff, fn -> + args = [200, :worked] + + {:ok, _} = + Exq.enqueue(Exq, "q1", FailWorker, [], + unique_for: 60, + unique_until: :start, + unique_token: "t1", + max_retries: 5 + ) + + :timer.sleep(100) + + {:ok, j2} = + Exq.enqueue(Exq, "q2", ExqTest.SleepWorker, [10000, :worked], + unique_for: 60, + unique_until: :start, + unique_token: "t1" + ) + + :timer.sleep(2000) + + {:conflict, ^j2} = + Exq.enqueue(Exq, "q2", ExqTest.SleepWorker, [10000, :worked], + unique_for: 60, + unique_until: :start, + unique_token: "t1" + ) + end) + + stop_process(sup) + end + + test "prevent duplicate until expiry" do + Process.register(self(), :exqtest) + {:ok, sup} = Exq.start_link(concurrency: 1, queues: ["q1"]) + + args = [200, :worked] + + {:ok, j1} = + Exq.enqueue(Exq, "q1", ExqTest.SleepWorker, args, + unique_for: 1, + unique_until: :expiry + ) + + :timer.sleep(300) + assert_received {"worked"} + + {:conflict, ^j1} = + Exq.enqueue(Exq, "q1", ExqTest.SleepWorker, args, + unique_for: 1, + unique_until: :expiry + ) + + :timer.sleep(1500) + + {:ok, _} = + Exq.enqueue(Exq, "q1", ExqTest.SleepWorker, args, + unique_for: 1, + unique_until: :expiry + ) + + :timer.sleep(300) + assert_received {"worked"} + stop_process(sup) + end + + test "prevent duplicate job until success" do + Process.register(self(), :exqtest) + {:ok, sup} = Exq.start_link(concurrency: 1, queues: ["q1"]) + + 
{:ok, j1} = + Exq.enqueue(Exq, "q1", FailWorker, [], + unique_for: 60, + unique_until: :success, + max_retries: 0 + ) + + :timer.sleep(100) + + {:conflict, ^j1} = + Exq.enqueue(Exq, "q1", FailWorker, [], + unique_for: 60, + unique_until: :success, + max_retries: 0 + ) + + stop_process(sup) + end + + test "prevent duplicate scheduled job" do + Process.register(self(), :exqtest) + {:ok, sup} = Exq.start_link(concurrency: 1, queues: ["q1"], scheduler_enable: true) + {:ok, j1} = Exq.enqueue_in(Exq, "q1", 1, PerformWorker, [], unique_for: 60) + {:conflict, ^j1} = Exq.enqueue_in(Exq, "q1", 1, PerformWorker, [], unique_for: 60) + + :timer.sleep(2000) + assert_received {:worked} + + {:ok, _} = Exq.enqueue_at(Exq, "q1", DateTime.utc_now(), PerformWorker, [], unique_for: 60) + :timer.sleep(1000) + assert_received {:worked} + stop_process(sup) + end + defp enqueue_fail_job(count) do for _ <- 0..(count - 1) do {:ok, _} = diff --git a/test/job_queue_test.exs b/test/job_queue_test.exs index f48e1ce3..ab9e1c74 100644 --- a/test/job_queue_test.exs +++ b/test/job_queue_test.exs @@ -120,11 +120,12 @@ defmodule JobQueueTest do test "scheduler_dequeue enqueue_at" do JobQueue.enqueue_at(:testredis, "test", "default", DateTime.utc_now(), MyWorker, [], []) - {jid, job_serialized} = JobQueue.to_job_serialized("retry", MyWorker, [], retry: true) + {jid, job, job_serialized} = JobQueue.to_job_serialized("retry", MyWorker, [], retry: true) - JobQueue.enqueue_job_at( + JobQueue.do_enqueue_job_at( :testredis, "test", + job, job_serialized, jid, DateTime.utc_now(), @@ -157,7 +158,11 @@ defmodule JobQueueTest do enqueued_at: Time.unix_seconds(), finished_at: nil, processor: nil, - args: [] + args: [], + unique_for: nil, + unique_until: nil, + unique_token: nil, + unlocks_at: nil }, %RuntimeError{} ) @@ -240,14 +245,14 @@ defmodule JobQueueTest do end test "to_job_serialized using module atom" do - {_jid, serialized} = JobQueue.to_job_serialized("default", MyWorker, [], max_retries: 0) + {_jid, _job, serialized} = JobQueue.to_job_serialized("default", MyWorker, [], max_retries: 0) job = Job.decode(serialized) assert job.class == "MyWorker" assert job.retry == 0 end test "to_job_serialized using module string" do - {_jid, serialized} = + {_jid, _job, serialized} = JobQueue.to_job_serialized("default", "MyWorker/perform", [], max_retries: 10) job = Job.decode(serialized) @@ -257,7 +262,7 @@ defmodule JobQueueTest do test "to_job_serialized using existing job ID" do jid = UUID.uuid4() - {^jid, serialized} = JobQueue.to_job_serialized("default", MyWorker, [], jid: jid) + {^jid, _job, serialized} = JobQueue.to_job_serialized("default", MyWorker, [], jid: jid) job = Job.decode(serialized) assert job.jid == jid diff --git a/test/middleware_test.exs b/test/middleware_test.exs index efb82a6f..9983ab3d 100644 --- a/test/middleware_test.exs +++ b/test/middleware_test.exs @@ -239,6 +239,7 @@ defmodule MiddlewareTest do Exq.Middleware.Stats, Exq.Middleware.Job, Exq.Middleware.Manager, + Exq.Middleware.Unique, Exq.Middleware.Telemetry ] diff --git a/test/test_helper.exs b/test/test_helper.exs index 83855abc..17d8ed69 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -89,13 +89,17 @@ defmodule ExqTestUtil do end def with_application_env(app, key, new, context) do - old = Application.get_env(app, key) + old = Application.get_env(app, key, :undefined) Application.put_env(app, key, new) try do context.() after - Application.put_env(app, key, old) + if old == :undefined do + Application.delete_env(app, key) + else + 
Application.put_env(app, key, old) + end end end end diff --git a/test/worker_test.exs b/test/worker_test.exs index f9d989ef..e402dd8f 100644 --- a/test/worker_test.exs +++ b/test/worker_test.exs @@ -186,6 +186,7 @@ defmodule WorkerTest do Exq.Middleware.Server.push(middleware, Exq.Middleware.Stats) Exq.Middleware.Server.push(middleware, Exq.Middleware.Job) Exq.Middleware.Server.push(middleware, Exq.Middleware.Manager) + Exq.Middleware.Server.push(middleware, Exq.Middleware.Unique) Exq.Middleware.Server.push(middleware, Exq.Middleware.Logger) start_supervised(%{