Skip to content

Commit

Permalink
Gush::Client now delegates the ActiveJob.perform_later call to `G…
Browse files Browse the repository at this point in the history
…ush::Job` (#117)

This delegation allows for Gush users to easily override the enqueing behavior
to their custom job class, e.g. to support additional ActiveJob options or
enqueing behaviors (e.g synchronous with `perform_now`).
  • Loading branch information
noahfpf authored Aug 8, 2024
1 parent 407578e commit aa0a8a1
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 8 deletions.
31 changes: 31 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,37 @@ class NotifyWorkflow < Gush::Workflow
end
```

### Customization of ActiveJob enqueueing

There might be a case when you want to customize enqueing a job with more than just the above two options (`queue` and `wait`).

To pass additional options to `ActiveJob.set`, override `Job#worker_options`, e.g.:

```ruby

class ScheduledJob < Gush::Job

def worker_options
super.merge(wait_until: Time.at(params[:start_at]))
end

end
```

Or to entirely customize the ActiveJob integration, override `Job#enqueue_worker!`, e.g.:

```ruby

class SynchronousJob < Gush::Job

def enqueue_worker!(options = {})
Gush::Worker.perform_now(workflow_id, name)
end

end
```


## Command line interface (CLI)

### Checking status
Expand Down
11 changes: 3 additions & 8 deletions lib/gush/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,9 @@ def expire_job(workflow_id, job, ttl=nil)
def enqueue_job(workflow_id, job)
job.enqueue!
persist_job(workflow_id, job)
queue = job.queue || configuration.namespace
wait = job.wait

if wait.present?
Gush::Worker.set(queue: queue, wait: wait).perform_later(*[workflow_id, job.name])
else
Gush::Worker.set(queue: queue).perform_later(*[workflow_id, job.name])
end

options = { queue: configuration.namespace }.merge(job.worker_options)
job.enqueue_worker!(options)
end

private
Expand Down
8 changes: 8 additions & 0 deletions lib/gush/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ def enqueue!
@failed_at = nil
end

def enqueue_worker!(options = {})
Gush::Worker.set(options).perform_later(workflow_id, name)
end

def worker_options
{ queue: queue, wait: wait }.compact
end

def finish!
@finished_at = current_timestamp
end
Expand Down
39 changes: 39 additions & 0 deletions spec/gush/job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,45 @@
end
end

describe "#enqueue_worker!" do
it "enqueues the job using Gush::Worker" do
job = described_class.new(name: "a-job", workflow_id: 123)

expect {
job.enqueue_worker!
}.to change{ActiveJob::Base.queue_adapter.enqueued_jobs.size}.from(0).to(1)
end

it "handles ActiveJob.set options" do
freeze_time = Time.utc(2023, 01, 21, 14, 36, 0)

travel_to freeze_time do
job = described_class.new(name: "a-job", workflow_id: 123)
job.enqueue_worker!(wait_until: freeze_time + 5.minutes)
expect(Gush::Worker).to have_a_job_enqueued_at(123, job_with_id(job.class.name), 5.minutes)
end
end
end

describe "#worker_options" do
it "returns a blank options hash by default" do
job = described_class.new
expect(job.worker_options).to eq({})
end

it "returns a hash with the queue setting" do
job = described_class.new
job.queue = 'my-queue'
expect(job.worker_options).to eq({ queue: 'my-queue' })
end

it "returns a hash with the wait setting" do
job = described_class.new
job.wait = 123
expect(job.worker_options).to eq({ wait: 123 })
end
end

describe "#start!" do
it "resets flags and marks as running" do
job = described_class.new(name: "a-job")
Expand Down

0 comments on commit aa0a8a1

Please sign in to comment.