From 2a0f336a86b2f13ce34693f30b382c1d3473b831 Mon Sep 17 00:00:00 2001 From: Noah Harrison Date: Tue, 6 Aug 2024 09:27:14 -0400 Subject: [PATCH] Improved redis performance in large keyspace, added pagination, and auto-expiration This commit makes several interrelated changes: 1. Replaced the redis key scan to find job keys in `Client#find_workflow` with job class name persistence in `Workflow#to_hash`. This significantly improves performance when loading many workflows because it avoids n key scans. 2. Added `Client#workflow_ids` with sorting by creation timestamp and pagination as an alternative to `Client#all_workflows`, which has performance issues in large keyspaces and returns unwieldy amounts of data given a large number of workflows. 3. Added workflow and job indexes by `created_at` and `expires_at` timestamps. The former is necessary for paging through sorted workflow ids, and the latter is necessary to remove data on expiration. 4. Replace use of redis key TTL with explicit expiration via `Client#expire_workflows`, since there's no other way to remove data from the indexes. 5. Added a migration file (and infrastructure) to migrate to the new indexes and expiration format. Given a redis instance with 10,000 workflows, this set of changes allows a page of the most recent 100 workflows to be loaded in 0.1 seconds, whereas previously `all_workflows` would take hours to return data. (Or, for a less extreme example of 1000 workflows, we can load 100 workflows in 0.1 seconds compared to `all_workflows` taking 42 seconds). --- README.md | 51 +++- lib/gush.rb | 1 + lib/gush/cli.rb | 29 +- lib/gush/client.rb | 98 ++++++- .../1_create_gush_workflows_created.rb | 21 ++ lib/gush/migration.rb | 36 +++ lib/gush/workflow.rb | 5 + spec/gush/client_spec.rb | 247 +++++++++++++++++- .../1_create_gush_workflows_created_spec.rb | 42 +++ spec/gush/migration_spec.rb | 23 ++ spec/gush/workflow_spec.rb | 14 + 11 files changed, 549 insertions(+), 18 deletions(-) create mode 100644 lib/gush/migrate/1_create_gush_workflows_created.rb create mode 100644 lib/gush/migration.rb create mode 100644 spec/gush/migrate/1_create_gush_workflows_created_spec.rb create mode 100644 spec/gush/migration_spec.rb diff --git a/README.md b/README.md index e99b7d1..9b9d2e5 100644 --- a/README.md +++ b/README.md @@ -256,6 +256,28 @@ flow.status `reload` is needed to see the latest status, since workflows are updated asynchronously. +## Loading workflows + +### Finding a workflow by id + +``` +flow = Workflow.find(id) +``` + +### Paging through workflows + +To get workflows with pagination, use start and stop (inclusive) index values: + +``` +flows = Workflow.page(0, 99) +``` + +Or in reverse order: + +``` +flows = Workflow.page(0, 99, order: :desc) +``` + ## Advanced features ### Global parameters for jobs @@ -418,12 +440,18 @@ end bundle exec gush show ``` -- of all created workflows: +- of a page of workflows: ``` bundle exec gush list ``` +- of the most recent 100 workflows + + ``` + bundle exec gush list -99 -1 + ``` + ### Vizualizing workflows as image This requires that you have imagemagick installed on your computer: @@ -449,7 +477,9 @@ end ### Cleaning up afterwards -Running `NotifyWorkflow.create` inserts multiple keys into Redis every time it is ran. This data might be useful for analysis but at a certain point it can be purged via Redis TTL. By default gush and Redis will keep keys forever. To configure expiration you need to 2 things. Create initializer (specify config.ttl in seconds, be different per environment). +Running `NotifyWorkflow.create` inserts multiple keys into Redis every time it is run. This data might be useful for analysis but at a certain point it can be purged. By default gush and Redis will keep keys forever. To configure expiration you need to do two things. + +1. Create an initializer that specifies `config.ttl` in seconds. Best NOT to set TTL to be too short (like minutes) but about a week in length. ```ruby # config/initializers/gush.rb @@ -460,7 +490,9 @@ Gush.configure do |config| end ``` -And you need to call `flow.expire!` (optionally passing custom TTL value overriding `config.ttl`). This gives you control whether to expire data for specific workflow. Best NOT to set TTL to be too short (like minutes) but about a week in length. And you can run `Client.expire_workflow` and `Client.expire_job` passing appropriate IDs and TTL (pass -1 to NOT expire) values. +2. Call `Client#expire_workflows` periodically, which will clear all expired stored workflow and job data and indexes. This method can be called at any rate, but ideally should be called at least once for every 1000 workflows created. + +If you need more control over individual workflow expiration, you can call `flow.expire!(ttl)` with a TTL different from the Gush configuration, or with -1 to never expire the workflow. ### Avoid overlapping workflows @@ -478,6 +510,19 @@ def find_by_class klass end ``` +## Gush 3.0 Migration + +Gush 3.0 adds indexing for fast workflow pagination and changes the mechanism for expiring workflow data from Redis. + +### Migration + +Run `bundle exec gush migrate` after upgrading. This will update internal data structures. + +### Expiration API + +Periodically run `Gush::Client.new.expire_workflows` to expire data. Workflows will be automatically enrolled in this expiration, so there is no longer a need to call `workflow.expire!`. + + ## Contributors - [Mateusz Lenik](https://github.com/mlen) diff --git a/lib/gush.rb b/lib/gush.rb index 15da935..e9c6c8d 100644 --- a/lib/gush.rb +++ b/lib/gush.rb @@ -15,6 +15,7 @@ require "gush/configuration" require "gush/errors" require "gush/job" +require "gush/migration" require "gush/worker" require "gush/workflow" diff --git a/lib/gush/cli.rb b/lib/gush/cli.rb index 681ed80..eb999b6 100644 --- a/lib/gush/cli.rb +++ b/lib/gush/cli.rb @@ -70,9 +70,14 @@ def rm(workflow_id) client.destroy_workflow(workflow) end - desc "list", "Lists all workflows with their statuses" - def list - workflows = client.all_workflows + desc "list START STOP", "Lists workflows from START index through STOP index with their statuses" + option :start, type: :numeric, default: nil + option :stop, type: :numeric, default: nil + def list(start=nil, stop=nil) + workflows = client.workflow_ids(start, stop).map do |id| + client.find_workflow(id) + end + rows = workflows.map do |workflow| [workflow.id, (Time.at(workflow.started_at) if workflow.started_at), workflow.class, {alignment: :center, value: status_for(workflow)}] end @@ -120,6 +125,24 @@ def viz(class_or_id) end end + desc "migrate", "Runs all unapplied migrations to Gush storage" + def migrate + Dir[File.join(__dir__, 'migrate', '*.rb')].each {|file| require file } + + applied = Gush::Migration.subclasses.sort(&:version).count do |klass| + migration = klass.new + next if migration.migrated? + + puts "Migrating to #{klass.name} (#{migration.version})" + migration.migrate + puts "== #{migration.version} #{klass.name}: migrated ===" + + true + end + + puts "#{applied} #{"migrations".pluralize(applied)} applied" + end + private def client diff --git a/lib/gush/client.rb b/lib/gush/client.rb index 20c273a..6423007 100644 --- a/lib/gush/client.rb +++ b/lib/gush/client.rb @@ -80,6 +80,37 @@ def next_free_workflow_id id end + # Returns the specified range of workflow ids, sorted by created timestamp. + # + # @param start, stop [Integer] see https://redis.io/docs/latest/commands/zrange/#index-ranges + # for details on the start and stop parameters. + # @param by_ts [Boolean] if true, start and stop are treated as timestamps + # rather than as element indexes, which allows the workflows to be indexed + # by created timestamp + # @param order [Symbol] if :asc, finds ids in ascending created timestamp; + # if :desc, finds ids in descending created timestamp + # @returns [Array] array of workflow ids + def workflow_ids(start=nil, stop=nil, by_ts: false, order: :asc) + start ||= 0 + stop ||= 99 + + redis.zrange( + "gush.idx.workflows.created_at", + start, + stop, + by_score: by_ts, + rev: order&.to_sym == :desc + ) + end + + def workflows(start=nil, stop=nil, **kwargs) + workflow_ids(start, stop, **kwargs).map { |id| find_workflow(id) } + end + + # Deprecated. + # + # This method is not performant when there are a large number of workflows + # or when the redis keyspace is large. Use workflows instead with pagination. def all_workflows redis.scan_each(match: "gush.workflows.*").map do |key| id = key.sub("gush.workflows.", "") @@ -92,7 +123,13 @@ def find_workflow(id) unless data.nil? hash = Gush::JSON.decode(data, symbolize_keys: true) - keys = redis.scan_each(match: "gush.jobs.#{id}.*") + + if hash[:job_klasses] + keys = hash[:job_klasses].map { |klass| "gush.jobs.#{id}.#{klass}" } + else + # For backwards compatibility, get job keys via a full keyspace scan + keys = redis.scan_each(match: "gush.jobs.#{id}.*") + end nodes = keys.each_with_object([]) do |key, array| array.concat redis.hvals(key).map { |json| Gush::JSON.decode(json, symbolize_keys: true) } @@ -105,15 +142,25 @@ def find_workflow(id) end def persist_workflow(workflow) + created_at = Time.now.to_f + added = redis.zadd("gush.idx.workflows.created_at", created_at, workflow.id, nx: true) + + if added && configuration.ttl&.positive? + expires_at = created_at + configuration.ttl + redis.zadd("gush.idx.workflows.expires_at", expires_at, workflow.id, nx: true) + end + redis.set("gush.workflows.#{workflow.id}", workflow.to_json) - workflow.jobs.each {|job| persist_job(workflow.id, job) } + workflow.jobs.each {|job| persist_job(workflow.id, job, expires_at: expires_at) } workflow.mark_as_persisted true end - def persist_job(workflow_id, job) + def persist_job(workflow_id, job, expires_at: nil) + redis.zadd("gush.idx.jobs.expires_at", expires_at, "#{workflow_id}.#{job.klass}", nx: true) if expires_at + redis.hset("gush.jobs.#{workflow_id}.#{job.klass}", job.id, job.to_json) end @@ -134,22 +181,59 @@ def find_job(workflow_id, job_name) def destroy_workflow(workflow) redis.del("gush.workflows.#{workflow.id}") + redis.zrem("gush.idx.workflows.created_at", workflow.id) + redis.zrem("gush.idx.workflows.expires_at", workflow.id) workflow.jobs.each {|job| destroy_job(workflow.id, job) } end def destroy_job(workflow_id, job) redis.del("gush.jobs.#{workflow_id}.#{job.klass}") + redis.zrem("gush.idx.jobs.expires_at", "#{workflow_id}.#{job.klass}") + end + + def expire_workflows(expires_at=nil) + expires_at ||= Time.now.to_f + + ids = redis.zrange("gush.idx.workflows.expires_at", "-inf", expires_at, by_score: true) + return if ids.empty? + + redis.del(ids.map { |id| "gush.workflows.#{id}" }) + redis.zrem("gush.idx.workflows.created_at", ids) + redis.zrem("gush.idx.workflows.expires_at", ids) + + expire_jobs(expires_at) + end + + def expire_jobs(expires_at=nil) + expires_at ||= Time.now.to_f + + keys = redis.zrange("gush.idx.jobs.expires_at", "-inf", expires_at, by_score: true) + return if keys.empty? + + redis.del(keys.map { |key| "gush.jobs.#{key}" }) + redis.zrem("gush.idx.jobs.expires_at", keys) end def expire_workflow(workflow, ttl=nil) - ttl = ttl || configuration.ttl - redis.expire("gush.workflows.#{workflow.id}", ttl) + ttl ||= configuration.ttl + + if ttl&.positive? + redis.zadd("gush.idx.workflows.expires_at", Time.now.to_f + ttl, workflow.id) + else + redis.zrem("gush.idx.workflows.expires_at", workflow.id) + end + workflow.jobs.each {|job| expire_job(workflow.id, job, ttl) } end def expire_job(workflow_id, job, ttl=nil) - ttl = ttl || configuration.ttl - redis.expire("gush.jobs.#{workflow_id}.#{job.klass}", ttl) + ttl ||= configuration.ttl + + if ttl&.positive? + redis.zadd("gush.idx.jobs.expires_at", Time.now.to_f + ttl, "#{workflow_id}.#{job.klass}") + else + redis.zrem("gush.idx.jobs.expires_at", "#{workflow_id}.#{job.klass}") + end end def enqueue_job(workflow_id, job) diff --git a/lib/gush/migrate/1_create_gush_workflows_created.rb b/lib/gush/migrate/1_create_gush_workflows_created.rb new file mode 100644 index 0000000..64d14de --- /dev/null +++ b/lib/gush/migrate/1_create_gush_workflows_created.rb @@ -0,0 +1,21 @@ +module Gush + class IndexWorkflowsByCreatedAtAndExpiresAt < Gush::Migration + def self.version + 1 + end + + def up + redis.scan_each(match: "gush.workflows.*").map do |key| + id = key.sub("gush.workflows.", "") + workflow = client.find_workflow(id) + + ttl = redis.ttl(key) + redis.persist(key) + workflow.jobs.each { |job| redis.persist("gush.jobs.#{id}.#{job.klass}") } + + client.persist_workflow(workflow) + client.expire_workflow(workflow, ttl.positive? ? ttl : -1) + end + end + end +end diff --git a/lib/gush/migration.rb b/lib/gush/migration.rb new file mode 100644 index 0000000..cff1f7f --- /dev/null +++ b/lib/gush/migration.rb @@ -0,0 +1,36 @@ +module Gush + class Migration + def migrate + return if migrated? + + up + migrated! + end + + def up + # subclass responsibility + end + + def version + self.class.version + end + + def migrated? + redis.sismember("gush.migration.schema_migrations", version) + end + + private + + def migrated! + redis.sadd("gush.migration.schema_migrations", version) + end + + def client + @client ||= Client.new + end + + def redis + Gush::Client.redis_connection(client.configuration) + end + end +end diff --git a/lib/gush/workflow.rb b/lib/gush/workflow.rb index 55ade08..d457189 100644 --- a/lib/gush/workflow.rb +++ b/lib/gush/workflow.rb @@ -22,6 +22,10 @@ def self.find(id) Gush::Client.new.find_workflow(id) end + def self.page(start=0, stop=99, order: :asc) + Gush::Client.new.workflows(start, stop, order: order) + end + def self.create(*args, **kwargs) flow = new(*args, **kwargs) flow.save @@ -184,6 +188,7 @@ def to_hash total: jobs.count, finished: jobs.count(&:finished?), klass: name, + job_klasses: jobs.map(&:class).map(&:to_s).uniq, status: status, stopped: stopped, started_at: started_at, diff --git a/spec/gush/client_spec.rb b/spec/gush/client_spec.rb index 415dd08..9285a5d 100644 --- a/spec/gush/client_spec.rb +++ b/spec/gush/client_spec.rb @@ -48,6 +48,20 @@ expect(workflow.globals[:global1]).to eq('foo') end end + + context "when workflow was persisted without job_klasses" do + it "returns Workflow object" do + expected_workflow = TestWorkflow.create + + json = Gush::JSON.encode(expected_workflow.to_hash.except(:job_klasses)) + redis.set("gush.workflows.#{expected_workflow.id}", json) + + workflow = client.find_workflow(expected_workflow.id) + + expect(workflow.id).to eq(expected_workflow.id) + expect(workflow.jobs.map(&:name)).to match_array(expected_workflow.jobs.map(&:name)) + end + end end end @@ -101,38 +115,187 @@ it "persists JSON dump of the Workflow and its jobs" do job = double("job", to_json: 'json') workflow = double("workflow", id: 'abcd', jobs: [job, job, job], to_json: '"json"') - expect(client).to receive(:persist_job).exactly(3).times.with(workflow.id, job) + expect(client).to receive(:persist_job).exactly(3).times.with(workflow.id, job, expires_at: nil) expect(workflow).to receive(:mark_as_persisted) client.persist_workflow(workflow) expect(redis.keys("gush.workflows.abcd").length).to eq(1) end + + it "sets created_at index" do + workflow = double("workflow", id: 'abcd', jobs: [], to_json: '"json"') + expect(workflow).to receive(:mark_as_persisted).twice + + freeze_time = Time.now.round # travel_to doesn't support fractions of a second + travel_to(freeze_time) do + client.persist_workflow(workflow) + end + + expect(redis.zrange("gush.idx.workflows.created_at", 0, -1, with_scores: true)) + .to eq([[workflow.id, freeze_time.to_f]]) + + # Persisting the workflow again should not affect its created_at index score + client.persist_workflow(workflow) + expect(redis.zrange("gush.idx.workflows.created_at", 0, -1, with_scores: true)) + .to eq([[workflow.id, freeze_time.to_f]]) + end + + it "sets expires_at index when there is a ttl configured" do + allow_any_instance_of(Gush::Configuration).to receive(:ttl).and_return(1000) + + workflow = double("workflow", id: 'abcd', jobs: [], to_json: '"json"') + expect(workflow).to receive(:mark_as_persisted).twice + + freeze_time = Time.now.round # travel_to doesn't support fractions of a second + travel_to(freeze_time) do + client.persist_workflow(workflow) + end + + expires_at = freeze_time + 1000 + expect(redis.zrange("gush.idx.workflows.expires_at", 0, -1, with_scores: true)) + .to eq([[workflow.id, expires_at.to_f]]) + + # Persisting the workflow again should not affect its expires_at index score + client.persist_workflow(workflow) + expect(redis.zrange("gush.idx.workflows.expires_at", 0, -1, with_scores: true)) + .to eq([[workflow.id, expires_at.to_f]]) + end + + it "does not set expires_at index when there is no ttl configured" do + workflow = double("workflow", id: 'abcd', jobs: [], to_json: '"json"') + expect(workflow).to receive(:mark_as_persisted) + client.persist_workflow(workflow) + + expect(redis.zcard("gush.idx.workflows.expires_at")).to eq(0) + end + + it "does not set expires_at index when updating a pre-existing workflow without a ttl" do + allow_any_instance_of(Gush::Configuration).to receive(:ttl).and_return(1000) + + workflow = double("workflow", id: 'abcd', jobs: [], to_json: '"json"') + expect(workflow).to receive(:mark_as_persisted).twice + + client.persist_workflow(workflow) + + client.expire_workflow(workflow, -1) + expect(redis.zcard("gush.idx.workflows.expires_at")).to eq(0) + + client.persist_workflow(workflow) + expect(redis.zcard("gush.idx.workflows.expires_at")).to eq(0) + end + + it "does not change expires_at index when updating a pre-existing workflow with a non-standard ttl" do + allow_any_instance_of(Gush::Configuration).to receive(:ttl).and_return(1000) + + workflow = double("workflow", id: 'abcd', jobs: [], to_json: '"json"') + expect(workflow).to receive(:mark_as_persisted).twice + + freeze_time = Time.now.round # travel_to doesn't support fractions of a second + travel_to(freeze_time) do + client.persist_workflow(workflow) + + expires_at = freeze_time.to_i + 1234 + client.expire_workflow(workflow, 1234) + expect(redis.zscore("gush.idx.workflows.expires_at", workflow.id)).to eq(expires_at) + + client.persist_workflow(workflow) + expect(redis.zscore("gush.idx.workflows.expires_at", workflow.id)).to eq(expires_at) + end + end end describe "#destroy_workflow" do it "removes all Redis keys related to the workflow" do + allow_any_instance_of(Gush::Configuration).to receive(:ttl).and_return(1000) + workflow = TestWorkflow.create expect(redis.keys("gush.workflows.#{workflow.id}").length).to eq(1) expect(redis.keys("gush.jobs.#{workflow.id}.*").length).to eq(5) + expect(redis.zcard("gush.idx.workflows.created_at")).to eq(1) + expect(redis.zcard("gush.idx.workflows.expires_at")).to eq(1) + expect(redis.zcard("gush.idx.jobs.expires_at")).to eq(5) client.destroy_workflow(workflow) expect(redis.keys("gush.workflows.#{workflow.id}").length).to eq(0) expect(redis.keys("gush.jobs.#{workflow.id}.*").length).to eq(0) + expect(redis.zcard("gush.idx.workflows.created_at")).to eq(0) + expect(redis.zcard("gush.idx.workflows.expires_at")).to eq(0) + expect(redis.zcard("gush.idx.jobs.expires_at")).to eq(0) + end + end + + describe "#expire_workflows" do + it "removes auto-expired workflows" do + allow_any_instance_of(Gush::Configuration).to receive(:ttl).and_return(1000) + + workflow = TestWorkflow.create + + # before workflow's expiration time + client.expire_workflows + + expect(redis.keys("gush.workflows.*").length).to eq(1) + + # after workflow's expiration time + client.expire_workflows(Time.now.to_f + 1001) + + expect(redis.keys("gush.workflows.#{workflow.id}").length).to eq(0) + expect(redis.keys("gush.jobs.#{workflow.id}.*").length).to eq(0) + expect(redis.zcard("gush.idx.workflows.created_at")).to eq(0) + expect(redis.zcard("gush.idx.workflows.expires_at")).to eq(0) + expect(redis.zcard("gush.idx.jobs.expires_at")).to eq(0) + end + + it "removes manually-expired workflows" do + workflow = TestWorkflow.create + + # workflow hasn't been expired + client.expire_workflows(Time.now.to_f + 100_000) + + expect(redis.keys("gush.workflows.*").length).to eq(1) + + client.expire_workflow(workflow, 10) + + # after workflow's expiration time + client.expire_workflows(Time.now.to_f + 20) + + expect(redis.keys("gush.workflows.#{workflow.id}").length).to eq(0) + expect(redis.keys("gush.jobs.#{workflow.id}.*").length).to eq(0) + expect(redis.zcard("gush.idx.workflows.created_at")).to eq(0) + expect(redis.zcard("gush.idx.workflows.expires_at")).to eq(0) + expect(redis.zcard("gush.idx.jobs.expires_at")).to eq(0) end end describe "#expire_workflow" do let(:ttl) { 2000 } - it "sets TTL for all Redis keys related to the workflow" do + it "sets an expiration time for the workflow" do + workflow = TestWorkflow.create + + freeze_time = Time.now.round # travel_to doesn't support fractions of a second + expires_at = freeze_time.to_f + ttl + travel_to(freeze_time) do + client.expire_workflow(workflow, ttl) + end + + expect(redis.zscore("gush.idx.workflows.expires_at", workflow.id)).to eq(expires_at) + + workflow.jobs.each do |job| + expect(redis.zscore("gush.idx.jobs.expires_at", "#{workflow.id}.#{job.klass}")).to eq(expires_at) + end + end + + it "clears an expiration time for the workflow when given -1" do workflow = TestWorkflow.create - client.expire_workflow(workflow, ttl) + client.expire_workflow(workflow, 100) + expect(redis.zscore("gush.idx.workflows.expires_at", workflow.id)).to be > 0 - expect(redis.ttl("gush.workflows.#{workflow.id}")).to eq(ttl) + client.expire_workflow(workflow, -1) + expect(redis.zscore("gush.idx.workflows.expires_at", workflow.id)).to eq(nil) workflow.jobs.each do |job| - expect(redis.ttl("gush.jobs.#{workflow.id}.#{job.klass}")).to eq(ttl) + expect(redis.zscore("gush.idx.jobs.expires_at", "#{workflow.id}.#{job.klass}")).to eq(nil) end end end @@ -145,6 +308,80 @@ client.persist_job('deadbeef', job) expect(redis.keys("gush.jobs.deadbeef.*").length).to eq(1) end + + it "sets expires_at index when expires_at is provided" do + job = BobJob.new(name: 'bob', id: 'abcd123') + + freeze_time = Time.now.round # travel_to doesn't support fractions of a second + expires_at = freeze_time.to_f + 1000 + + travel_to(freeze_time) do + client.persist_job('deadbeef', job, expires_at: expires_at) + end + + expect(redis.zrange("gush.idx.jobs.expires_at", 0, -1, with_scores: true)) + .to eq([["deadbeef.#{job.klass}", expires_at]]) + + # Persisting the workflow again should not affect its expires_at index score + client.persist_job('deadbeef', job) + expect(redis.zrange("gush.idx.jobs.expires_at", 0, -1, with_scores: true)) + .to eq([["deadbeef.#{job.klass}", expires_at]]) + end + + it "does not set expires_at index when there is no ttl configured" do + job = BobJob.new(name: 'bob', id: 'abcd123') + client.persist_job('deadbeef', job) + + expect(redis.zcard("gush.idx.jobs.expires_at")).to eq(0) + end + end + + describe "#workflow_ids" do + it "returns a page of registered workflow ids" do + workflow = TestWorkflow.create + ids = client.workflow_ids + expect(ids).to eq([workflow.id]) + end + + it "sorts workflow ids by created time or reverse created time" do + ids = 3.times.map { TestWorkflow.create }.map(&:id) + + expect(client.workflow_ids).to eq(ids) + expect(client.workflow_ids(order: :asc)).to eq(ids) + expect(client.workflow_ids(order: :desc)).to eq(ids.reverse) + end + + it "supports start and stop params" do + ids = 3.times.map { TestWorkflow.create }.map(&:id) + + expect(client.workflow_ids(0, 1)).to eq(ids.slice(0..1)) + expect(client.workflow_ids(1, 1)).to eq(ids.slice(1..1)) + expect(client.workflow_ids(1, 10)).to eq(ids.slice(1..2)) + expect(client.workflow_ids(0, -1)).to eq(ids) + end + + it "supports start and stop params using created timestamps" do + times = [100, 200, 300] + ids = [] + + times.each do |t| + travel_to Time.at(t) do + ids << TestWorkflow.create.id + end + end + + expect(client.workflow_ids(0, 1, by_ts: true)).to be_empty + expect(client.workflow_ids(50, 150, by_ts: true)).to eq(ids.slice(0..0)) + expect(client.workflow_ids(150, 50, by_ts: true, order: :desc)).to eq(ids.slice(0..0)) + expect(client.workflow_ids("-inf", "inf", by_ts: true)).to eq(ids) + end + end + + describe "#workflows" do + it "returns a page of registered workflows" do + workflow = TestWorkflow.create + expect(client.workflows.map(&:id)).to eq([workflow.id]) + end end describe "#all_workflows" do diff --git a/spec/gush/migrate/1_create_gush_workflows_created_spec.rb b/spec/gush/migrate/1_create_gush_workflows_created_spec.rb new file mode 100644 index 0000000..32d00a9 --- /dev/null +++ b/spec/gush/migrate/1_create_gush_workflows_created_spec.rb @@ -0,0 +1,42 @@ +require 'spec_helper' +require 'gush/migrate/1_create_gush_workflows_created' + +describe Gush::IndexWorkflowsByCreatedAtAndExpiresAt do + + describe "#up" do + it "adds existing workflows to created_at index, but not expires_at index" do + TestWorkflow.create + redis.del("gush.idx.workflows.created_at") + + allow_any_instance_of(Gush::Configuration).to receive(:ttl).and_return(1000) + + subject.migrate + + expect(redis.zcard("gush.idx.workflows.created_at")).to eq(1) + expect(redis.zcard("gush.idx.workflows.expires_at")).to eq(0) + expect(redis.zcard("gush.idx.jobs.expires_at")).to eq(0) + end + + it "adds expiring workflows to expires_at index" do + workflow = TestWorkflow.create + redis.del("gush.idx.workflows.created_at") + + freeze_time = Time.now.round # travel_to doesn't support fractions of a second + travel_to(freeze_time) do + redis.expire("gush.workflows.#{workflow.id}", 1234) + expires_at = freeze_time.to_f + 1234 + + allow_any_instance_of(Gush::Configuration).to receive(:ttl).and_return(1000) + + subject.migrate + + expect(redis.ttl("gush.workflows.#{workflow.id}")).to eq(-1) + expect(redis.ttl("gush.jobs.#{workflow.id}.#{workflow.jobs.first.class.name}")).to eq(-1) + + expect(redis.zrange("gush.idx.workflows.expires_at", 0, -1, with_scores: true)) + .to eq([[workflow.id, expires_at]]) + expect(redis.zcard("gush.idx.jobs.expires_at")).to eq(5) + end + end + end +end diff --git a/spec/gush/migration_spec.rb b/spec/gush/migration_spec.rb new file mode 100644 index 0000000..d274402 --- /dev/null +++ b/spec/gush/migration_spec.rb @@ -0,0 +1,23 @@ +require 'spec_helper' + +describe Gush::Migration do + + describe "#migrate" do + it "applies a migration once" do + class TestMigration < Gush::Migration + def self.version + 123 + end + end + + migration = TestMigration.new + expect(migration).to receive(:up).once + + expect(migration.migrated?).to be(false) + migration.migrate + + expect(migration.migrated?).to be(true) + migration.migrate + end + end +end diff --git a/spec/gush/workflow_spec.rb b/spec/gush/workflow_spec.rb index 8bbb413..56e7804 100644 --- a/spec/gush/workflow_spec.rb +++ b/spec/gush/workflow_spec.rb @@ -67,6 +67,19 @@ def configure(*args) end end + describe "#find" do + it "fiends a workflow by id" do + expect(Gush::Workflow.find(subject.id).id).to eq(subject.id) + end + end + + describe "#page" do + it "returns a page of registered workflows" do + flow = TestWorkflow.create + expect(Gush::Workflow.page.map(&:id)).to eq([flow.id]) + end + end + describe "#status" do context "when failed" do it "returns :failed" do @@ -145,6 +158,7 @@ def configure(*args) "id" => an_instance_of(String), "name" => klass.to_s, "klass" => klass.to_s, + "job_klasses" => ["FetchFirstJob", "PersistFirstJob"], "status" => "running", "total" => 2, "finished" => 0,