Skip to content

Commit

Permalink
Add error handling, and refined the return value
Browse files Browse the repository at this point in the history
  • Loading branch information
meysius committed Jan 25, 2023
1 parent 0e1dbab commit 90c8ff9
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 32 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ Exq comes with an `enqueue_all` method which guarantees atomicity.


```elixir
[{:ok, jid_1}, {:ok, jid_2}, {:ok, jid_3}] = Exq.enqueue_all(Exq, [
{:ok, [{:ok, jid_1}, {:ok, jid_2}, {:ok, jid_3}]} = Exq.enqueue_all(Exq, [
[job_1_queue, job_1_worker, job_1_args, job_1_options],
[job_2_queue, job_2_worker, job_2_args, job_2_options],
[job_3_queue, job_3_worker, job_3_args, job_3_options]
Expand All @@ -549,7 +549,7 @@ Exq comes with an `enqueue_all` method which guarantees atomicity.

`enqueue_all` also supports scheduling jobs via `schedule` key in the `options` passed for each job:
```elixir
[{:ok, jid_1}, {:ok, jid_2}, {:ok, jid_3}] = Exq.enqueue_all(Exq, [
{:ok, [{:ok, jid_1}, {:ok, jid_2}, {:ok, jid_3}]} = Exq.enqueue_all(Exq, [
[job_1_queue, job_1_worker, job_1_args, [schedule: {:in, 60 * 60}]],
[job_2_queue, job_2_worker, job_2_args, [schedule: {:at, midnight}]],
[job_3_queue, job_3_worker, job_3_args, []] # no schedule key is present, it is enqueued immediately
Expand Down
18 changes: 10 additions & 8 deletions lib/exq/mock.ex
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,16 @@ defmodule Exq.Mock do
end

def enqueue_all(pid, jobs) do
jobs
|> Enum.map(fn [queue, worker, args, options] ->
case options[:schedule] do
{:at, at_time} -> enqueue_at(pid, queue, at_time, worker, args, options)
{:in, offset} -> enqueue_in(pid, queue, offset, worker, args, options)
_ -> enqueue(pid, queue, worker, args, options)
end
end)
{
:ok,
Enum.map(jobs, fn [queue, worker, args, options] ->
case options[:schedule] do
{:at, at_time} -> enqueue_at(pid, queue, at_time, worker, args, options)
{:in, offset} -> enqueue_in(pid, queue, offset, worker, args, options)
_ -> enqueue(pid, queue, worker, args, options)
end
end)
}
end

@impl true
Expand Down
44 changes: 27 additions & 17 deletions lib/exq/redis/job_queue.ex
Original file line number Diff line number Diff line change
Expand Up @@ -123,24 +123,34 @@ defmodule Exq.Redis.JobQueue do
def enqueue_all(redis, namespace, jobs) do
{keys, args} = extract_enqueue_all_keys_and_args(namespace, jobs)

Script.eval!(
redis,
:enqueue_all,
[scheduled_queue_key(namespace), full_key(namespace, "queues")] ++ keys,
args
)
|> case do
{:ok, result} ->
result
|> Enum.map(fn [status, jid] ->
case status do
0 -> {:ok, jid}
1 -> {:conflict, jid}
end
end)
try do
response =
Script.eval!(
redis,
:enqueue_all,
[scheduled_queue_key(namespace), full_key(namespace, "queues")] ++ keys,
args
)

error ->
error
case response do
{:ok, result} ->
{
:ok,
Enum.map(result, fn [status, jid] ->
case status do
0 -> {:ok, jid}
1 -> {:conflict, jid}
end
end)
}

error ->
error
end
catch
:exit, e ->
Logger.info("Error enqueueing - #{Kernel.inspect(e)}")
{:error, :timeout}
end
end

Expand Down
6 changes: 3 additions & 3 deletions test/exq_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ defmodule ExqTest do
{:ok, sup} = Exq.start_link(scheduler_enable: true)
{:ok, _} = Exq.enqueue_at(Exq, "default", DateTime.utc_now(), ExqTest.PerformWorker, [])

[{:ok, _}, {:ok, _}, {:ok, _}] =
{:ok, [{:ok, _}, {:ok, _}, {:ok, _}]} =
Exq.enqueue_all(Exq, [
["default", ExqTest.PerformArgWorker, [1], []],
["default", ExqTest.PerformArgWorker, [2], [schedule: {:at, DateTime.utc_now()}]],
Expand Down Expand Up @@ -746,7 +746,7 @@ defmodule ExqTest do
Process.register(self(), :exqtest)
{:ok, sup} = Exq.start_link(concurrency: 1, queues: ["q1"], scheduler_enable: true)

[{:ok, j1}, {:conflict, j2}] =
{:ok, [{:ok, j1}, {:conflict, j2}]} =
Exq.enqueue_all(Exq, [
["q1", PerformWorker, [], [schedule: {:in, 1}, unique_for: 60]],
["q1", PerformWorker, [], [schedule: {:in, 1}, unique_for: 60]]
Expand All @@ -757,7 +757,7 @@ defmodule ExqTest do
:timer.sleep(2000)
assert_received {:worked}

[{:ok, _}] =
{:ok, [{:ok, _}]} =
Exq.enqueue_all(Exq, [
["q1", PerformWorker, [], [schedule: {:in, 1}, unique_for: 60]]
])
Expand Down
2 changes: 1 addition & 1 deletion test/fake_mode_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ defmodule FakeModeTest do
scheduled_at = DateTime.utc_now()
assert [] = Exq.Mock.jobs()

assert [{:ok, _}, {:ok, _}, {:ok, _}] =
assert {:ok, [{:ok, _}, {:ok, _}, {:ok, _}]} =
Exq.enqueue_all(Exq, [
["low", BrokenWorker, [1], []],
["low", BrokenWorker, [2], [schedule: {:at, scheduled_at}]],
Expand Down
2 changes: 1 addition & 1 deletion test/inline_mode_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ defmodule InlineModeTest do
end

test "enqueue_all should return the correct value" do
assert [{:ok, _}, {:ok, _}, {:ok, _}] =
assert {:ok, [{:ok, _}, {:ok, _}, {:ok, _}]} =
Exq.enqueue_all(Exq, [
["low", EchoWorker, [1], [schedule: {:in, 300}]],
["low", EchoWorker, [1], [schedule: {:at, DateTime.utc_now()}]],
Expand Down

0 comments on commit 90c8ff9

Please sign in to comment.