Skip to content

Commit

Permalink
Refactor and version bump to facilitate both ActiveRecord and DataMap…
Browse files Browse the repository at this point in the history
…per as ORMs
  • Loading branch information
vanpelt committed Jan 11, 2009
1 parent cf4ad9f commit 3a9ffee
Show file tree
Hide file tree
Showing 12 changed files with 432 additions and 301 deletions.
8 changes: 8 additions & 0 deletions README.textile
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ The library evolves around a delayed_jobs table which looks as follows:
table.timestamps
end

Delayed Job now supports both ActiveRecord and DataMapper as its ORM. If DataMapper is defined when the gem is loaded, it will be used instead of ActiveRecord. To set up your table with DataMapper, require delayed_job in your application and run `Delayed::Job.auto_migrate!`.

h2. Usage

Jobs are simple ruby objects with a method called perform. Any object which responds to perform can be stuffed into the jobs table.
Expand Down Expand Up @@ -68,8 +70,14 @@ h3. Cleaning up

You can invoke @rake jobs:clear@ to delete all jobs in the queue.

h3. Running specs

Running `rake spec` will run all of the specs using ActiveRecord. To test the application using DataMapper, run `rake spec DM=true`.

h3. Changes

* 1.8.0: Added a DataMapper adapter which is loaded instead of the ActiveRecord adapter if DataMapper is defined.

* 1.7.0: Added failed_at column which can optionally be set after a certain amount of failed job attempts. By default failed job attempts are destroyed after about a month.

* 1.6.0: Renamed locked_until to locked_at. We now store when we start a given job instead of how long it will be locked by the worker. This allows us to get a reading on how long a job took to execute.
Expand Down
21 changes: 21 additions & 0 deletions Rakefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
require 'spec/rake/spectask'
desc "Run specs, run a specific spec with TASK=spec/path_to_spec.rb. By default the tests are run with ActiveRecord, set DM=true to run the tests with DataMapper."
task :spec => [ "spec:default" ]

namespace :spec do
  # Prefer the project's spec.opts when present; otherwise fall back to
  # sensible defaults (colored specdoc output).
  OPTS_FILENAME = "./spec/spec.opts"
  SPEC_OPTS = if File.exist?(OPTS_FILENAME)
                ["--options", OPTS_FILENAME]
              else
                ["--color", "--format", "specdoc"]
              end

  # Runs either the single spec named by TASK or, by default, every
  # spec/*_spec.rb in sorted order.
  Spec::Rake::SpecTask.new('default') do |t|
    t.spec_opts  = SPEC_OPTS
    t.spec_files = ENV['TASK'] ? [ENV['TASK']] : Dir['spec/*_spec.rb'].sort
  end
end
4 changes: 2 additions & 2 deletions delayed_job.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

