Skip to content

Commit

Permalink
Add maintenance job
Browse files Browse the repository at this point in the history
  • Loading branch information
gremerritt committed Sep 20, 2023
1 parent 5e4ef46 commit dd8b8ba
Show file tree
Hide file tree
Showing 11 changed files with 190 additions and 17 deletions.
29 changes: 21 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ Creating a job group and queueing some jobs:
```ruby
job_group = Delayed::JobGroups::JobGroup.create!

# JobGroup#enqueue has the same signature as Delayed::Job.enqueue
# JobGroup#enqueue has the same signature as Delayed::Job.enqueue
# i.e. it takes a job and an optional hash of options.
job_group.enqueue(MyJob.new('some arg'), queue: 'general')
job_group.enqueue(MyJob.new('some other arg'), queue: 'general', priority: 10)
Expand All @@ -62,15 +62,15 @@ Registering a job to run after all jobs in the job group have completed:

```ruby
# We can optionally pass options that will be used when queueing the on completion job
job_group = Delayed::JobGroups::JobGroup.create!(on_completion_job: MyCompletionJob.new,
job_group = Delayed::JobGroups::JobGroup.create!(on_completion_job: MyCompletionJob.new,
on_completion_job_options: { queue: 'general' })
```

Registering a job to run if the job group is canceled or fails:

```ruby
# We can optionally pass options that will be used when queueing the on cancellation job
job_group = Delayed::JobGroups::JobGroup.create!(on_cancellation_job: MyCancellationJob.new,
job_group = Delayed::JobGroups::JobGroup.create!(on_cancellation_job: MyCancellationJob.new,
on_cancellation_job_options: { queue: 'general' })
```

Expand All @@ -81,9 +81,9 @@ Block and unblock jobs in a job group:
job_group = Delayed::JobGroups::JobGroup.create!(blocked: true)
job_group.enqueue(MyJob.new('some arg'), queue: 'general')
job_group.mark_queueing_complete

# Do more stuff...

# Unblock the JobGroup so its jobs can run
job_group.unblock
```
Expand All @@ -92,19 +92,32 @@ Cancel a job group:

```ruby
job_group = Delayed::JobGroups::JobGroup.create!

# Do more stuff...

job_group.cancel
```

Configuration to allow failed jobs not to cancel the group
```ruby
# We can optionally pass options that will allow jobs to fail without cancelling the group.
# This also allows the on_completion job to fire once all jobs have either succeeded or failed.
# This also allows the on_completion job to fire once all jobs have either succeeded or failed.
job_group = Delayed::JobGroups::JobGroup.create!(failure_cancels_group: false)
```

### Maintenance

It's possible to end up in a state where all jobs in a group have been completed, but the completion job has not run.
This is due a race condition where the final job in a group is completed, and the worker running it is terminated before
the completion job can be enqueued.

As a remedy for the above scenario, a job is provided which cleans up any job groups in this state. It is recommended to
run this job periodically (for example in a cron job), especially in high-thoughput applications.

```ruby
Delayed::JobGroups::CompleteStaleJobGroupsJob.enqueue
```

## Supported Platforms

* Only the Delayed Job Active Record backend is supported.
Expand Down
1 change: 1 addition & 0 deletions delayed_job_groups.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ Gem::Specification.new do |spec|
spec.add_dependency 'activerecord', '>= 5.2', '< 7.1'
spec.add_development_dependency 'coveralls_reborn', '>= 0.18.0'
spec.add_development_dependency 'database_cleaner', '>= 1.2'
spec.add_development_dependency 'factory_bot_rails'
spec.add_development_dependency 'mime-types'
spec.add_development_dependency 'rake'
spec.add_development_dependency 'rspec', '~> 3'
Expand Down
21 changes: 21 additions & 0 deletions lib/delayed/job_groups/complete_stale_job_groups_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# frozen_string_literal: true

module Delayed
module JobGroups
class CompleteStaleJobGroupsJob
if defined?(Delayed::Extensions::SystemJobWithoutSecurityContext)
include Delayed::Extensions::SystemJobWithoutSecurityContext
end

class << self
def enqueue(**kwargs)
Delayed::Job.enqueue(new, **kwargs)
end
end

def perform
Delayed::JobGroups::JobGroup.ready.find_each(&:check_for_completion)
end
end
end
end
6 changes: 6 additions & 0 deletions lib/delayed/job_groups/job_group.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ class JobGroup < ActiveRecord::Base
has_many :queued_jobs, -> { where(failed_at: nil, locked_by: nil) }, class_name: '::Delayed::Job',
dependent: :delete_all

scope :ready, -> { where(queueing_complete: true, blocked: false) }

def mark_queueing_complete
with_lock do
raise 'JobGroup has already completed queueing' if queueing_complete?
Expand Down Expand Up @@ -52,6 +54,10 @@ def cancel
destroy
end

def check_for_completion
self.class.check_for_completion(id)
end

def self.check_for_completion(job_group_id)
# Optimization to avoid loading and locking the JobGroup when the group
# still has pending jobs
Expand Down
1 change: 1 addition & 0 deletions lib/delayed_job_groups_plugin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
require 'delayed_job'
require 'delayed_job_active_record'
require 'delayed/job_groups/compatibility'
require 'delayed/job_groups/complete_stale_job_groups_job'
require 'delayed/job_groups/job_extensions'
require 'delayed/job_groups/job_group'
require 'delayed/job_groups/plugin'
Expand Down
35 changes: 35 additions & 0 deletions spec/delayed/job_groups/complete_stale_job_groups_job_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# frozen_string_literal: true

describe Delayed::JobGroups::CompleteStaleJobGroupsJob do
describe "#perform" do
let(:job) { described_class.new }

let!(:blocked) { create(:job_group, blocked: true) }
let!(:not_queueing_complete) { create(:job_group, queueing_complete: false) }
let!(:ready) { create(:job_group, queueing_complete: true, blocked: false) }

before do
allow(Delayed::JobGroups::JobGroup).to receive(:check_for_completion)
end

it "checks all relevant job groups for completion" do
job.perform

expect(Delayed::JobGroups::JobGroup).to have_received(:check_for_completion)
.once
.with(ready.id)
end
end

describe "#enqueue" do
before do
allow(Delayed::Job).to receive(:enqueue)
end

it "enqueues the job" do
described_class.enqueue

expect(Delayed::Job).to have_received(:enqueue).with(instance_of(described_class))
end
end
end
2 changes: 1 addition & 1 deletion spec/delayed/job_groups/job_extensions_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

describe "delayed job extensions" do
it "provides an optional job_group_id" do
job_group = Delayed::JobGroups::JobGroup.create!
job_group = create(:job_group)
expect(Delayed::Job.new(job_group_id: job_group.id)).to be_valid
expect(Delayed::Job.new).to be_valid
end
Expand Down
87 changes: 83 additions & 4 deletions spec/delayed/job_groups/job_group_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@
let(:current_time) { Time.utc(2013) }

subject(:job_group) do
Delayed::JobGroups::JobGroup.create!(on_completion_job: on_completion_job,
on_completion_job_options: on_completion_job_options,
blocked: blocked)
create(
:job_group,
on_completion_job: on_completion_job,
on_completion_job_options: on_completion_job_options,
blocked: blocked
)
end

before do
Expand All @@ -24,6 +27,16 @@
Timecop.return
end

describe "ready scope" do
let!(:blocked) { create(:job_group, blocked: true) }
let!(:not_queueing_complete) { create(:job_group, queueing_complete: false) }
let!(:ready) { create(:job_group, queueing_complete: true, blocked: false) }

it "returns the expected job groups" do
expect(described_class.ready).to match_array(ready)
end
end

shared_examples "the job group was completed" do
it "queues the completion job" do
expect(Delayed::Job).to have_received(:enqueue).with(on_completion_job, on_completion_job_options)
Expand Down Expand Up @@ -76,7 +89,73 @@
end
end

describe ".check_for_completion" do
describe "instance check_for_completion" do
let!(:job) { Delayed::Job.create!(job_group_id: job_group.id) }

before do
job_group.mark_queueing_complete
end

shared_context "complete job and check job group complete" do
before do
job.destroy
job_group.check_for_completion
end
end

context "when no jobs exist" do
include_context "complete job and check job group complete"

it_behaves_like "the job group was completed"
end

context "when active jobs exist" do
before do
Delayed::JobGroups::JobGroup.check_for_completion(job_group.id)
end

it_behaves_like "the job group was not completed"
end

context "when on failed jobs exist" do
before do
job.update!(failed_at: Time.now)
Delayed::JobGroups::JobGroup.check_for_completion(job_group.id)
end

it_behaves_like "the job group was completed"
end

context "when there are no on_completion_job_options" do
let(:on_completion_job_options) { nil }

include_context "complete job and check job group complete"

it "queues the completion job with empty options" do
expect(Delayed::Job).to have_received(:enqueue).with(on_completion_job, {})
end

it "destroys the job group" do
expect(job_group).to have_been_destroyed
end
end

context "when there is no on_completion_job" do
let(:on_completion_job) { nil }

include_context "complete job and check job group complete"

it "doesn't queues the non-existent completion job" do
expect(Delayed::Job).not_to have_received(:enqueue)
end

it "destroys the job group" do
expect(job_group).to have_been_destroyed
end
end
end

describe "class check_for_completion" do
let!(:job) { Delayed::Job.create!(job_group_id: job_group.id) }

before do
Expand Down
11 changes: 7 additions & 4 deletions spec/delayed/job_groups/plugin_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
Delayed::Worker.max_attempts = @old_max_attempts
end

let!(:job_group) { Delayed::JobGroups::JobGroup.create!(on_completion_job: TestJobs::CompletionJob.new) }
let!(:job_group) { create(:job_group, on_completion_job: TestJobs::CompletionJob.new) }

it "runs the completion job after completing other jobs" do
job_group.enqueue(TestJobs::NoOpJob.new)
Expand Down Expand Up @@ -215,8 +215,11 @@

context "when a cancellation job is provided" do
let!(:job_group) do
Delayed::JobGroups::JobGroup.create!(on_completion_job: TestJobs::CompletionJob.new,
on_cancellation_job: TestJobs::CancellationJob.new)
create(
:job_group,
on_completion_job: TestJobs::CompletionJob.new,
on_cancellation_job: TestJobs::CancellationJob.new
)
end

it "runs the cancellation job after a job error causes cancellation" do
Expand Down Expand Up @@ -262,7 +265,7 @@
end

context "when a no completion job is provided" do
let!(:job_group) { Delayed::JobGroups::JobGroup.create! }
let!(:job_group) { create(:job_group) }

it "doesn't queue a non-existent completion job" do
job_group.enqueue(TestJobs::NoOpJob.new)
Expand Down
10 changes: 10 additions & 0 deletions spec/factories/job_groups.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# frozen_string_literal: true

FactoryBot.define do
factory :job_group, class: 'Delayed::JobGroups::JobGroup' do
blocked { false }
queueing_complete { false }
on_completion_job { nil }
on_completion_job_options { nil }
end
end
4 changes: 4 additions & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
require 'rspec/its'
require 'database_cleaner'
require 'delayed_job_groups_plugin'
require 'factory_bot'
require 'yaml'
require 'timecop'

Expand All @@ -40,6 +41,7 @@

config.before(:suite) do
DatabaseCleaner.clean_with(:truncation)
FactoryBot.find_definitions
end

config.before do
Expand All @@ -53,4 +55,6 @@
config.after do
DatabaseCleaner.clean
end

config.include FactoryBot::Syntax::Methods
end

0 comments on commit dd8b8ba

Please sign in to comment.