diff --git a/.circleci/config.yml b/.circleci/config.yml index 0c2b767..15724b9 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -2,14 +2,14 @@ version: 2.1 jobs: lint: docker: - - image: salsify/ruby_ci:2.7.7 + - image: cimg/ruby:3.0.6 working_directory: ~/delayed_job_groups steps: - checkout - restore_cache: keys: - - v1-gems-ruby-2.7.7-{{ checksum "delayed_job_groups.gemspec" }}-{{ checksum "Gemfile" }} - - v1-gems-ruby-2.7.7- + - v1-gems-ruby-3.0.6-{{ checksum "delayed_job_groups.gemspec" }}-{{ checksum "Gemfile" }} + - v1-gems-ruby-3.0.6- - run: name: Install Gems command: | @@ -18,7 +18,7 @@ jobs: bundle clean fi - save_cache: - key: v1-gems-ruby-2.7.7-{{ checksum "delayed_job_groups.gemspec" }}-{{ checksum "Gemfile" }} + key: v1-gems-ruby-3.0.6-{{ checksum "delayed_job_groups.gemspec" }}-{{ checksum "Gemfile" }} paths: - "vendor/bundle" - "gemfiles/vendor/bundle" @@ -32,7 +32,7 @@ jobs: ruby_version: type: string docker: - - image: salsify/ruby_ci:<< parameters.ruby_version >> + - image: cimg/ruby:<< parameters.ruby_version >> environment: CIRCLE_TEST_REPORTS: "test-results" BUNDLE_GEMFILE: << parameters.gemfile >> @@ -69,11 +69,9 @@ workflows: matrix: parameters: gemfile: - - "gemfiles/rails_6.0.gemfile" - "gemfiles/rails_6.1.gemfile" - "gemfiles/rails_7.0.gemfile" ruby_version: - - "2.7.7" - - "3.0.5" - - "3.1.3" - - "3.2.0" + - "3.0.6" + - "3.1.4" + - "3.2.2" diff --git a/.gitignore b/.gitignore index 8a2dd4f..b60e537 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,4 @@ test/tmp test/version_tmp tmp *.gemfile.lock +.ruby-version diff --git a/.rubocop.yml b/.rubocop.yml index ea5950a..2ae6114 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -2,7 +2,7 @@ inherit_gem: salsify_rubocop: conf/rubocop.yml AllCops: - TargetRubyVersion: 2.7 + TargetRubyVersion: 3.0 Exclude: - 'vendor/**/*' - 'gemfiles/**/*' diff --git a/Appraisals b/Appraisals index 9e9ff7f..08e4131 100644 --- a/Appraisals +++ b/Appraisals @@ -1,10 +1,5 @@ # frozen_string_literal: true -appraise 'rails-6.0' do - gem 'activerecord', '~> 6.0.4' - gem 'activesupport', '~> 6.0.4' -end - appraise 'rails-6.1' do gem 'activerecord', '~> 6.1.5' gem 'activesupport', '~> 6.1.5' diff --git a/CHANGELOG.md b/CHANGELOG.md index 2ac04ea..8f8f620 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog +### 0.9.0 +- Add a `CompleteStuckJobGroupsJob`, which can be run periodically to close "stuck" job groups +- Drop support for Ruby 2.7 +- Drop support for Rails 6.0 + ### 0.8.0 - Drop support for ruby < 2.7 - Add support for ruby 3.1 diff --git a/README.md b/README.md index 7fdd6f7..64e7e25 100644 --- a/README.md +++ b/README.md @@ -49,7 +49,7 @@ Creating a job group and queueing some jobs: ```ruby job_group = Delayed::JobGroups::JobGroup.create! -# JobGroup#enqueue has the same signature as Delayed::Job.enqueue +# JobGroup#enqueue has the same signature as Delayed::Job.enqueue # i.e. it takes a job and an optional hash of options. job_group.enqueue(MyJob.new('some arg'), queue: 'general') job_group.enqueue(MyJob.new('some other arg'), queue: 'general', priority: 10) @@ -62,7 +62,7 @@ Registering a job to run after all jobs in the job group have completed: ```ruby # We can optionally pass options that will be used when queueing the on completion job -job_group = Delayed::JobGroups::JobGroup.create!(on_completion_job: MyCompletionJob.new, +job_group = Delayed::JobGroups::JobGroup.create!(on_completion_job: MyCompletionJob.new, on_completion_job_options: { queue: 'general' }) ``` @@ -70,7 +70,7 @@ Registering a job to run if the job group is canceled or fails: ```ruby # We can optionally pass options that will be used when queueing the on cancellation job -job_group = Delayed::JobGroups::JobGroup.create!(on_cancellation_job: MyCancellationJob.new, +job_group = Delayed::JobGroups::JobGroup.create!(on_cancellation_job: MyCancellationJob.new, on_cancellation_job_options: { queue: 'general' }) ``` @@ -81,9 +81,9 @@ Block and unblock jobs in a job group: job_group = Delayed::JobGroups::JobGroup.create!(blocked: true) job_group.enqueue(MyJob.new('some arg'), queue: 'general') job_group.mark_queueing_complete - + # Do more stuff... - + # Unblock the JobGroup so its jobs can run job_group.unblock ``` @@ -92,19 +92,32 @@ Cancel a job group: ```ruby job_group = Delayed::JobGroups::JobGroup.create! - + # Do more stuff... - + job_group.cancel ``` Configuration to allow failed jobs not to cancel the group ```ruby # We can optionally pass options that will allow jobs to fail without cancelling the group. -# This also allows the on_completion job to fire once all jobs have either succeeded or failed. +# This also allows the on_completion job to fire once all jobs have either succeeded or failed. job_group = Delayed::JobGroups::JobGroup.create!(failure_cancels_group: false) ``` +### Maintenance + +It's possible to end up in a state where all jobs in a group have been completed, but the completion job has not run. +This is due a race condition where the final job in a group is completed, and the worker running it is terminated before +the completion job can be enqueued. + +As a remedy for the above scenario, a job is provided which cleans up any job groups in this state. It is recommended to +run this job periodically (for example in a cron job), especially in high-thoughput applications. + +```ruby +Delayed::JobGroups::CompleteStuckJobGroupsJob.enqueue +``` + ## Supported Platforms * Only the Delayed Job Active Record backend is supported. diff --git a/bin/console b/bin/console new file mode 100755 index 0000000..38e453a --- /dev/null +++ b/bin/console @@ -0,0 +1,15 @@ +#!/usr/bin/env ruby +# frozen_string_literal: true + +require 'bundler/setup' +require 'delayed_job_groups_plugin' + +# You can add fixtures and/or initialization code here to make experimenting +# with your gem easier. You can also use a different console, if you like. + +# (If you use this, don't forget to add pry to your Gemfile!) +# require "pry" +# Pry.start + +require 'irb' +IRB.start diff --git a/delayed_job_groups.gemspec b/delayed_job_groups.gemspec index 995c109..ac6d28a 100644 --- a/delayed_job_groups.gemspec +++ b/delayed_job_groups.gemspec @@ -25,7 +25,7 @@ Gem::Specification.new do |spec| spec.test_files = Dir.glob('spec/**/*') spec.require_paths = ['lib'] - spec.required_ruby_version = '>= 2.7' + spec.required_ruby_version = '>= 3.0' spec.add_dependency 'delayed_job', '>= 4.1' spec.add_dependency 'delayed_job_active_record', '>= 4.1' @@ -34,9 +34,10 @@ Gem::Specification.new do |spec| 'for upgrade/installation notes.' spec.add_development_dependency 'appraisal' - spec.add_dependency 'activerecord', '>= 5.2', '< 7.1' + spec.add_dependency 'activerecord', '>= 6.1', '< 7.1' spec.add_development_dependency 'coveralls_reborn', '>= 0.18.0' spec.add_development_dependency 'database_cleaner', '>= 1.2' + spec.add_development_dependency 'factory_bot_rails' spec.add_development_dependency 'mime-types' spec.add_development_dependency 'rake' spec.add_development_dependency 'rspec', '~> 3' diff --git a/gemfiles/rails_6.0.gemfile b/gemfiles/rails_6.0.gemfile deleted file mode 100644 index 9b80006..0000000 --- a/gemfiles/rails_6.0.gemfile +++ /dev/null @@ -1,8 +0,0 @@ -# This file was generated by Appraisal - -source "https://rubygems.org" - -gem "activerecord", "~> 6.0.4" -gem "activesupport", "~> 6.0.4" - -gemspec path: "../" diff --git a/lib/delayed/job_groups/complete_stuck_job_groups_job.rb b/lib/delayed/job_groups/complete_stuck_job_groups_job.rb new file mode 100644 index 0000000..fa15ae1 --- /dev/null +++ b/lib/delayed/job_groups/complete_stuck_job_groups_job.rb @@ -0,0 +1,19 @@ +# frozen_string_literal: true + +module Delayed + module JobGroups + class CompleteStuckJobGroupsJob + class << self + def enqueue(**kwargs) + Delayed::Job.enqueue(new, **kwargs) + end + end + + def perform + Delayed::JobGroups::JobGroup.ready.with_no_open_jobs.find_each do |job_group| + job_group.check_for_completion(skip_pending_jobs_check: true) + end + end + end + end +end diff --git a/lib/delayed/job_groups/job_group.rb b/lib/delayed/job_groups/job_group.rb index d1bdb1c..46fa029 100644 --- a/lib/delayed/job_groups/job_group.rb +++ b/lib/delayed/job_groups/job_group.rb @@ -22,6 +22,11 @@ class JobGroup < ActiveRecord::Base has_many :queued_jobs, -> { where(failed_at: nil, locked_by: nil) }, class_name: '::Delayed::Job', dependent: :delete_all + scope :ready, -> { where(queueing_complete: true, blocked: false) } + scope :with_no_open_jobs, -> do + where("NOT EXISTS (#{Delayed::Job.where('delayed_jobs.job_group_id = delayed_job_groups.id').to_sql})") + end + def mark_queueing_complete with_lock do raise 'JobGroup has already completed queueing' if queueing_complete? @@ -52,10 +57,14 @@ def cancel destroy end - def self.check_for_completion(job_group_id) + def check_for_completion(skip_pending_jobs_check: false) + self.class.check_for_completion(id, skip_pending_jobs_check: skip_pending_jobs_check) + end + + def self.check_for_completion(job_group_id, skip_pending_jobs_check: false) # Optimization to avoid loading and locking the JobGroup when the group # still has pending jobs - return if has_pending_jobs?(job_group_id) + return if !skip_pending_jobs_check && has_pending_jobs?(job_group_id) transaction do # The first completed job to notice the job group's queue count has dropped to diff --git a/lib/delayed/job_groups/version.rb b/lib/delayed/job_groups/version.rb index 6df91e4..d277d15 100644 --- a/lib/delayed/job_groups/version.rb +++ b/lib/delayed/job_groups/version.rb @@ -2,6 +2,6 @@ module Delayed module JobGroups - VERSION = '0.8.0' + VERSION = '0.9.0' end end diff --git a/lib/delayed_job_groups_plugin.rb b/lib/delayed_job_groups_plugin.rb index acbaa26..5179ed7 100644 --- a/lib/delayed_job_groups_plugin.rb +++ b/lib/delayed_job_groups_plugin.rb @@ -5,6 +5,7 @@ require 'delayed_job' require 'delayed_job_active_record' require 'delayed/job_groups/compatibility' +require 'delayed/job_groups/complete_stuck_job_groups_job' require 'delayed/job_groups/job_extensions' require 'delayed/job_groups/job_group' require 'delayed/job_groups/plugin' diff --git a/spec/delayed/job_groups/complete_stuck_job_groups_job_spec.rb b/spec/delayed/job_groups/complete_stuck_job_groups_job_spec.rb new file mode 100644 index 0000000..85e56de --- /dev/null +++ b/spec/delayed/job_groups/complete_stuck_job_groups_job_spec.rb @@ -0,0 +1,40 @@ +# frozen_string_literal: true + +describe Delayed::JobGroups::CompleteStuckJobGroupsJob do + describe "#perform" do + let(:job) { described_class.new } + + let!(:blocked) { create(:job_group, blocked: true) } + let!(:not_queueing_complete) { create(:job_group, queueing_complete: false) } + let!(:ready_without_jobs) { create(:job_group, queueing_complete: true, blocked: false) } + let!(:ready_with_jobs) do + create(:job_group, queueing_complete: true, blocked: false).tap do |job_group| + create(:delayed_job, job_group: job_group) + end + end + + before do + allow(Delayed::JobGroups::JobGroup).to receive(:check_for_completion) + end + + it "checks all relevant job groups for completion" do + job.perform + + expect(Delayed::JobGroups::JobGroup).to have_received(:check_for_completion) + .once + .with(ready_without_jobs.id, skip_pending_jobs_check: true) + end + end + + describe "#enqueue" do + before do + allow(Delayed::Job).to receive(:enqueue) + end + + it "enqueues the job" do + described_class.enqueue + + expect(Delayed::Job).to have_received(:enqueue).with(instance_of(described_class)) + end + end +end diff --git a/spec/delayed/job_groups/job_extensions_spec.rb b/spec/delayed/job_groups/job_extensions_spec.rb index 3af3e14..8ceab81 100644 --- a/spec/delayed/job_groups/job_extensions_spec.rb +++ b/spec/delayed/job_groups/job_extensions_spec.rb @@ -2,7 +2,7 @@ describe "delayed job extensions" do it "provides an optional job_group_id" do - job_group = Delayed::JobGroups::JobGroup.create! + job_group = create(:job_group) expect(Delayed::Job.new(job_group_id: job_group.id)).to be_valid expect(Delayed::Job.new).to be_valid end diff --git a/spec/delayed/job_groups/job_group_spec.rb b/spec/delayed/job_groups/job_group_spec.rb index e142954..92da6b3 100644 --- a/spec/delayed/job_groups/job_group_spec.rb +++ b/spec/delayed/job_groups/job_group_spec.rb @@ -10,9 +10,12 @@ let(:current_time) { Time.utc(2013) } subject(:job_group) do - Delayed::JobGroups::JobGroup.create!(on_completion_job: on_completion_job, - on_completion_job_options: on_completion_job_options, - blocked: blocked) + create( + :job_group, + on_completion_job: on_completion_job, + on_completion_job_options: on_completion_job_options, + blocked: blocked + ) end before do @@ -24,6 +27,27 @@ Timecop.return end + describe "scopes" do + describe "ready" do + let!(:blocked) { create(:job_group, blocked: true) } + let!(:not_queueing_complete) { create(:job_group, queueing_complete: false) } + let!(:ready) { create(:job_group, queueing_complete: true, blocked: false) } + + it "returns the expected job groups" do + expect(described_class.ready).to match_array(ready) + end + end + + describe "with_no_open_jobs" do + let!(:job_group_with_jobs) { create(:delayed_job).job_group } + let!(:job_group_without_jobs) { subject } + + it "returns groups with no jobs" do + expect(described_class.with_no_open_jobs).to match_array(job_group_without_jobs) + end + end + end + shared_examples "the job group was completed" do it "queues the completion job" do expect(Delayed::Job).to have_received(:enqueue).with(on_completion_job, on_completion_job_options) @@ -76,7 +100,73 @@ end end - describe ".check_for_completion" do + describe "instance check_for_completion" do + let!(:job) { Delayed::Job.create!(job_group_id: job_group.id) } + + before do + job_group.mark_queueing_complete + end + + shared_context "complete job and check job group complete" do + before do + job.destroy! + job_group.check_for_completion + end + end + + context "when no jobs exist" do + include_context "complete job and check job group complete" + + it_behaves_like "the job group was completed" + end + + context "when active jobs exist" do + before do + Delayed::JobGroups::JobGroup.check_for_completion(job_group.id) + end + + it_behaves_like "the job group was not completed" + end + + context "when on failed jobs exist" do + before do + job.update!(failed_at: Time.now) + Delayed::JobGroups::JobGroup.check_for_completion(job_group.id) + end + + it_behaves_like "the job group was completed" + end + + context "when there are no on_completion_job_options" do + let(:on_completion_job_options) { nil } + + include_context "complete job and check job group complete" + + it "queues the completion job with empty options" do + expect(Delayed::Job).to have_received(:enqueue).with(on_completion_job, {}) + end + + it "destroys the job group" do + expect(job_group).to have_been_destroyed + end + end + + context "when there is no on_completion_job" do + let(:on_completion_job) { nil } + + include_context "complete job and check job group complete" + + it "doesn't queues the non-existent completion job" do + expect(Delayed::Job).not_to have_received(:enqueue) + end + + it "destroys the job group" do + expect(job_group).to have_been_destroyed + end + end + end + + describe "class check_for_completion" do let!(:job) { Delayed::Job.create!(job_group_id: job_group.id) } before do @@ -85,7 +175,7 @@ shared_context "complete job and check job group complete" do before do - job.destroy + job.destroy! Delayed::JobGroups::JobGroup.check_for_completion(job_group.id) end end diff --git a/spec/delayed/job_groups/plugin_spec.rb b/spec/delayed/job_groups/plugin_spec.rb index 6418ce7..0bc0c12 100644 --- a/spec/delayed/job_groups/plugin_spec.rb +++ b/spec/delayed/job_groups/plugin_spec.rb @@ -13,7 +13,7 @@ Delayed::Worker.max_attempts = @old_max_attempts end - let!(:job_group) { Delayed::JobGroups::JobGroup.create!(on_completion_job: TestJobs::CompletionJob.new) } + let!(:job_group) { create(:job_group, on_completion_job: TestJobs::CompletionJob.new) } it "runs the completion job after completing other jobs" do job_group.enqueue(TestJobs::NoOpJob.new) @@ -215,8 +215,11 @@ context "when a cancellation job is provided" do let!(:job_group) do - Delayed::JobGroups::JobGroup.create!(on_completion_job: TestJobs::CompletionJob.new, - on_cancellation_job: TestJobs::CancellationJob.new) + create( + :job_group, + on_completion_job: TestJobs::CompletionJob.new, + on_cancellation_job: TestJobs::CancellationJob.new + ) end it "runs the cancellation job after a job error causes cancellation" do @@ -262,7 +265,7 @@ end context "when a no completion job is provided" do - let!(:job_group) { Delayed::JobGroups::JobGroup.create! } + let!(:job_group) { create(:job_group) } it "doesn't queue a non-existent completion job" do job_group.enqueue(TestJobs::NoOpJob.new) diff --git a/spec/factories/delayed_jobs.rb b/spec/factories/delayed_jobs.rb new file mode 100644 index 0000000..3f71658 --- /dev/null +++ b/spec/factories/delayed_jobs.rb @@ -0,0 +1,7 @@ +# frozen_string_literal: true + +FactoryBot.define do + factory :delayed_job, class: 'Delayed::Job' do + job_group { create(:job_group) } + end +end diff --git a/spec/factories/job_groups.rb b/spec/factories/job_groups.rb new file mode 100644 index 0000000..c8972ba --- /dev/null +++ b/spec/factories/job_groups.rb @@ -0,0 +1,10 @@ +# frozen_string_literal: true + +FactoryBot.define do + factory :job_group, class: 'Delayed::JobGroups::JobGroup' do + blocked { false } + queueing_complete { false } + on_completion_job { nil } + on_completion_job_options { nil } + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 6ff298c..16e4f2d 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -14,6 +14,7 @@ require 'rspec/its' require 'database_cleaner' require 'delayed_job_groups_plugin' +require 'factory_bot' require 'yaml' require 'timecop' @@ -40,6 +41,7 @@ config.before(:suite) do DatabaseCleaner.clean_with(:truncation) + FactoryBot.find_definitions end config.before do @@ -53,4 +55,6 @@ config.after do DatabaseCleaner.clean end + + config.include FactoryBot::Syntax::Methods end