Gem::Specification.new do |s|
s.name = "delayed_job"
s.version = "1.7.0"
s.date = "2008-11-28"
s.version = "1.8.0"
s.date = "2009-01-11"
s.summary = "Database-backed asynchronous priority queue system -- Extracted from Shopify"
s.email = "[email protected]"
s.homepage = "http://github.com/tobi/delayed_job/tree/master"
Expand Down
253 changes: 2 additions & 251 deletions lib/delayed/job.rb
Original file line number Diff line number Diff line change
@@ -1,251 +1,2 @@
module Delayed

  # Raised when a persisted job's YAML handler cannot be turned back into
  # a live object (missing class, bad YAML, etc.).
  class DeserializationError < StandardError
  end

  # A unit of deferred work stored in the delayed_jobs table. Workers poll
  # the table, take an exclusive row lock, run the payload's #perform, and
  # destroy the row on success or reschedule it on failure.
  class Job < ActiveRecord::Base
    # Reschedule limit before a job is considered permanently failed.
    MAX_ATTEMPTS = 25
    # A lock older than this is assumed stale and may be stolen by another worker.
    MAX_RUN_TIME = 4.hours
    set_table_name :delayed_jobs

    # By default failed jobs are destroyed after too many attempts.
    # If you want to keep them around (perhaps to inspect the reason
    # for the failure), set this to false.
    cattr_accessor :destroy_failed_jobs
    self.destroy_failed_jobs = true

    # Every worker has a unique name which by default is the pid of the process.
    # There are some advantages to overriding this with something which survives worker restarts:
    # Workers can safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before.
    cattr_accessor :worker_name
    self.worker_name = "host:#{Socket.gethostname} pid:#{Process.pid}" rescue "pid:#{Process.pid}"

    # A job is runnable when it is due AND (unlocked, stale-locked, or locked
    # by this very worker) AND has not been marked failed.
    NextTaskSQL = '(run_at <= ? AND (locked_at IS NULL OR locked_at < ?) OR (locked_by = ?)) AND failed_at IS NULL'
    NextTaskOrder = 'priority DESC, run_at ASC'

    # Extracts the class name out of a YAML type tag, e.g. "!ruby/object:MyJob".
    ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/

    # Optional priority window; nil means unbounded on that side.
    cattr_accessor :min_priority, :max_priority
    self.min_priority = nil
    self.max_priority = nil

    # Raised internally when an exclusive lock on a row cannot be obtained.
    class LockError < StandardError
    end

    # Releases every lock held by this worker (typically called on startup
    # so a restarted worker does not leave rows locked forever).
    def self.clear_locks!
      update_all("locked_by = null, locked_at = null", ["locked_by = ?", worker_name])
    end

    # Truthy when the job has been marked permanently failed.
    def failed?
      failed_at
    end
    alias_method :failed, :failed?

    # Deserialized payload object, memoized for the life of this instance.
    def payload_object
      @payload_object ||= deserialize(self['handler'])
    end

    # Human-readable job name: the payload's #display_name when it provides
    # one, otherwise its class name. Memoized.
    def name
      @name ||= begin
        payload = payload_object
        if payload.respond_to?(:display_name)
          payload.display_name
        else
          payload.class.name
        end
      end
    end

    # Stores the payload as YAML in the handler column.
    def payload_object=(object)
      self['handler'] = object.to_yaml
    end

    # Records a failure. Below MAX_ATTEMPTS the job is unlocked and re-run
    # at +time+ (default: now + attempts^4 + 5 seconds — an exponential
    # backoff); past the limit it is destroyed or, when destroy_failed_jobs
    # is false, stamped with failed_at and kept for inspection.
    def reschedule(message, backtrace = [], time = nil)
      if self.attempts < MAX_ATTEMPTS
        time ||= Job.db_time_now + (attempts ** 4) + 5

        self.attempts += 1
        self.run_at = time
        self.last_error = message + "\n" + backtrace.join("\n")
        self.unlock
        save!
      else
        logger.info "* [JOB] PERMANENTLY removing #{self.name} because of #{attempts} consequetive failures."
        destroy_failed_jobs ? destroy : update_attribute(:failed_at, Time.now)
      end
    end

    # Adds a job to the queue. Accepts either an object responding to
    # #perform or a block (wrapped in an EvaledJob), followed by an optional
    # priority (default 0) and an optional run_at time.
    def self.enqueue(*args, &block)
      object = block_given? ? EvaledJob.new(&block) : args.shift

      unless object.respond_to?(:perform) || block_given?
        raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
      end

      priority = args.first || 0
      run_at = args[1]

      Job.create(:payload_object => object, :priority => priority.to_i, :run_at => run_at)
    end

    # Returns up to +limit+ runnable jobs within the configured priority
    # window, shuffled so competing workers don't all contend for the same
    # row first.
    def self.find_available(limit = 5, max_run_time = MAX_RUN_TIME)

      time_now = db_time_now

      sql = NextTaskSQL.dup

      conditions = [time_now, time_now - max_run_time, worker_name]

      if self.min_priority
        sql << ' AND (priority >= ?)'
        conditions << min_priority
      end

      if self.max_priority
        sql << ' AND (priority <= ?)'
        conditions << max_priority
      end

      conditions.unshift(sql)

      records = ActiveRecord::Base.silence do
        find(:all, :conditions => conditions, :order => NextTaskOrder, :limit => limit)
      end

      records.sort_by { rand() }
    end

    # Get the payload of the next job we can get an exclusive lock on.
    # If no jobs are left we return nil
    def self.reserve(max_run_time = MAX_RUN_TIME, &block)

      # We get up to 5 jobs from the db. In fact we cannot get exclusive access to a job we try the next.
      # this leads to a more even distribution of jobs across the worker processes
      find_available(5, max_run_time).each do |job|
        begin
          logger.info "* [JOB] aquiring lock on #{job.name}"
          job.lock_exclusively!(max_run_time, worker_name)
          runtime = Benchmark.realtime do
            invoke_job(job.payload_object, &block)
            job.destroy
          end
          logger.info "* [JOB] #{job.name} completed after %.4f" % runtime

          return job
        rescue LockError
          # We did not get the lock, some other worker process must have
          logger.warn "* [JOB] failed to aquire exclusive lock for #{job.name}"
        rescue StandardError => e
          # Any error from the payload: reschedule with backoff, log, and
          # report the job as handled (it was reserved, it just failed).
          job.reschedule e.message, e.backtrace
          log_exception(job, e)
          return job
        end
      end

      nil
    end

    # This method is used internally by reserve method to ensure exclusive access
    # to the given job. It will raise a LockError if it cannot get this lock.
    def lock_exclusively!(max_run_time, worker = worker_name)
      now = self.class.db_time_now
      # The UPDATE's WHERE clause is what makes the lock atomic: exactly one
      # worker's conditional update can affect the row.
      affected_rows = if locked_by != worker
        # We don't own this job so we will update the locked_by name and the locked_at
        self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?)", id, (now - max_run_time.to_i)])
      else
        # We already own this job, this may happen if the job queue crashes.
        # Simply resume and update the locked_at
        self.class.update_all(["locked_at = ?", now], ["id = ? and locked_by = ?", id, worker])
      end
      raise LockError.new("Attempted to aquire exclusive lock failed") unless affected_rows == 1

      # Mirror the DB state on this in-memory instance (not persisted here).
      self.locked_at = now
      self.locked_by = worker
    end

    # Clears the lock fields on this instance only; callers must save
    # (reschedule does) for the unlock to reach the database.
    def unlock
      self.locked_at = nil
      self.locked_by = nil
    end

    # This is a good hook if you need to report job processing errors in additional or different ways
    def self.log_exception(job, error)
      logger.error "* [JOB] #{job.name} failed with #{error.class.name}: #{error.message} - #{job.attempts} failed attempts"
      logger.error(error)
    end

    # Reserves and runs up to +num+ jobs, stopping early when the queue is
    # empty. Returns [success_count, failure_count].
    def self.work_off(num = 100)
      success, failure = 0, 0

      num.times do
        job = self.reserve do |j|
          begin
            j.perform
            success += 1
          rescue
            failure += 1
            raise
          end
        end

        break if job.nil?
      end

      return [success, failure]
    end

    # Moved into its own method so that new_relic can trace it.
    def self.invoke_job(job, &block)
      block.call(job)
    end

    private

    # Turns the YAML in the handler column back into a payload object.
    # On a first failed load it tries to constantize the class named in the
    # YAML tag (letting ActiveSupport autoload it) and retries once.
    # Raises DeserializationError when no usable object can be produced.
    def deserialize(source)
      handler = YAML.load(source) rescue nil

      unless handler.respond_to?(:perform)
        if handler.nil? && source =~ ParseObjectFromYaml
          handler_class = $1
        end
        attempt_to_load(handler_class || handler.class)
        handler = YAML.load(source)
      end

      return handler if handler.respond_to?(:perform)

      raise DeserializationError,
        'Job failed to load: Unknown handler. Try to manually require the appropiate file.'
    rescue TypeError, LoadError, NameError => e
      raise DeserializationError,
        "Job failed to load: #{e.message}. Try to manually require the required file."
    end

    # Constantize the object so that ActiveSupport can attempt
    # its auto loading magic. Will raise LoadError if not successful.
    def attempt_to_load(klass)
      klass.constantize
    end

    # "Now" in the timezone ActiveRecord persists timestamps in, so SQL
    # comparisons against run_at/locked_at are consistent.
    def self.db_time_now
      (ActiveRecord::Base.default_timezone == :utc) ? Time.now.utc : Time.now
    end

    protected

    # Default run_at to the current DB time when the caller didn't set one.
    def before_save
      self.run_at ||= self.class.db_time_now
    end

  end

  # Wraps an enqueue-time block: the block's return value is stored and
  # eval'd at perform time. NOTE(review): eval of stored strings — only
  # enqueue trusted code this way.
  class EvaledJob
    def initialize
      @job = yield
    end

    def perform
      eval(@job)
    end
  end
end
require File.dirname(__FILE__) + '/job/common'
require File.dirname(__FILE__) + (defined?(DataMapper) ? '/job/data_mapper' : '/job/active_record')
16 changes: 16 additions & 0 deletions lib/delayed/job/active_record.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
module Delayed
  # ActiveRecord adapter for Delayed::Job. The shared behaviour lives in the
  # Common module (required from job/common — presumably Delayed::Job::Common;
  # verify against that file); this class only wires up the table name and the
  # save-time default for run_at.
  class Job < ActiveRecord::Base
    set_table_name :delayed_jobs

    include Common
    protected
    # Default run_at to the current DB time when the caller didn't set one.
    def before_save
      self.run_at ||= self.class.db_time_now
    end
  end
end
# Placeholder constants so that type checks referencing DataMapper classes
# (e.g. rescue/is_a? against these names) don't raise NameError when the
# real DataMapper gem is not loaded and the ActiveRecord adapter is in use.
module DataMapper
  module Resource; end
  module ObjectNotFoundError; end
end
Loading

1 comment on commit 3a9ffee

@dstrelau
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is awesome. have you sent a pull request to tobi yet?

Please sign in to comment.