Skip to content

Commit

Permalink
Merge pull request #28 from salsify/LV-6538-closer-task
Browse files Browse the repository at this point in the history
Add CompleteStuckJobGroupsJob [LV-6538]
  • Loading branch information
gremerritt authored Sep 21, 2023
2 parents 5e4ef46 + 9a2c548 commit 380ba44
Show file tree
Hide file tree
Showing 20 changed files with 250 additions and 47 deletions.
18 changes: 8 additions & 10 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ version: 2.1
jobs:
lint:
docker:
- image: salsify/ruby_ci:2.7.7
- image: cimg/ruby:3.0.6
working_directory: ~/delayed_job_groups
steps:
- checkout
- restore_cache:
keys:
- v1-gems-ruby-2.7.7-{{ checksum "delayed_job_groups.gemspec" }}-{{ checksum "Gemfile" }}
- v1-gems-ruby-2.7.7-
- v1-gems-ruby-3.0.6-{{ checksum "delayed_job_groups.gemspec" }}-{{ checksum "Gemfile" }}
- v1-gems-ruby-3.0.6-
- run:
name: Install Gems
command: |
Expand All @@ -18,7 +18,7 @@ jobs:
bundle clean
fi
- save_cache:
key: v1-gems-ruby-2.7.7-{{ checksum "delayed_job_groups.gemspec" }}-{{ checksum "Gemfile" }}
key: v1-gems-ruby-3.0.6-{{ checksum "delayed_job_groups.gemspec" }}-{{ checksum "Gemfile" }}
paths:
- "vendor/bundle"
- "gemfiles/vendor/bundle"
Expand All @@ -32,7 +32,7 @@ jobs:
ruby_version:
type: string
docker:
- image: salsify/ruby_ci:<< parameters.ruby_version >>
- image: cimg/ruby:<< parameters.ruby_version >>
environment:
CIRCLE_TEST_REPORTS: "test-results"
BUNDLE_GEMFILE: << parameters.gemfile >>
Expand Down Expand Up @@ -69,11 +69,9 @@ workflows:
matrix:
parameters:
gemfile:
- "gemfiles/rails_6.0.gemfile"
- "gemfiles/rails_6.1.gemfile"
- "gemfiles/rails_7.0.gemfile"
ruby_version:
- "2.7.7"
- "3.0.5"
- "3.1.3"
- "3.2.0"
- "3.0.6"
- "3.1.4"
- "3.2.2"
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ test/tmp
test/version_tmp
tmp
*.gemfile.lock
.ruby-version
2 changes: 1 addition & 1 deletion .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ inherit_gem:
salsify_rubocop: conf/rubocop.yml

AllCops:
TargetRubyVersion: 2.7
TargetRubyVersion: 3.0
Exclude:
- 'vendor/**/*'
- 'gemfiles/**/*'
Expand Down
5 changes: 0 additions & 5 deletions Appraisals
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
# frozen_string_literal: true

appraise 'rails-6.0' do
gem 'activerecord', '~> 6.0.4'
gem 'activesupport', '~> 6.0.4'
end

appraise 'rails-6.1' do
gem 'activerecord', '~> 6.1.5'
gem 'activesupport', '~> 6.1.5'
Expand Down
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog

### 0.9.0
- Add a `CompleteStuckJobGroupsJob`, which can be run periodically to close "stuck" job groups
- Drop support for Ruby 2.7
- Drop support for Rails 6.0

