From 75b49dc1c281ffa934a4eb2c7e840291e8b4a5ff Mon Sep 17 00:00:00 2001 From: "Tobias Luetke (home)" Date: Sun, 17 Feb 2008 16:01:35 -0500 Subject: [PATCH] Initial extraction --- MIT-LICENSE | 20 +++++ README | 103 +++++++++++++++++++++ init.rb | 5 ++ lib/delayed/job.rb | 145 ++++++++++++++++++++++++++++++ lib/delayed/message_sending.rb | 7 ++ lib/delayed/performable_method.rb | 37 ++++++++ spec/database.rb | 33 +++++++ spec/delayed_method_spec.rb | 81 +++++++++++++++++ spec/job_spec.rb | 122 +++++++++++++++++++++++++ spec/story_spec.rb | 18 ++++ tasks/jobs.rake | 29 ++++++ 11 files changed, 600 insertions(+) create mode 100644 MIT-LICENSE create mode 100644 README create mode 100644 init.rb create mode 100644 lib/delayed/job.rb create mode 100644 lib/delayed/message_sending.rb create mode 100644 lib/delayed/performable_method.rb create mode 100644 spec/database.rb create mode 100644 spec/delayed_method_spec.rb create mode 100644 spec/job_spec.rb create mode 100644 spec/story_spec.rb create mode 100644 tasks/jobs.rake diff --git a/MIT-LICENSE b/MIT-LICENSE new file mode 100644 index 000000000..926fda18c --- /dev/null +++ b/MIT-LICENSE @@ -0,0 +1,20 @@ +Copyright (c) 2005 Tobias Luetke + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +"Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. 
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. \ No newline at end of file diff --git a/README b/README new file mode 100644 index 000000000..99dcfbe73 --- /dev/null +++ b/README @@ -0,0 +1,103 @@ +Delayed::Job +============ + +Delayed_job (or DJ) encapsulates the common pattern of asynchronously executing longer tasks in the background. + +It is a direct extraction from Shopify where the job table is responsible for a multitude of core tasks. Amongst those tasks are: + +* sending massive newsletters +* image resizing +* http downloads +* updating smart collections +* updating solr, our search server, after product changes +* batch imports +* spam checks + +== Setup == + +The library revolves around a delayed_jobs table which looks as follows: + + create_table :delayed_jobs, :force => true do |table| + table.integer :priority, :default => 0 + table.integer :attempts, :default => 0 + table.text :handler + table.string :last_error + table.datetime :run_at + table.timestamps + end + +== Usage == + +Jobs are simple ruby objects with a method called perform. Any object which responds to perform can be stuffed into the jobs table. +Job objects are serialized to yaml so that they can later be resurrected by the job runner. + + class NewsletterJob < Struct.new(:text, :emails) + def perform + emails.each { |e| NewsletterMailer.deliver_text_to_email(text, e) } + end + end + + Delayed::Job.enqueue NewsletterJob.new('lorem ipsum...', Customers.find(:all).collect(&:email)) + +There is also a second way to get jobs in the queue: send_later. + + + BatchImporter.new(Shop.find(1)).send_later(:import_massive_csv, massive_csv) + + +This will simply create a Delayed::PerformableMethod job in the jobs table which serializes all the parameters you pass to it. 
There are some special smarts for active record objects +which are stored as their text representation and loaded from the database fresh when the job is actually run later. + + +== Running the tasks == + +You can invoke rake jobs:work which will start working off jobs. You can cancel the rake task by CTRL-C. + +At Shopify we run the tasks from a simple script/job_runner which is being invoked by runnit: + + #!/usr/bin/env ruby + require File.dirname(__FILE__) + '/../config/environment' + + SLEEP = 15 + RESTART_AFTER = 1000 + + trap('TERM') { puts 'Exiting...'; $exit = true } + trap('INT') { puts 'Exiting...'; $exit = true } + + # this script dies after several runs to prevent memory leaks. + # runnit will immediately start it again. + count, runs_left = 0, RESTART_AFTER + + loop do + + count = 0 + + # this requires the locking plugin, also from jadedPixel + ActiveRecord::base.aquire_lock("jobs table worker", 10) do + puts 'got lock' + + realtime = Benchmark.realtime do + count = Delayed::Job.work_off + end + end + + runs_left -= 1 + + break if $exit + + if count.zero? + sleep(SLEEP) + else + status = "#{count} jobs completed at %.2f j/s ..." % [count / realtime] + RAILS_DEFAULT_LOGGER.info status + puts status + end + + if $exit or runs_left <= 0 + break + end + end + +== Todo == + +Work out a locking mechanism which would allow several job runners to run at the same time, spreading the load between them. 
diff --git a/init.rb b/init.rb
new file mode 100644
index 000000000..765138bef
--- /dev/null
+++ b/init.rb
@@ -0,0 +1,6 @@
+require File.dirname(__FILE__) + '/lib/delayed/message_sending'
+require File.dirname(__FILE__) + '/lib/delayed/performable_method'
+require File.dirname(__FILE__) + '/lib/delayed/job'
+
+# Make send_later available on every object.
+Object.send(:include, Delayed::MessageSending)
\ No newline at end of file
diff --git a/lib/delayed/job.rb b/lib/delayed/job.rb
new file mode 100644
index 000000000..9303ec47a
--- /dev/null
+++ b/lib/delayed/job.rb
@@ -0,0 +1,180 @@
+module Delayed
+
+  # Raised when a stored handler cannot be turned back into an object
+  # that responds to perform.
+  class DeserializationError < StandardError
+  end
+
+  # A row of the delayed_jobs table. The handler attribute holds the
+  # YAML dump of an object that responds to perform.
+  class Job < ActiveRecord::Base
+    # Captures the class name from a "!ruby/object:Foo" style YAML tag.
+    ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/
+
+    set_table_name :delayed_jobs
+
+    # Performs a list of jobs and counts how many were run, how many
+    # succeeded and how many failed.
+    class Runner
+      attr_accessor :logger, :jobs
+      attr_accessor :runs, :success, :failure
+
+      # jobs   - the jobs to perform.
+      # logger - optional; nothing is logged when it is nil.
+      def initialize(jobs, logger = nil)
+        @jobs = jobs
+        @logger = logger
+        self.runs = self.success = self.failure = 0
+      end
+
+      # Performs every job inside a single transaction. A job that
+      # performs without raising is destroyed; a job that raises is
+      # rescheduled with the error message. Returns self so that the
+      # caller can read the counters.
+      def run
+
+        ActiveRecord::Base.cache do
+          ActiveRecord::Base.transaction do
+            @jobs.each do |job|
+              self.runs += 1
+              begin
+                time = Benchmark.measure do
+                  job.perform
+                  ActiveRecord::Base.uncached { job.destroy }
+                  self.success += 1
+                end
+                # The logger is optional. An unguarded call would raise
+                # here after the job was already destroyed and push it
+                # into the failure path below.
+                logger.debug "Executed job in #{time.real}" if logger
+              rescue StandardError => e
+                if logger
+                  logger.error "Job #{job.id}: #{e.class} #{e.message}"
+                  logger.error e.backtrace.join("\n")
+                end
+                ActiveRecord::Base.uncached { job.reshedule e.message }
+                self.failure += 1
+              end
+            end
+          end
+        end
+
+        self
+      end
+    end
+
+    # Stores object as a new job. Raises ArgumentError unless object
+    # responds to perform.
+    def self.enqueue(object, priority = 0)
+      raise ArgumentError, 'Cannot enqueue items which do not respond to perform' unless object.respond_to?(:perform)
+
+      Job.create(:handler => object, :priority => priority)
+    end
+
+    # Serializes object to YAML before storing it as the handler.
+    def handler=(object)
+      self['handler'] = object.to_yaml
+    end
+
+    # The deserialized handler object, memoized after the first call.
+    def handler
+      @handler ||= deserialize(self['handler'])
+    end
+
+    # Runs the job by delegating to the handler.
+    def perform
+      handler.perform
+    end
+
+    # Records a failed attempt: increments attempts, stores message as
+    # last_error and moves run_at five minutes into the future.
+    def reshedule(message)
+      self.attempts += 1
+      self.run_at = self.class.time_now + 5.minutes
+      self.last_error = message
+      save!
+    end
+
+    # Returns the next due job (highest priority first, then earliest
+    # run_at). With a limit other than 1, returns an array of up to
+    # limit due jobs instead.
+    def self.peek(limit = 1)
+      if limit == 1
+        find(:first, :order => "priority DESC, run_at ASC", :conditions => ['run_at <= ?', time_now])
+      else
+        find(:all, :order => "priority DESC, run_at ASC", :limit => limit, :conditions => ['run_at <= ?', time_now])
+      end
+    end
+
+    # Loads up to limit due jobs and performs them. Returns the Runner,
+    # whose runs, success and failure counters describe the outcome.
+    def self.work_off(limit = 100)
+      jobs = Job.find(:all, :conditions => ['run_at <= ?', time_now], :order => "priority DESC, run_at ASC", :limit => limit)
+
+      Job::Runner.new(jobs, logger).run
+    end
+
+    protected
+
+    # Current time, in UTC when ActiveRecord's default timezone is :utc.
+    def self.time_now
+      (ActiveRecord::Base.default_timezone == :utc) ? Time.now.utc : Time.now
+    end
+
+    # A job saved without a run_at is due immediately.
+    def before_save
+      self.run_at ||= self.class.time_now
+    end
+
+    private
+
+    # Turns the YAML in source back into an object that responds to
+    # perform. When the first parse does not yield one, the class named
+    # in the YAML is constantized and the parse is retried. Raises
+    # DeserializationError when no usable handler results.
+    def deserialize(source)
+      begin
+        # A failed parse is deliberately swallowed here; it is retried
+        # below after the class named in the YAML tag has been loaded.
+        handler = YAML.load(source) rescue nil
+        return handler if handler.respond_to?(:perform)
+
+        if handler.nil?
+          if source =~ ParseObjectFromYaml
+
+            # Constantize the object so that ActiveSupport can attempt
+            # its auto loading magic. Will raise LoadError if not successful.
+            attempt_to_load($1)
+
+            # If successful, retry the yaml.load
+            handler = YAML.load(source)
+            return handler if handler.respond_to?(:perform)
+          end
+        end
+
+        if handler.is_a?(YAML::Object)
+
+          # Constantize the object so that ActiveSupport can attempt
+          # its auto loading magic. Will raise LoadError if not successful.
+          attempt_to_load(handler.class)
+
+          # If successful, retry the yaml.load
+          handler = YAML.load(source)
+          return handler if handler.respond_to?(:perform)
+        end
+
+        raise DeserializationError, 'Job failed to load: Unknown handler. Try to manually require the appropiate file.'
+
+      rescue TypeError, LoadError, NameError => e
+
+        raise DeserializationError, "Job failed to load: #{e.message}. Try to manually require the required file."
+      end
+    end
+
+    # Resolves the class name so that autoloading can kick in.
+    def attempt_to_load(klass)
+      klass.constantize
+    end
+
+  end
+end
\ No newline at end of file
diff --git a/lib/delayed/message_sending.rb b/lib/delayed/message_sending.rb
new file mode 100644
index 000000000..925abb47a
--- /dev/null
+++ b/lib/delayed/message_sending.rb
@@ -0,0 +1,9 @@
+module Delayed
+  module MessageSending
+    # Enqueues a job that calls method on this object with args when it
+    # is performed. Returns the result of Delayed::Job.enqueue.
+    def send_later(method, *args)
+      Delayed::Job.enqueue Delayed::PerformableMethod.new(self, method.to_sym, args)
+    end
+  end
+end
\ No newline at end of file
diff --git a/lib/delayed/performable_method.rb b/lib/delayed/performable_method.rb
new file mode 100644
index 000000000..bb8a4c4ee
--- /dev/null
+++ b/lib/delayed/performable_method.rb
@@ -0,0 +1,50 @@
+module Delayed
+  # Job handler that calls a method on an object with stored arguments.
+  # ActiveRecord objects (the receiver as well as the arguments) are
+  # stored as "AR:ClassName:id" strings and looked up again with find
+  # when the job is performed.
+  class PerformableMethod < Struct.new(:object, :method, :args)
+    # Anchored with \A and \z so that only a string consisting entirely
+    # of a reference matches, not a longer text containing such a line.
+    # The class part may contain "::" so that namespaced models load too.
+    AR_STRING_FORMAT = /\AAR\:([A-Z][\w\:]+)\:(\d+)\z/
+
+    # Raises NoMethodError unless object responds to method.
+    def initialize(object, method, args)
+      raise NoMethodError, "undefined method `#{method}' for #{object.inspect}" unless object.respond_to?(method)
+
+      self.object = dump(object)
+      self.args = args.map { |a| dump(a) }
+      self.method = method.to_sym
+    end
+
+    # Calls the stored method and returns its result.
+    def perform
+      load(object).send(method, *args.map{|a| load(a)})
+    end
+
+    private
+
+    # Turns an "AR:ClassName:id" string back into a record; any other
+    # value is returned unchanged.
+    def load(arg)
+      case arg
+      when AR_STRING_FORMAT then $1.constantize.find($2)
+      else arg
+      end
+    end
+
+    # Replaces an ActiveRecord object by its string reference; any other
+    # value is returned unchanged.
+    def dump(arg)
+      case arg
+      when ActiveRecord::Base then ar_to_string(arg)
+      else arg
+      end
+    end
+
+    def ar_to_string(obj)
+      "AR:#{obj.class}:#{obj.id}"
+    end
+  end
+end
\ No newline at end of file
diff --git a/spec/database.rb b/spec/database.rb
new file mode 100644
index 000000000..f44c1b50c
--- /dev/null
+++ b/spec/database.rb
@@ -0,0 +1,34 @@
+$:.unshift(File.dirname(__FILE__) + '/../lib')
+
+require 'rubygems'
+require 'active_record'
+require File.dirname(__FILE__) + '/../init'
+
+ActiveRecord::Base.logger = Logger.new(nil)
+ActiveRecord::Base.establish_connection(:adapter => 'sqlite3', :database => '/tmp/jobs.sqlite')
+ActiveRecord::Migration.verbose = false
+
+# Recreates the delayed_jobs and stories tables from scratch.
+def reset_db
+  ActiveRecord::Schema.define do
+
+    create_table :delayed_jobs, :force => true do |table|
+      table.integer :priority, :default => 0
+      table.integer :attempts, :default => 0
+      table.text :handler
+      table.string :last_error
+      table.datetime :run_at
+      table.timestamps
+    end
+
+    create_table :stories, :force => true do |table|
+      table.string :text
+    end
+
+  end
+end
+
+# Purely useful for test cases...
+class Story < ActiveRecord::Base
+  def tell; text; end
+end
\ No newline at end of file
diff --git a/spec/delayed_method_spec.rb b/spec/delayed_method_spec.rb
new file mode 100644
index 000000000..235769b14
--- /dev/null
+++ b/spec/delayed_method_spec.rb
@@ -0,0 +1,81 @@
+require File.dirname(__FILE__) + '/database'
+
+
+class SimpleJob
+  cattr_accessor :runs; self.runs = 0
+  def perform; @@runs += 1; end
+end
+
+class RandomRubyObject
+  def say_hello
+    'hello'
+  end
+end
+
+class StoryReader
+
+  def read(story)
+    "Epilog: #{story.tell}"
+  end
+
+end
+
+
+describe 'random ruby objects' do
+
+  before { reset_db }
+
+  it "should respond_to :send_later method" do
+
+    RandomRubyObject.new.respond_to?(:send_later).should == true
+
+  end
+
+  it "should raise a NoMethodError if send_later is called but the target method doesn't exist" do
+    lambda { RandomRubyObject.new.send_later(:method_that_deos_not_exist) }.should raise_error(NoMethodError)
+  end
+
+  it "should add a new entry to the job table when send_later is called on it" do
+    Delayed::Job.count.should == 0
+
+    RandomRubyObject.new.send_later(:to_s)
+
+    Delayed::Job.count.should == 1
+  end
+
+  it "should run get the original method executed when the job is performed" do
+
+    RandomRubyObject.new.send_later(:say_hello)
+
+    Delayed::Job.count.should == 1
+    Delayed::Job.peek.perform.should == 'hello'
+  end
+
+  it "should store the object as string if its an active record" do
+
+    story = Story.create :text => 'Once upon...'
+    story.send_later(:tell)
+
+    job = Delayed::Job.peek
+    job.handler.class.should == Delayed::PerformableMethod
+    job.handler.object.should == 'AR:Story:1'
+    job.handler.method.should == :tell
+    job.handler.args.should == []
+    job.perform.should == 'Once upon...'
+  end
+
+  it "should store arguments as string if they an active record" do
+
+    story = Story.create :text => 'Once upon...'
+
+    reader = StoryReader.new
+    reader.send_later(:read, story)
+
+    job = Delayed::Job.peek
+    job.handler.class.should == Delayed::PerformableMethod
+    job.handler.method.should == :read
+    job.handler.args.should == ['AR:Story:1']
+    job.perform.should == 'Epilog: Once upon...'
+  end
+
+end
\ No newline at end of file
diff --git a/spec/job_spec.rb b/spec/job_spec.rb
new file mode 100644
index 000000000..bd8f8354a
--- /dev/null
+++ b/spec/job_spec.rb
@@ -0,0 +1,122 @@
+require File.dirname(__FILE__) + '/database'
+
+
+class SimpleJob
+  cattr_accessor :runs; self.runs = 0
+  def perform; @@runs += 1; end
+end
+
+class ErrorJob
+  cattr_accessor :runs; self.runs = 0
+  def perform; raise 'did not work'; end
+end
+
+describe Delayed::Job do
+
+  before :each do
+    reset_db
+  end
+
+  it "should set run_at automatically" do
+    Delayed::Job.create.run_at.should_not == nil
+  end
+
+  it "should raise ArgumentError when handler doesn't respond_to :perform" do
+    lambda { Delayed::Job.enqueue(Object.new) }.should raise_error(ArgumentError)
+  end
+
+  it "should increase count after enqueuing items" do
+    Delayed::Job.enqueue SimpleJob.new
+    Delayed::Job.count.should == 1
+  end
+
+  it "should return nil when peeking on empty table" do
+    Delayed::Job.peek.should == nil
+  end
+
+  it "should return a job when peeking a table with jobs in it" do
+    Delayed::Job.enqueue SimpleJob.new
+    Delayed::Job.peek.class.should == Delayed::Job
+  end
+
+  it "should return an array of jobs when peek is called with a count larger than zero" do
+    Delayed::Job.enqueue SimpleJob.new
+    Delayed::Job.peek(2).class.should == Array
+  end
+
+  it "should call perform on jobs when running work_off" do
+    SimpleJob.runs.should == 0
+
+    Delayed::Job.enqueue SimpleJob.new
+    Delayed::Job.work_off(1)
+
+    SimpleJob.runs.should == 1
+  end
+
+  it "should re-schedule by about 5 minutes when it fails to execute properly" do
+    Delayed::Job.enqueue ErrorJob.new
+    runner = Delayed::Job.work_off(1)
+    runner.success.should == 0
+    runner.failure.should == 1
+
+    job = Delayed::Job.find(:first)
+    job.last_error.should == 'did not work'
+    job.attempts.should == 1
+    job.run_at.should > Time.now + 4.minutes
+    job.run_at.should < Time.now + 6.minutes
+  end
+
+  it "should raise an DeserializationError when the job class is totally unknown" do
+
+    job = Delayed::Job.new
+    job['handler'] = "--- !ruby/object:JobThatDoesNotExist {}"
+
+    lambda { job.perform }.should raise_error(Delayed::DeserializationError)
+  end
+
+  it "should try to load the class when it is unknown at the time of the deserialization" do
+    job = Delayed::Job.new
+    job['handler'] = "--- !ruby/object:JobThatDoesNotExist {}"
+
+    job.should_receive(:attempt_to_load).with('JobThatDoesNotExist').and_return(true)
+
+    lambda { job.perform }.should raise_error(Delayed::DeserializationError)
+  end
+
+  it "should try include the namespace when loading unknown objects" do
+    job = Delayed::Job.new
+    job['handler'] = "--- !ruby/object:Delayed::JobThatDoesNotExist {}"
+    job.should_receive(:attempt_to_load).with('Delayed::JobThatDoesNotExist').and_return(true)
+    lambda { job.perform }.should raise_error(Delayed::DeserializationError)
+  end
+
+
+  it "should also try to load structs when they are unknown (raises TypeError)" do
+    job = Delayed::Job.new
+    job['handler'] = "--- !ruby/struct:JobThatDoesNotExist {}"
+
+    job.should_receive(:attempt_to_load).with('JobThatDoesNotExist').and_return(true)
+
+    lambda { job.perform }.should raise_error(Delayed::DeserializationError)
+  end
+
+  it "should try include the namespace when loading unknown structs" do
+    job = Delayed::Job.new
+    job['handler'] = "--- !ruby/struct:Delayed::JobThatDoesNotExist {}"
+    job.should_receive(:attempt_to_load).with('Delayed::JobThatDoesNotExist').and_return(true)
+    lambda { job.perform }.should raise_error(Delayed::DeserializationError)
+  end
+
+end
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/spec/story_spec.rb b/spec/story_spec.rb
new file mode 100644
index 000000000..f9d8ab76e
--- /dev/null
+++ b/spec/story_spec.rb
@@ -0,0 +1,18 @@
+require File.dirname(__FILE__) + '/database'
+
+describe "A story" do
+
+  before do
+    reset_db
+    Story.create :text => "Once upon a time..."
+  end
+
+  it "should be shared" do
+    Story.find(:first).tell.should == 'Once upon a time...'
+  end
+
+  it "should not return its result if it storytelling is delayed" do
+    Story.find(:first).send_later(:tell).should_not == 'Once upon a time...'
+  end
+
+end
\ No newline at end of file
diff --git a/tasks/jobs.rake b/tasks/jobs.rake
new file mode 100644
index 000000000..91cb6d328
--- /dev/null
+++ b/tasks/jobs.rake
@@ -0,0 +1,34 @@
+namespace :jobs do
+
+  # Works off due jobs in an endless loop, sleeping SLEEP seconds after
+  # a pass that completed no job. TERM and INT set $exit, which is
+  # checked after every pass.
+  task :work => :environment do
+
+    SLEEP = 5
+
+    trap('TERM') { puts 'Exiting...'; $exit = true }
+    trap('INT') { puts 'Exiting...'; $exit = true }
+
+    loop do
+
+      count = 0
+
+      realtime = Benchmark.realtime do
+        # work_off returns the Runner, not a number; the number of
+        # completed jobs is its success counter.
+        count = Delayed::Job.work_off.success
+      end
+
+      break if $exit
+
+      if count.zero?
+        sleep(SLEEP)
+      else
+        RAILS_DEFAULT_LOGGER.info "#{count} jobs completed at %.2f j/s ..." % [count / realtime]
+      end
+
+      break if $exit
+    end
+  end
+end
\ No newline at end of file