Skip to content

Commit

Permalink
Made a gem out of delayed_job so I can use it in my services. Had to …
Browse files Browse the repository at this point in the history
…modify Worker to not infer DJ is running in a Rails instance.
  • Loading branch information
Justin Knowlden committed Nov 29, 2008
1 parent bcf8d1d commit 9ba6b08
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 36 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.gem
2 changes: 2 additions & 0 deletions HISTORY.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
== 0.1.0 / 2008-11-28
* First of many versions
39 changes: 39 additions & 0 deletions delayed_job.gemspec
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
Gem::Specification.new do |s|
s.name = "delayed_job"
s.version = "0.1.0"
s.date = "2008-11-28"
s.summary = "Database-backed asynchronous priority queue system -- Extracted from Shopify"
s.email = "[email protected]"
s.homepage = "http://github.com/tobi/delayed_job/tree/master"
s.description = "Delated_job (or DJ) encapsulates the common pattern of asynchronously executing longer tasks in the background. It is a direct extraction from Shopify where the job table is responsible for a multitude of core tasks."
s.authors = ["Tobias Lütke", "Justin Knowlden"]

# s.bindir = "bin"
# s.executables = ["delayed_job"]
# s.default_executable = "delayed_job"

s.has_rdoc = false
s.rdoc_options = ["--main", "README.textile"]
s.extra_rdoc_files = ["HISTORY.txt", "README.textile"]

# run git ls-files to get an updated list
s.files = %w[
HISTORY.txt
MIT-LICENSE
README.textile
delayed_job.gemspec
init.rb
lib/delayed/job.rb
lib/delayed/message_sending.rb
lib/delayed/performable_method.rb
lib/delayed/worker.rb
lib/delayed_job.rb
tasks/jobs.rake
]
s.test_files = %w[
spec/database.rb
spec/delayed_method_spec.rb
spec/job_spec.rb
spec/story_spec.rb
]
end
6 changes: 1 addition & 5 deletions init.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1 @@
require File.dirname(__FILE__) + '/lib/delayed/message_sending'
require File.dirname(__FILE__) + '/lib/delayed/performable_method'
require File.dirname(__FILE__) + '/lib/delayed/job'

Object.send(:include, Delayed::MessageSending)
require File.dirname(__FILE__) + '/lib/delayed_job'
46 changes: 23 additions & 23 deletions lib/delayed/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ class Job < ActiveRecord::Base
cattr_accessor :destroy_failed_jobs
self.destroy_failed_jobs = true

# Every worker has a unique name which by default is the pid of the process.
# There are some advantages to overriding this with something which survives worker retarts:
# Every worker has a unique name which by default is the pid of the process.
# There are some advantages to overriding this with something which survives worker retarts:
# Workers can safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before.
cattr_accessor :worker_name
self.worker_name = "host:#{Socket.gethostname} pid:#{Process.pid}" rescue "pid:#{Process.pid}"
Expand All @@ -25,10 +25,10 @@ class Job < ActiveRecord::Base
NextTaskOrder = 'priority DESC, run_at ASC'

ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/

cattr_accessor :min_priority, :max_priority
self.min_priority = nil
self.max_priority = nil
self.max_priority = nil

class LockError < StandardError
end
Expand All @@ -45,8 +45,8 @@ def failed?
def payload_object
@payload_object ||= deserialize(self['handler'])
end
def name

def name
@name ||= begin
payload = payload_object
if payload.respond_to?(:display_name)
Expand Down Expand Up @@ -80,13 +80,13 @@ def self.enqueue(*args, &block)
if block_given?
priority = args.first || 0
run_at = args.second

Job.create(:payload_object => EvaledJob.new(&block), :priority => priority.to_i, :run_at => run_at)
else
object = args.first
priority = args.second || 0
run_at = args.third

unless object.respond_to?(:perform)
raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
end
Expand All @@ -96,32 +96,32 @@ def self.enqueue(*args, &block)
end

def self.find_available(limit = 5, max_run_time = MAX_RUN_TIME)
time_now = db_time_now

time_now = db_time_now

sql = NextTaskSQL.dup

conditions = [time_now, time_now - max_run_time, worker_name]

if self.min_priority
sql << ' AND (priority >= ?)'
conditions << min_priority
end

if self.max_priority
sql << ' AND (priority <= ?)'
conditions << max_priority
conditions << max_priority
end

conditions.unshift(sql)
conditions.unshift(sql)

records = ActiveRecord::Base.silence do
find(:all, :conditions => conditions, :order => NextTaskOrder, :limit => limit)
end

records.sort { rand() }
end
end

# Get the payload of the next job we can get an exclusive lock on.
# If no jobs are left we return nil
def self.reserve(max_run_time = MAX_RUN_TIME, &block)
Expand All @@ -142,7 +142,7 @@ def self.reserve(max_run_time = MAX_RUN_TIME, &block)
rescue LockError
# We did not get the lock, some other worker process must have
logger.warn "* [JOB] failed to aquire exclusive lock for #{job.name}"
rescue StandardError => e
rescue StandardError => e
job.reschedule e.message, e.backtrace
log_exception(job, e)
return job
Expand All @@ -160,16 +160,16 @@ def lock_exclusively!(max_run_time, worker = worker_name)
# We don't own this job so we will update the locked_by name and the locked_at
self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?)", id, (now - max_run_time.to_i)])
else
# We already own this job, this may happen if the job queue crashes.
# We already own this job, this may happen if the job queue crashes.
# Simply resume and update the locked_at
self.class.update_all(["locked_at = ?", now], ["id = ? and locked_by = ?", id, worker])
end
raise LockError.new("Attempted to aquire exclusive lock failed") unless affected_rows == 1

self.locked_at = now
self.locked_by = worker
end

def unlock
self.locked_at = nil
self.locked_by = nil
Expand Down
18 changes: 10 additions & 8 deletions lib/delayed/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,21 @@ module Delayed
class Worker
SLEEP = 5

cattr_accessor :logger
self.logger = RAILS_DEFAULT_LOGGER if const_defined?(:RAILS_DEFAULT_LOGGER)

def initialize(options={})
@quiet = options[:quiet]
@quiet = options[:quiet]
Delayed::Job.min_priority = options[:min_priority] if options.has_key?(:min_priority)
Delayed::Job.max_priority = options[:max_priority] if options.has_key?(:max_priority)
end
end

def start
say "*** Starting job worker #{Delayed::Job.worker_name}"

trap('TERM') { say 'Exiting...'; $exit = true }
trap('INT') { say 'Exiting...'; $exit = true }



loop do
result = nil

Expand All @@ -33,15 +35,15 @@ def start
end

break if $exit
end
end

ensure
Delayed::Job.clear_locks!
end

def say(text)
puts text unless @quiet
RAILS_DEFAULT_LOGGER.info text
logger.info text if logger
end

end
Expand Down
6 changes: 6 additions & 0 deletions lib/delayed_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
require File.dirname(__FILE__) + '/delayed/message_sending'
require File.dirname(__FILE__) + '/delayed/performable_method'
require File.dirname(__FILE__) + '/delayed/job'
require File.dirname(__FILE__) + '/delayed/worker'

Object.send(:include, Delayed::MessageSending)

0 comments on commit 9ba6b08

Please sign in to comment.