From 3a9ffeec24589669ee448ea9b2f243c2dd3567ad Mon Sep 17 00:00:00 2001
From: Chris Van Pelt
Date: Sat, 10 Jan 2009 19:10:49 -0800
Subject: [PATCH] Refactor and version bump to facilitate both ActiveRecord and
 DataMapper as ORMs

---
 README.textile                    |   8 +
 Rakefile                          |  21 +++
 delayed_job.gemspec               |   4 +-
 lib/delayed/job.rb                | 253 +----------------------------
 lib/delayed/job/active_record.rb  |  16 ++
 lib/delayed/job/common.rb         | 258 ++++++++++++++++++++++++++++++
 lib/delayed/job/data_mapper.rb    |  52 ++++++
 lib/delayed/performable_method.rb |  13 +-
 lib/delayed_job.rb                |   2 +-
 spec/database.rb                  |  79 +++++----
 spec/delayed_method_spec.rb       |  12 +-
 spec/job_spec.rb                  |  15 +-
 12 files changed, 432 insertions(+), 301 deletions(-)
 create mode 100644 Rakefile
 create mode 100644 lib/delayed/job/active_record.rb
 create mode 100644 lib/delayed/job/common.rb
 create mode 100644 lib/delayed/job/data_mapper.rb

diff --git a/README.textile b/README.textile
index c09d62f54..2a7efa593 100644
--- a/README.textile
+++ b/README.textile
@@ -28,6 +28,8 @@ The library evolves around a delayed_jobs table which looks as follows:
       table.timestamps
     end
 
+Delayed Job now supports both ActiveRecord and DataMapper as its ORM. If DataMapper is defined when the gem is loaded, it will be used instead of ActiveRecord. To set up your table with DataMapper, require delayed_job in your application and run @Delayed::Job.auto_migrate!@.
+
 h2. Usage
 
 Jobs are simple ruby objects with a method called perform. Any object which responds to perform can be stuffed into the jobs table.
@@ -68,8 +70,14 @@ h3. Cleaning up
 
 You can invoke @rake jobs:clear@ to delete all jobs in the queue.
 
+h3. Running specs
+
+Running @rake spec@ runs all of the specs against ActiveRecord. To run them against DataMapper, use @rake spec DM=true@.
+
 h3. Changes
 
+* 1.8.0: Added a DataMapper adapter which is loaded instead of the ActiveRecord adapter if DataMapper is defined.
+
 * 1.7.0: Added failed_at column which can optionally be set after a certain amount of failed job attempts. By default failed job attempts are destroyed after about a month.
 
 * 1.6.0: Renamed locked_until to locked_at. We now store when we start a given job instead of how long it will be locked by the worker. This allows us to get a reading on how long a job took to execute.
diff --git a/Rakefile b/Rakefile
new file mode 100644
index 000000000..562222227
--- /dev/null
+++ b/Rakefile
@@ -0,0 +1,21 @@
+require 'spec/rake/spectask'
+desc "Run specs, run a specific spec with TASK=spec/path_to_spec.rb. By default the tests are run with ActiveRecord, set DM=true to run the tests with DataMapper."
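+# Examples (mirroring the description above):
+#   rake spec                        # run every spec against ActiveRecord (the default)
+#   rake spec DM=true                # run every spec against DataMapper
+#   rake spec TASK=spec/job_spec.rb  # run a single spec file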
+task :spec => [ "spec:default" ] + +namespace :spec do + OPTS_FILENAME = "./spec/spec.opts" + if File.exist?(OPTS_FILENAME) + SPEC_OPTS = ["--options", OPTS_FILENAME] + else + SPEC_OPTS = ["--color", "--format", "specdoc"] + end + + Spec::Rake::SpecTask.new('default') do |t| + t.spec_opts = SPEC_OPTS + if(ENV['TASK']) + t.spec_files = [ENV['TASK']] + else + t.spec_files = Dir['spec/*_spec.rb'].sort + end + end +end \ No newline at end of file diff --git a/delayed_job.gemspec b/delayed_job.gemspec index fd2fddf46..ea05a94d4 100644 --- a/delayed_job.gemspec +++ b/delayed_job.gemspec @@ -2,8 +2,8 @@ Gem::Specification.new do |s| s.name = "delayed_job" - s.version = "1.7.0" - s.date = "2008-11-28" + s.version = "1.8.0" + s.date = "2009-01-11" s.summary = "Database-backed asynchronous priority queue system -- Extracted from Shopify" s.email = "tobi@leetsoft.com" s.homepage = "http://github.com/tobi/delayed_job/tree/master" diff --git a/lib/delayed/job.rb b/lib/delayed/job.rb index cf2e29b61..c2069ac1a 100644 --- a/lib/delayed/job.rb +++ b/lib/delayed/job.rb @@ -1,251 +1,2 @@ -module Delayed - - class DeserializationError < StandardError - end - - class Job < ActiveRecord::Base - MAX_ATTEMPTS = 25 - MAX_RUN_TIME = 4.hours - set_table_name :delayed_jobs - - # By default failed jobs are destroyed after too many attempts. - # If you want to keep them around (perhaps to inspect the reason - # for the failure), set this to false. - cattr_accessor :destroy_failed_jobs - self.destroy_failed_jobs = true - - # Every worker has a unique name which by default is the pid of the process. - # There are some advantages to overriding this with something which survives worker retarts: - # Workers can safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before. - cattr_accessor :worker_name - self.worker_name = "host:#{Socket.gethostname} pid:#{Process.pid}" rescue "pid:#{Process.pid}" - - NextTaskSQL = '(run_at <= ? AND (locked_at IS NULL OR locked_at < ?) OR (locked_by = ?)) AND failed_at IS NULL' - NextTaskOrder = 'priority DESC, run_at ASC' - - ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/ - - cattr_accessor :min_priority, :max_priority - self.min_priority = nil - self.max_priority = nil - - class LockError < StandardError - end - - def self.clear_locks! - update_all("locked_by = null, locked_at = null", ["locked_by = ?", worker_name]) - end - - def failed? - failed_at - end - alias_method :failed, :failed? - - def payload_object - @payload_object ||= deserialize(self['handler']) - end - - def name - @name ||= begin - payload = payload_object - if payload.respond_to?(:display_name) - payload.display_name - else - payload.class.name - end - end - end - - def payload_object=(object) - self['handler'] = object.to_yaml - end - - def reschedule(message, backtrace = [], time = nil) - if self.attempts < MAX_ATTEMPTS - time ||= Job.db_time_now + (attempts ** 4) + 5 - - self.attempts += 1 - self.run_at = time - self.last_error = message + "\n" + backtrace.join("\n") - self.unlock - save! - else - logger.info "* [JOB] PERMANENTLY removing #{self.name} because of #{attempts} consequetive failures." - destroy_failed_jobs ? destroy : update_attribute(:failed_at, Time.now) - end - end - - def self.enqueue(*args, &block) - object = block_given? ? EvaledJob.new(&block) : args.shift - - unless object.respond_to?(:perform) || block_given? 
- raise ArgumentError, 'Cannot enqueue items which do not respond to perform' - end - - priority = args.first || 0 - run_at = args[1] - - Job.create(:payload_object => object, :priority => priority.to_i, :run_at => run_at) - end - - def self.find_available(limit = 5, max_run_time = MAX_RUN_TIME) - - time_now = db_time_now - - sql = NextTaskSQL.dup - - conditions = [time_now, time_now - max_run_time, worker_name] - - if self.min_priority - sql << ' AND (priority >= ?)' - conditions << min_priority - end - - if self.max_priority - sql << ' AND (priority <= ?)' - conditions << max_priority - end - - conditions.unshift(sql) - - records = ActiveRecord::Base.silence do - find(:all, :conditions => conditions, :order => NextTaskOrder, :limit => limit) - end - - records.sort_by { rand() } - end - - # Get the payload of the next job we can get an exclusive lock on. - # If no jobs are left we return nil - def self.reserve(max_run_time = MAX_RUN_TIME, &block) - - # We get up to 5 jobs from the db. In face we cannot get exclusive access to a job we try the next. - # this leads to a more even distribution of jobs across the worker processes - find_available(5, max_run_time).each do |job| - begin - logger.info "* [JOB] aquiring lock on #{job.name}" - job.lock_exclusively!(max_run_time, worker_name) - runtime = Benchmark.realtime do - invoke_job(job.payload_object, &block) - job.destroy - end - logger.info "* [JOB] #{job.name} completed after %.4f" % runtime - - return job - rescue LockError - # We did not get the lock, some other worker process must have - logger.warn "* [JOB] failed to aquire exclusive lock for #{job.name}" - rescue StandardError => e - job.reschedule e.message, e.backtrace - log_exception(job, e) - return job - end - end - - nil - end - - # This method is used internally by reserve method to ensure exclusive access - # to the given job. It will rise a LockError if it cannot get this lock. - def lock_exclusively!(max_run_time, worker = worker_name) - now = self.class.db_time_now - affected_rows = if locked_by != worker - # We don't own this job so we will update the locked_by name and the locked_at - self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?)", id, (now - max_run_time.to_i)]) - else - # We already own this job, this may happen if the job queue crashes. - # Simply resume and update the locked_at - self.class.update_all(["locked_at = ?", now], ["id = ? and locked_by = ?", id, worker]) - end - raise LockError.new("Attempted to aquire exclusive lock failed") unless affected_rows == 1 - - self.locked_at = now - self.locked_by = worker - end - - def unlock - self.locked_at = nil - self.locked_by = nil - end - - # This is a good hook if you need to report job processing errors in additional or different ways - def self.log_exception(job, error) - logger.error "* [JOB] #{job.name} failed with #{error.class.name}: #{error.message} - #{job.attempts} failed attempts" - logger.error(error) - end - - def self.work_off(num = 100) - success, failure = 0, 0 - - num.times do - job = self.reserve do |j| - begin - j.perform - success += 1 - rescue - failure += 1 - raise - end - end - - break if job.nil? - end - - return [success, failure] - end - - # Moved into its own method so that new_relic can trace it. - def self.invoke_job(job, &block) - block.call(job) - end - - private - - def deserialize(source) - handler = YAML.load(source) rescue nil - - unless handler.respond_to?(:perform) - if handler.nil? 
&& source =~ ParseObjectFromYaml
-          handler_class = $1
-        end
-        attempt_to_load(handler_class || handler.class)
-        handler = YAML.load(source)
-      end
-
-      return handler if handler.respond_to?(:perform)
-
-      raise DeserializationError,
-        'Job failed to load: Unknown handler. Try to manually require the appropiate file.'
-    rescue TypeError, LoadError, NameError => e
-      raise DeserializationError,
-        "Job failed to load: #{e.message}. Try to manually require the required file."
-    end
-
-    # Constantize the object so that ActiveSupport can attempt
-    # its auto loading magic. Will raise LoadError if not successful.
-    def attempt_to_load(klass)
-      klass.constantize
-    end
-
-    def self.db_time_now
-      (ActiveRecord::Base.default_timezone == :utc) ? Time.now.utc : Time.now
-    end
-
-  protected
-
-    def before_save
-      self.run_at ||= self.class.db_time_now
-    end
-
-  end
-
-  class EvaledJob
-    def initialize
-      @job = yield
-    end
-
-    def perform
-      eval(@job)
-    end
-  end
-end
+require File.dirname(__FILE__) + '/job/common'
+require File.dirname(__FILE__) + (defined?(DataMapper) ? '/job/data_mapper' : '/job/active_record')
\ No newline at end of file
diff --git a/lib/delayed/job/active_record.rb b/lib/delayed/job/active_record.rb
new file mode 100644
index 000000000..ae80838e9
--- /dev/null
+++ b/lib/delayed/job/active_record.rb
@@ -0,0 +1,16 @@
+module Delayed
+  class Job < ActiveRecord::Base
+    set_table_name :delayed_jobs
+
+    include Common
+  protected
+    def before_save
+      self.run_at ||= self.class.db_time_now
+    end
+  end
+end
+# Placeholder for type checking
+module DataMapper
+  module Resource; end
+  module ObjectNotFoundError; end
+end
\ No newline at end of file
diff --git a/lib/delayed/job/common.rb b/lib/delayed/job/common.rb
new file mode 100644
index 000000000..0ac7b1768
--- /dev/null
+++ b/lib/delayed/job/common.rb
@@ -0,0 +1,258 @@
+module Delayed
+
+  class DeserializationError < StandardError
+  end
+
+  module Common
+
+    def self.included(base)
+      base.const_set("MAX_RUN_TIME", 4.hours) unless base.const_defined?("MAX_RUN_TIME")
+      base.const_set("MAX_ATTEMPTS", 25) unless base.const_defined?("MAX_ATTEMPTS")
+
+      base.const_set("NextTaskSQL", '(run_at <= ? AND (locked_at IS NULL OR locked_at < ?) OR (locked_by = ?)) AND failed_at IS NULL') unless base.const_defined?("NextTaskSQL")
+      base.const_set("NextTaskOrder", base.respond_to?(:get) ? [:priority.desc, :run_at.asc] : 'priority DESC, run_at ASC') unless base.const_defined?("NextTaskOrder")
+
+      base.const_set("ParseObjectFromYaml", /\!ruby\/\w+\:([^\s]+)/) unless base.const_defined?("ParseObjectFromYaml")
+      base.class_eval do
+        # By default failed jobs are destroyed after too many attempts.
+        # If you want to keep them around (perhaps to inspect the reason
+        # for the failure), set this to false.
+        cattr_accessor :destroy_failed_jobs
+        self.destroy_failed_jobs = true
+
+        # Every worker has a unique name which by default is the pid of the process.
+        # There are some advantages to overriding this with something which survives worker restarts:
+        # Workers can safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before.
+        cattr_accessor :worker_name
+        self.worker_name = "host:#{Socket.gethostname} pid:#{Process.pid}" rescue "pid:#{Process.pid}"
+        cattr_accessor :min_priority, :max_priority
+        self.min_priority = nil
+        self.max_priority = nil
+
+        include InstanceMethods
+        extend ClassMethods
+      end
+    end
+
+    class LockError < StandardError
+    end
+
+    module InstanceMethods
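+      # Instance-level behaviour shared by both the ActiveRecord and the DataMapper
+      # backed Job class: payload (de)serialization, rescheduling failed runs with a
+      # polynomial (attempts ** 4) back-off, and the locking columns used by workers.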
+      def failed?
+        failed_at
+      end
+      alias_method :failed, :failed?
+
+      def payload_object
+        @payload_object ||= deserialize(self.handler)
+      end
+
+      def name
+        @name ||= begin
+          payload = payload_object
+          if payload.respond_to?(:display_name)
+            payload.display_name
+          else
+            payload.class.name
+          end
+        end
+      end
+
+      def payload_object=(object)
+        self.handler = object.to_yaml
+      end
+
+      def reschedule(message, backtrace = [], time = nil)
+        if self.attempts < Delayed::Job::MAX_ATTEMPTS
+          time ||= Job.db_time_now + (attempts ** 4) + 5
+
+          self.attempts += 1
+          self.run_at = time
+          self.last_error = message + "\n" + backtrace.join("\n")
+          self.unlock
+          save!
+        else
+          logger.info "* [JOB] PERMANENTLY removing #{self.name} because of #{attempts} consecutive failures."
+          destroy_failed_jobs ? destroy : update_attributes(:failed_at => Time.now)
+        end
+      end
+
+      # This method is used internally by the reserve method to ensure exclusive access
+      # to the given job. It will raise a LockError if it cannot get this lock.
+      def lock_exclusively!(max_run_time, worker = worker_name)
+        now = self.class.db_time_now
+        affected_rows = if locked_by != worker
+          # We don't own this job so we will update the locked_by name and the locked_at
+          self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?)", id, (now - max_run_time.to_i)])
+        else
+          # We already own this job, this may happen if the job queue crashes.
+          # Simply resume and update the locked_at
+          self.class.update_all(["locked_at = ?", now], ["id = ? and locked_by = ?", id, worker])
+        end
+        raise LockError.new("Attempt to acquire exclusive lock failed") unless affected_rows == 1
+
+        self.locked_at = now
+        self.locked_by = worker
+      end
+
+      def unlock
+        self.locked_at = nil
+        self.locked_by = nil
+      end
+
+      private
+
+      def deserialize(source)
+        handler = YAML.load(source) rescue nil
+        unless handler.respond_to?(:perform)
+          if handler.nil? && source =~ Delayed::Job::ParseObjectFromYaml
+            handler_class = $1
+          end
+          attempt_to_load(handler_class || handler.class)
+          handler = YAML.load(source)
+        end
+
+        return handler if handler.respond_to?(:perform)
+
+        raise DeserializationError,
+          'Job failed to load: Unknown handler. Try to manually require the appropriate file.'
+      rescue TypeError, LoadError, NameError => e
+        raise DeserializationError,
+          "Job failed to load: #{e.message}. Try to manually require the required file."
+      end
+
+      # Constantize the object so that ActiveSupport can attempt
+      # its auto loading magic. Will raise LoadError if not successful.
+      def attempt_to_load(klass)
+        klass.constantize
+      end
+    end
+
+    module ClassMethods
+      def enqueue(*args, &block)
+        object = block_given? ? EvaledJob.new(&block) : args.shift
+
+        unless object.respond_to?(:perform) || block_given?
+          raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
+        end
+
+        priority = args.first || 0
+        run_at = args[1]
+
+        Job.create(:payload_object => object, :priority => priority.to_i, :run_at => run_at)
+      end
+
+      def find_available(limit = 5, max_run_time = Delayed::Job::MAX_RUN_TIME)
+
+        time_now = db_time_now
+
+        sql = Delayed::Job::NextTaskSQL.dup
+
+        conditions = [time_now, time_now - max_run_time, worker_name]
+
+        if self.min_priority
+          sql << ' AND (priority >= ?)'
+          conditions << min_priority
+        end
+
+        if self.max_priority
+          sql << ' AND (priority <= ?)'
+          conditions << max_priority
+        end
+
+        conditions.unshift(sql)
+
+        # DM vs. AR: ActiveRecord classes respond to .find; DataMapper resources use .all
+        if self.respond_to?(:find)
+          records = ActiveRecord::Base.silence do
+            find(:all, :conditions => conditions, :order => Delayed::Job::NextTaskOrder, :limit => limit)
+          end
+        else
+          orig, DataMapper.logger.level = DataMapper.logger.level, :error
+          records = all(:conditions => conditions, :order => Delayed::Job::NextTaskOrder, :limit => limit)
+          DataMapper.logger.level = orig
+        end
+
+        records.sort_by { rand() }
+      end
+
+      # Get the payload of the next job we can get an exclusive lock on.
+      # If no jobs are left we return nil
+      def reserve(max_run_time = Delayed::Job::MAX_RUN_TIME, &block)
+        # We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next.
+        # This leads to a more even distribution of jobs across the worker processes.
+        find_available(5, max_run_time).each do |job|
+          begin
+            logger.info "* [JOB] acquiring lock on #{job.name}"
+            job.lock_exclusively!(max_run_time, worker_name)
+            runtime = Benchmark.realtime do
+              invoke_job(job.payload_object, &block)
+              job.destroy
+            end
+            logger.info "* [JOB] #{job.name} completed after %.4f" % runtime
+
+            return job
+          rescue LockError
+            # We did not get the lock, some other worker process must have
+            logger.warn "* [JOB] failed to acquire exclusive lock for #{job.name}"
+          rescue StandardError => e
+            job.reschedule e.message, e.backtrace
+            log_exception(job, e)
+            return job
+          end
+        end
+
+        nil
+      end
+
+      def clear_locks!
+        update_all("locked_by = null, locked_at = null", ["locked_by = ?", worker_name])
+      end
+
+      # This is a good hook if you need to report job processing errors in additional or different ways
+      def log_exception(job, error)
+        logger.error "* [JOB] #{job.name} failed with #{error.class.name}: #{error.message} - #{job.attempts} failed attempts"
+        logger.error(error)
+      end
+
+      def work_off(num = 100)
+        success, failure = 0, 0
+
+        num.times do
+          job = self.reserve do |j|
+            begin
+              j.perform
+              success += 1
+            rescue
+              failure += 1
+              raise
+            end
+          end
+
+          break if job.nil?
+        end
+
+        return [success, failure]
+      end
+
+      # Moved into its own method so that new_relic can trace it.
+      def invoke_job(job, &block)
+        block.call(job)
+      end
+
+      def db_time_now
+        (ActiveRecord::Base.default_timezone == :utc) ? Time.now.utc : Time.now
+      end
+    end
+  end
+
+  class EvaledJob
+    def initialize
+      @job = yield
+    end
+
+    def perform
+      eval(@job)
+    end
+  end
+end
diff --git a/lib/delayed/job/data_mapper.rb b/lib/delayed/job/data_mapper.rb
new file mode 100644
index 000000000..a555e4218
--- /dev/null
+++ b/lib/delayed/job/data_mapper.rb
@@ -0,0 +1,52 @@
+module Delayed
+  class Job
+    include DataMapper::Resource
+
+    storage_names[:default] = 'delayed_jobs'
+
+    property :id,         Serial
+    property :priority,   Integer, :default => 0
+    property :attempts,   Integer, :default => 0
+    property :handler,    Text
+    property :last_error, String
+    property :run_at,     Time
+    property :locked_at,  Time
+    property :locked_by,  String
+    property :failed_at,  Time
+    timestamps(:at)
+
+    def self.update_all(with, from)
+      repository(:default).adapter.execute("UPDATE #{storage_names[:default]} SET #{with[0]} WHERE #{from[0]}", *with[1..-1].concat(from[1..-1])).affected_rows
+    end
+
+    def self.delete_all
+      all.destroy!
+ end + + def self.last + all.last + end + + def logger + DataMapper.logger + end + + def self.logger + DataMapper.logger + end + + before(:save) { self.run_at ||= self.class.db_time_now } + + include Common + end +end + +#Place holder for type checking +module ActiveRecord + module Base + def self.default_timezone + :boo + end + end + module RecordNotFound; end +end \ No newline at end of file diff --git a/lib/delayed/performable_method.rb b/lib/delayed/performable_method.rb index 18bc77ac5..5182e7603 100644 --- a/lib/delayed/performable_method.rb +++ b/lib/delayed/performable_method.rb @@ -2,6 +2,7 @@ module Delayed class PerformableMethod < Struct.new(:object, :method, :args) CLASS_STRING_FORMAT = /^CLASS\:([A-Z][\w\:]+)$/ AR_STRING_FORMAT = /^AR\:([A-Z][\w\:]+)\:(\d+)$/ + DM_STRING_FORMAT = /^DM\:([A-Z][\w\:]+)\:(\d+)$/ def initialize(object, method, args) raise NoMethodError, "undefined method `#{method}' for #{self.inspect}" unless object.respond_to?(method) @@ -14,14 +15,14 @@ def initialize(object, method, args) def display_name case self.object when CLASS_STRING_FORMAT then "#{$1}.#{method}" - when AR_STRING_FORMAT then "#{$1}##{method}" + when AR_STRING_FORMAT, DM_STRING_FORMAT then "#{$1}##{method}" else "Unknown##{method}" end end def perform load(object).send(method, *args.map{|a| load(a)}) - rescue ActiveRecord::RecordNotFound + rescue ActiveRecord::RecordNotFound, DataMapper::ObjectNotFoundError # We cannot do anything about objects which were deleted in the meantime true end @@ -31,7 +32,8 @@ def perform def load(arg) case arg when CLASS_STRING_FORMAT then $1.constantize - when AR_STRING_FORMAT then $1.constantize.find($2) + when AR_STRING_FORMAT then $1.constantize.find($2) + when DM_STRING_FORMAT then $1.constantize.get($2) else arg end end @@ -40,6 +42,7 @@ def dump(arg) case arg when Class then class_to_string(arg) when ActiveRecord::Base then ar_to_string(arg) + when DataMapper::Resource then dm_to_string(arg) else arg end end @@ -47,6 +50,10 @@ def dump(arg) def ar_to_string(obj) "AR:#{obj.class}:#{obj.id}" end + + def dm_to_string(obj) + "DM:#{obj.class}:#{obj.id}" + end def class_to_string(obj) "CLASS:#{obj.name}" diff --git a/lib/delayed_job.rb b/lib/delayed_job.rb index 9c9206cb0..2170d767a 100644 --- a/lib/delayed_job.rb +++ b/lib/delayed_job.rb @@ -1,4 +1,4 @@ -autoload :ActiveRecord, 'activerecord' +autoload :ActiveRecord, 'activerecord' if !defined?(DataMapper) require File.dirname(__FILE__) + '/delayed/message_sending' require File.dirname(__FILE__) + '/delayed/performable_method' diff --git a/spec/database.rb b/spec/database.rb index 20ae53da5..066738691 100644 --- a/spec/database.rb +++ b/spec/database.rb @@ -2,38 +2,55 @@ $:.unshift(File.dirname(__FILE__) + '/../../rspec/lib') require 'rubygems' -require 'active_record' -gem 'sqlite3-ruby' - -require File.dirname(__FILE__) + '/../init' +require 'active_support' require 'spec' - -ActiveRecord::Base.logger = Logger.new('/tmp/dj.log') -ActiveRecord::Base.establish_connection(:adapter => 'sqlite3', :database => '/tmp/jobs.sqlite') -ActiveRecord::Migration.verbose = false - -ActiveRecord::Schema.define do - create_table :delayed_jobs, :force => true do |table| - table.integer :priority, :default => 0 - table.integer :attempts, :default => 0 - table.text :handler - table.string :last_error - table.datetime :run_at - table.datetime :locked_at - table.string :locked_by - table.datetime :failed_at - table.timestamps +#ENV['DM'] = 'true' +if ENV['DM'] + puts "Running tests with DataMapper as the ORM." 
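+  # datamapper has to be required before init so that defined?(DataMapper)
+  # is true when lib/delayed/job.rb chooses between the two adapters.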
+ require 'datamapper' + require File.dirname(__FILE__) + '/../init' + DataMapper.logger = Logger.new('/tmp/dj.log') + DataMapper.setup(:default, 'sqlite3::memory:') + class Story + include DataMapper::Resource + property :id, Serial + property :text, Text + + def tell; text; end end - - create_table :stories, :force => true do |table| - table.string :text + DataMapper.auto_migrate! +else + puts "Running tests with ActiveRecord as the ORM." + require 'active_record' + require File.dirname(__FILE__) + '/../init' + gem 'sqlite3-ruby' + ActiveRecord::Base.logger = Logger.new('/tmp/dj.log') + ActiveRecord::Base.establish_connection(:adapter => 'sqlite3', :database => '/tmp/jobs.sqlite') + ActiveRecord::Migration.verbose = false + + ActiveRecord::Schema.define do + + create_table :delayed_jobs, :force => true do |table| + table.integer :priority, :default => 0 + table.integer :attempts, :default => 0 + table.text :handler + table.string :last_error + table.datetime :run_at + table.datetime :locked_at + table.string :locked_by + table.datetime :failed_at + table.timestamps + end + + create_table :stories, :force => true do |table| + table.string :text + end + end - -end - - -# Purely useful for test cases... -class Story < ActiveRecord::Base - def tell; text; end -end + + # Purely useful for test cases... + class Story < ActiveRecord::Base + def tell; text; end + end +end \ No newline at end of file diff --git a/spec/delayed_method_spec.rb b/spec/delayed_method_spec.rb index 9d089ffa9..970228645 100644 --- a/spec/delayed_method_spec.rb +++ b/spec/delayed_method_spec.rb @@ -14,7 +14,8 @@ def say_hello class ErrorObject def throw - raise ActiveRecord::RecordNotFound, '...' + error = ENV['DM'] ? DataMapper::ObjectNotFoundError : ActiveRecord::RecordNotFound + raise error, '...' false end @@ -81,6 +82,7 @@ def read(story) output = nil Delayed::Job.reserve do |e| + puts e.inspect output = e.perform end @@ -92,9 +94,9 @@ def read(story) story = Story.create :text => 'Once upon...' story.send_later(:tell) - job = Delayed::Job.find(:first) + job = Delayed::Job.first job.payload_object.class.should == Delayed::PerformableMethod - job.payload_object.object.should == "AR:Story:#{story.id}" + job.payload_object.object.should == "#{ENV['DM'] ? "DM" : "AR"}:Story:#{story.id}" job.payload_object.method.should == :tell job.payload_object.args.should == [] job.payload_object.perform.should == 'Once upon...' @@ -107,10 +109,10 @@ def read(story) reader = StoryReader.new reader.send_later(:read, story) - job = Delayed::Job.find(:first) + job = Delayed::Job.first job.payload_object.class.should == Delayed::PerformableMethod job.payload_object.method.should == :read - job.payload_object.args.should == ["AR:Story:#{story.id}"] + job.payload_object.args.should == ["#{ENV['DM'] ? "DM" : "AR"}:Story:#{story.id}"] job.payload_object.perform.should == 'Epilog: Once upon...' 
end diff --git a/spec/job_spec.rb b/spec/job_spec.rb index 7d7a735dc..9739b081c 100644 --- a/spec/job_spec.rb +++ b/spec/job_spec.rb @@ -58,7 +58,7 @@ def perform; @@runs += 1; end Delayed::Job.enqueue SimpleJob.new, 5, later # use be close rather than equal to because millisecond values cn be lost in DB round trip - Delayed::Job.first.run_at.should be_close(later, 1) + Delayed::Job.first.run_at.to_time.should be_close(later.to_time, 1) end it "should call perform on jobs when running work_off" do @@ -97,7 +97,7 @@ def perform; @@runs += 1; end Delayed::Job.enqueue ErrorJob.new Delayed::Job.work_off(1) - job = Delayed::Job.find(:first) + job = Delayed::Job.first job.last_error.should =~ /did not work/ job.last_error.should =~ /job_spec.rb:10:in `perform'/ @@ -110,14 +110,14 @@ def perform; @@runs += 1; end it "should raise an DeserializationError when the job class is totally unknown" do job = Delayed::Job.new - job['handler'] = "--- !ruby/object:JobThatDoesNotExist {}" + job.handler = "--- !ruby/object:JobThatDoesNotExist {}" lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError) end it "should try to load the class when it is unknown at the time of the deserialization" do job = Delayed::Job.new - job['handler'] = "--- !ruby/object:JobThatDoesNotExist {}" + job.handler = "--- !ruby/object:JobThatDoesNotExist {}" job.should_receive(:attempt_to_load).with('JobThatDoesNotExist').and_return(true) @@ -126,14 +126,14 @@ def perform; @@runs += 1; end it "should try include the namespace when loading unknown objects" do job = Delayed::Job.new - job['handler'] = "--- !ruby/object:Delayed::JobThatDoesNotExist {}" + job.handler = "--- !ruby/object:Delayed::JobThatDoesNotExist {}" job.should_receive(:attempt_to_load).with('Delayed::JobThatDoesNotExist').and_return(true) lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError) end it "should also try to load structs when they are unknown (raises TypeError)" do job = Delayed::Job.new - job['handler'] = "--- !ruby/struct:JobThatDoesNotExist {}" + job.handler = "--- !ruby/struct:JobThatDoesNotExist {}" job.should_receive(:attempt_to_load).with('JobThatDoesNotExist').and_return(true) @@ -142,7 +142,7 @@ def perform; @@runs += 1; end it "should try include the namespace when loading unknown structs" do job = Delayed::Job.new - job['handler'] = "--- !ruby/struct:Delayed::JobThatDoesNotExist {}" + job.handler = "--- !ruby/struct:Delayed::JobThatDoesNotExist {}" job.should_receive(:attempt_to_load).with('Delayed::JobThatDoesNotExist').and_return(true) lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError) @@ -194,7 +194,6 @@ def perform; @@runs += 1; end it "should be able to get access to the task if it was started more then max_age ago" do @job.locked_at = 5.hours.ago @job.save - @job.lock_exclusively! 4.hours, 'worker2' @job.reload @job.locked_by.should == 'worker2'