### 0.8.0
- Drop support for ruby < 2.7
- Add support for ruby 3.1
Expand Down
29 changes: 21 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ Creating a job group and queueing some jobs:
```ruby
job_group = Delayed::JobGroups::JobGroup.create!

# JobGroup#enqueue has the same signature as Delayed::Job.enqueue
# JobGroup#enqueue has the same signature as Delayed::Job.enqueue
# i.e. it takes a job and an optional hash of options.
job_group.enqueue(MyJob.new('some arg'), queue: 'general')
job_group.enqueue(MyJob.new('some other arg'), queue: 'general', priority: 10)
Expand All @@ -62,15 +62,15 @@ Registering a job to run after all jobs in the job group have completed:

```ruby
# We can optionally pass options that will be used when queueing the on completion job
job_group = Delayed::JobGroups::JobGroup.create!(on_completion_job: MyCompletionJob.new,
job_group = Delayed::JobGroups::JobGroup.create!(on_completion_job: MyCompletionJob.new,
on_completion_job_options: { queue: 'general' })
```

Registering a job to run if the job group is canceled or fails:

```ruby
# We can optionally pass options that will be used when queueing the on cancellation job
job_group = Delayed::JobGroups::JobGroup.create!(on_cancellation_job: MyCancellationJob.new,
job_group = Delayed::JobGroups::JobGroup.create!(on_cancellation_job: MyCancellationJob.new,
on_cancellation_job_options: { queue: 'general' })
```

Expand All @@ -81,9 +81,9 @@ Block and unblock jobs in a job group:
job_group = Delayed::JobGroups::JobGroup.create!(blocked: true)
job_group.enqueue(MyJob.new('some arg'), queue: 'general')
job_group.mark_queueing_complete

# Do more stuff...

# Unblock the JobGroup so its jobs can run
job_group.unblock
```
Expand All @@ -92,19 +92,32 @@ Cancel a job group:

```ruby
job_group = Delayed::JobGroups::JobGroup.create!

# Do more stuff...

job_group.cancel
```

Configuration to allow failed jobs not to cancel the group
```ruby
# We can optionally pass options that will allow jobs to fail without cancelling the group.
# This also allows the on_completion job to fire once all jobs have either succeeded or failed.
# This also allows the on_completion job to fire once all jobs have either succeeded or failed.
job_group = Delayed::JobGroups::JobGroup.create!(failure_cancels_group: false)
```

### Maintenance

It's possible to end up in a state where all jobs in a group have been completed, but the completion job has not run.
This is due a race condition where the final job in a group is completed, and the worker running it is terminated before
the completion job can be enqueued.

As a remedy for the above scenario, a job is provided which cleans up any job groups in this state. It is recommended to
run this job periodically (for example in a cron job), especially in high-thoughput applications.

```ruby
Delayed::JobGroups::CompleteStuckJobGroupsJob.enqueue
```

## Supported Platforms

* Only the Delayed Job Active Record backend is supported.
Expand Down
15 changes: 15 additions & 0 deletions bin/console
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#!/usr/bin/env ruby
# frozen_string_literal: true

require 'bundler/setup'
require 'delayed_job_groups_plugin'

# You can add fixtures and/or initialization code here to make experimenting
# with your gem easier. You can also use a different console, if you like.

# (If you use this, don't forget to add pry to your Gemfile!)
# require "pry"
# Pry.start

require 'irb'
IRB.start
5 changes: 3 additions & 2 deletions delayed_job_groups.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ Gem::Specification.new do |spec|
spec.test_files = Dir.glob('spec/**/*')
spec.require_paths = ['lib']

spec.required_ruby_version = '>= 2.7'
spec.required_ruby_version = '>= 3.0'

spec.add_dependency 'delayed_job', '>= 4.1'
spec.add_dependency 'delayed_job_active_record', '>= 4.1'
Expand All @@ -34,9 +34,10 @@ Gem::Specification.new do |spec|
'for upgrade/installation notes.'

spec.add_development_dependency 'appraisal'
spec.add_dependency 'activerecord', '>= 5.2', '< 7.1'
spec.add_dependency 'activerecord', '>= 6.1', '< 7.1'
spec.add_development_dependency 'coveralls_reborn', '>= 0.18.0'
spec.add_development_dependency 'database_cleaner', '>= 1.2'
spec.add_development_dependency 'factory_bot_rails'
spec.add_development_dependency 'mime-types'
spec.add_development_dependency 'rake'
spec.add_development_dependency 'rspec', '~> 3'
Expand Down
8 changes: 0 additions & 8 deletions gemfiles/rails_6.0.gemfile

This file was deleted.

19 changes: 19 additions & 0 deletions lib/delayed/job_groups/complete_stuck_job_groups_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# frozen_string_literal: true

module Delayed
module JobGroups
class CompleteStuckJobGroupsJob
class << self
def enqueue(**kwargs)
Delayed::Job.enqueue(new, **kwargs)
end
end

def perform
Delayed::JobGroups::JobGroup.ready.with_no_open_jobs.find_each do |job_group|
job_group.check_for_completion(skip_pending_jobs_check: true)
end
end
end
end
end
13 changes: 11 additions & 2 deletions lib/delayed/job_groups/job_group.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ class JobGroup < ActiveRecord::Base
has_many :queued_jobs, -> { where(failed_at: nil, locked_by: nil) }, class_name: '::Delayed::Job',
dependent: :delete_all

scope :ready, -> { where(queueing_complete: true, blocked: false) }
scope :with_no_open_jobs, -> do
where("NOT EXISTS (#{Delayed::Job.where('delayed_jobs.job_group_id = delayed_job_groups.id').to_sql})")
end

def mark_queueing_complete
with_lock do
raise 'JobGroup has already completed queueing' if queueing_complete?
Expand Down Expand Up @@ -52,10 +57,14 @@ def cancel
destroy
end

def self.check_for_completion(job_group_id)
def check_for_completion(skip_pending_jobs_check: false)
self.class.check_for_completion(id, skip_pending_jobs_check: skip_pending_jobs_check)
end

def self.check_for_completion(job_group_id, skip_pending_jobs_check: false)
# Optimization to avoid loading and locking the JobGroup when the group
# still has pending jobs
return if has_pending_jobs?(job_group_id)
return if !skip_pending_jobs_check && has_pending_jobs?(job_group_id)

transaction do
# The first completed job to notice the job group's queue count has dropped to
Expand Down
2 changes: 1 addition & 1 deletion lib/delayed/job_groups/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

module Delayed
module JobGroups
VERSION = '0.8.0'
VERSION = '0.9.0'
end
end
1 change: 1 addition & 0 deletions lib/delayed_job_groups_plugin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
require 'delayed_job'
require 'delayed_job_active_record'
require 'delayed/job_groups/compatibility'
require 'delayed/job_groups/complete_stuck_job_groups_job'
require 'delayed/job_groups/job_extensions'
require 'delayed/job_groups/job_group'
require 'delayed/job_groups/plugin'
Expand Down
40 changes: 40 additions & 0 deletions spec/delayed/job_groups/complete_stuck_job_groups_job_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# frozen_string_literal: true

describe Delayed::JobGroups::CompleteStuckJobGroupsJob do
describe "#perform" do
let(:job) { described_class.new }

let!(:blocked) { create(:job_group, blocked: true) }
let!(:not_queueing_complete) { create(:job_group, queueing_complete: false) }
let!(:ready_without_jobs) { create(:job_group, queueing_complete: true, blocked: false) }
let!(:ready_with_jobs) do
create(:job_group, queueing_complete: true, blocked: false).tap do |job_group|
create(:delayed_job, job_group: job_group)
end
end

before do
allow(Delayed::JobGroups::JobGroup).to receive(:check_for_completion)
end

it "checks all relevant job groups for completion" do
job.perform

expect(Delayed::JobGroups::JobGroup).to have_received(:check_for_completion)
.once
.with(ready_without_jobs.id, skip_pending_jobs_check: true)
end
end

describe "#enqueue" do
before do
allow(Delayed::Job).to receive(:enqueue)
end

it "enqueues the job" do
described_class.enqueue

expect(Delayed::Job).to have_received(:enqueue).with(instance_of(described_class))
end
end
end
2 changes: 1 addition & 1 deletion spec/delayed/job_groups/job_extensions_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

describe "delayed job extensions" do
it "provides an optional job_group_id" do
job_group = Delayed::JobGroups::JobGroup.create!
job_group = create(:job_group)
expect(Delayed::Job.new(job_group_id: job_group.id)).to be_valid
expect(Delayed::Job.new).to be_valid
end
Expand Down
Loading

0 comments on commit 380ba44

Please sign in to comment.