diff --git a/.gitignore b/.gitignore new file mode 100644 index 000000000..c111b3313 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*.gem diff --git a/HISTORY.txt b/HISTORY.txt new file mode 100644 index 000000000..68fc4176a --- /dev/null +++ b/HISTORY.txt @@ -0,0 +1,2 @@ +== 0.1.0 / 2008-11-28 + * First of many versions diff --git a/delayed_job.gemspec b/delayed_job.gemspec new file mode 100644 index 000000000..12892fb89 --- /dev/null +++ b/delayed_job.gemspec @@ -0,0 +1,39 @@ +Gem::Specification.new do |s| + s.name = "delayed_job" + s.version = "0.1.0" + s.date = "2008-11-28" + s.summary = "Database-backed asynchronous priority queue system -- Extracted from Shopify" + s.email = "tobi@leetsoft.com" + s.homepage = "http://github.com/tobi/delayed_job/tree/master" + s.description = "Delated_job (or DJ) encapsulates the common pattern of asynchronously executing longer tasks in the background. It is a direct extraction from Shopify where the job table is responsible for a multitude of core tasks." + s.authors = ["Tobias Lütke", "Justin Knowlden"] + + # s.bindir = "bin" + # s.executables = ["delayed_job"] + # s.default_executable = "delayed_job" + + s.has_rdoc = false + s.rdoc_options = ["--main", "README.textile"] + s.extra_rdoc_files = ["HISTORY.txt", "README.textile"] + + # run git ls-files to get an updated list + s.files = %w[ + HISTORY.txt + MIT-LICENSE + README.textile + delayed_job.gemspec + init.rb + lib/delayed/job.rb + lib/delayed/message_sending.rb + lib/delayed/performable_method.rb + lib/delayed/worker.rb + lib/delayed_job.rb + tasks/jobs.rake + ] + s.test_files = %w[ + spec/database.rb + spec/delayed_method_spec.rb + spec/job_spec.rb + spec/story_spec.rb + ] +end diff --git a/init.rb b/init.rb index 765138bef..a816d7ee8 100644 --- a/init.rb +++ b/init.rb @@ -1,5 +1 @@ -require File.dirname(__FILE__) + '/lib/delayed/message_sending' -require File.dirname(__FILE__) + '/lib/delayed/performable_method' -require File.dirname(__FILE__) + '/lib/delayed/job' - -Object.send(:include, Delayed::MessageSending) \ No newline at end of file +require File.dirname(__FILE__) + '/lib/delayed_job' diff --git a/lib/delayed/job.rb b/lib/delayed/job.rb index b2b5c0240..f6913dd53 100644 --- a/lib/delayed/job.rb +++ b/lib/delayed/job.rb @@ -15,8 +15,8 @@ class Job < ActiveRecord::Base cattr_accessor :destroy_failed_jobs self.destroy_failed_jobs = true - # Every worker has a unique name which by default is the pid of the process. - # There are some advantages to overriding this with something which survives worker retarts: + # Every worker has a unique name which by default is the pid of the process. + # There are some advantages to overriding this with something which survives worker retarts: # Workers can safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before. cattr_accessor :worker_name self.worker_name = "host:#{Socket.gethostname} pid:#{Process.pid}" rescue "pid:#{Process.pid}" @@ -25,10 +25,10 @@ class Job < ActiveRecord::Base NextTaskOrder = 'priority DESC, run_at ASC' ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/ - + cattr_accessor :min_priority, :max_priority self.min_priority = nil - self.max_priority = nil + self.max_priority = nil class LockError < StandardError end @@ -45,8 +45,8 @@ def failed? def payload_object @payload_object ||= deserialize(self['handler']) end - - def name + + def name @name ||= begin payload = payload_object if payload.respond_to?(:display_name) @@ -80,13 +80,13 @@ def self.enqueue(*args, &block) if block_given? priority = args.first || 0 run_at = args.second - + Job.create(:payload_object => EvaledJob.new(&block), :priority => priority.to_i, :run_at => run_at) else object = args.first priority = args.second || 0 run_at = args.third - + unless object.respond_to?(:perform) raise ArgumentError, 'Cannot enqueue items which do not respond to perform' end @@ -96,32 +96,32 @@ def self.enqueue(*args, &block) end def self.find_available(limit = 5, max_run_time = MAX_RUN_TIME) - - time_now = db_time_now - + + time_now = db_time_now + sql = NextTaskSQL.dup conditions = [time_now, time_now - max_run_time, worker_name] - + if self.min_priority sql << ' AND (priority >= ?)' conditions << min_priority end - + if self.max_priority sql << ' AND (priority <= ?)' - conditions << max_priority + conditions << max_priority end - conditions.unshift(sql) - + conditions.unshift(sql) + records = ActiveRecord::Base.silence do find(:all, :conditions => conditions, :order => NextTaskOrder, :limit => limit) end - + records.sort { rand() } - end - + end + # Get the payload of the next job we can get an exclusive lock on. # If no jobs are left we return nil def self.reserve(max_run_time = MAX_RUN_TIME, &block) @@ -142,7 +142,7 @@ def self.reserve(max_run_time = MAX_RUN_TIME, &block) rescue LockError # We did not get the lock, some other worker process must have logger.warn "* [JOB] failed to aquire exclusive lock for #{job.name}" - rescue StandardError => e + rescue StandardError => e job.reschedule e.message, e.backtrace log_exception(job, e) return job @@ -160,16 +160,16 @@ def lock_exclusively!(max_run_time, worker = worker_name) # We don't own this job so we will update the locked_by name and the locked_at self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?)", id, (now - max_run_time.to_i)]) else - # We already own this job, this may happen if the job queue crashes. + # We already own this job, this may happen if the job queue crashes. # Simply resume and update the locked_at self.class.update_all(["locked_at = ?", now], ["id = ? and locked_by = ?", id, worker]) end raise LockError.new("Attempted to aquire exclusive lock failed") unless affected_rows == 1 - + self.locked_at = now self.locked_by = worker end - + def unlock self.locked_at = nil self.locked_by = nil diff --git a/lib/delayed/worker.rb b/lib/delayed/worker.rb index b0888f781..de471ce84 100644 --- a/lib/delayed/worker.rb +++ b/lib/delayed/worker.rb @@ -2,19 +2,21 @@ module Delayed class Worker SLEEP = 5 + cattr_accessor :logger + self.logger = RAILS_DEFAULT_LOGGER if const_defined?(:RAILS_DEFAULT_LOGGER) + def initialize(options={}) - @quiet = options[:quiet] + @quiet = options[:quiet] Delayed::Job.min_priority = options[:min_priority] if options.has_key?(:min_priority) Delayed::Job.max_priority = options[:max_priority] if options.has_key?(:max_priority) - end + end def start say "*** Starting job worker #{Delayed::Job.worker_name}" trap('TERM') { say 'Exiting...'; $exit = true } trap('INT') { say 'Exiting...'; $exit = true } - - + loop do result = nil @@ -33,15 +35,15 @@ def start end break if $exit - end - + end + ensure Delayed::Job.clear_locks! end - + def say(text) puts text unless @quiet - RAILS_DEFAULT_LOGGER.info text + logger.info text if logger end end diff --git a/lib/delayed_job.rb b/lib/delayed_job.rb new file mode 100644 index 000000000..8affb189b --- /dev/null +++ b/lib/delayed_job.rb @@ -0,0 +1,6 @@ +require File.dirname(__FILE__) + '/delayed/message_sending' +require File.dirname(__FILE__) + '/delayed/performable_method' +require File.dirname(__FILE__) + '/delayed/job' +require File.dirname(__FILE__) + '/delayed/worker' + +Object.send(:include, Delayed::MessageSending) \ No newline at end of file