diff --git a/lib/sidekiq/throttled/cooldown.rb b/lib/sidekiq/throttled/cooldown.rb index 11f38bd..61afab0 100644 --- a/lib/sidekiq/throttled/cooldown.rb +++ b/lib/sidekiq/throttled/cooldown.rb @@ -23,9 +23,10 @@ def [](config) # @param config [Config] def initialize(config) - @queues = ExpirableSet.new(config.cooldown_period) - @threshold = config.cooldown_threshold + @queues = ExpirableSet.new @tracker = Concurrent::Map.new + @period = config.cooldown_period + @threshold = config.cooldown_threshold end # Notify that given queue returned job that was throttled. @@ -33,7 +34,7 @@ def initialize(config) # @param queue [String] # @return [void] def notify_throttled(queue) - @queues.add(queue) if @threshold <= @tracker.merge_pair(queue, 1, &:succ) + @queues.add(queue, ttl: @period) if @threshold <= @tracker.merge_pair(queue, 1, &:succ) end # Notify that given queue returned job that was not throttled. diff --git a/lib/sidekiq/throttled/expirable_set.rb b/lib/sidekiq/throttled/expirable_set.rb index 4242bf0..dc4d723 100644 --- a/lib/sidekiq/throttled/expirable_set.rb +++ b/lib/sidekiq/throttled/expirable_set.rb @@ -9,35 +9,36 @@ module Throttled # Set of elements with expirations. # # @example - # set = ExpirableSet.new(10.0) - # set.add("a") + # set = ExpirableSet.new + # set.add("a", ttl: 10.0) # sleep(5) - # set.add("b") + # set.add("b", ttl: 10.0) # set.to_a # => ["a", "b"] # sleep(5) # set.to_a # => ["b"] class ExpirableSet include Enumerable - # @param ttl [Float] expiration is seconds - # @raise [ArgumentError] if `ttl` is not positive Float - def initialize(ttl) - raise ArgumentError, "ttl must be positive Float" unless ttl.is_a?(Float) && ttl.positive? - + def initialize @elements = Concurrent::Map.new - @ttl = ttl end # @param element [Object] + # @param ttl [Float] expiration is seconds + # @raise [ArgumentError] if `ttl` is not positive Float # @return [ExpirableSet] self - def add(element) - # cleanup expired elements to avoid mem-leak + def add(element, ttl:) + raise ArgumentError, "ttl must be positive Float" unless ttl.is_a?(Float) && ttl.positive? + horizon = now + + # Cleanup expired elements expired = @elements.each_pair.select { |(_, sunset)| expired?(sunset, horizon) } expired.each { |pair| @elements.delete_pair(*pair) } - # add new element - @elements[element] = now + @ttl + # Add or update an element + sunset = horizon + ttl + @elements.merge_pair(element, sunset) { |old_sunset| [old_sunset, sunset].max } self end diff --git a/spec/lib/sidekiq/throttled/expirable_set_spec.rb b/spec/lib/sidekiq/throttled/expirable_set_spec.rb index 465c7ba..4549043 100644 --- a/spec/lib/sidekiq/throttled/expirable_set_spec.rb +++ b/spec/lib/sidekiq/throttled/expirable_set_spec.rb @@ -3,30 +3,44 @@ require "sidekiq/throttled/expirable_set" RSpec.describe Sidekiq::Throttled::ExpirableSet do - subject(:expirable_set) { described_class.new(2.0) } + subject(:expirable_set) { described_class.new } it { is_expected.to be_an Enumerable } - describe ".new" do + describe "#add" do it "raises ArgumentError if given TTL is not Float" do - expect { described_class.new(42) }.to raise_error(ArgumentError) + expect { expirable_set.add("a", ttl: 42) }.to raise_error(ArgumentError) end it "raises ArgumentError if given TTL is not positive" do - expect { described_class.new(0.0) }.to raise_error(ArgumentError) + expect { expirable_set.add("a", ttl: 0.0) }.to raise_error(ArgumentError) end - end - describe "#add" do it "returns self" do - expect(expirable_set.add("a")).to be expirable_set + expect(expirable_set.add("a", ttl: 1.0)).to be expirable_set end it "adds uniq elements to the set" do - expirable_set.add("a").add("b").add("b").add("a") + expirable_set.add("a", ttl: 1.0).add("b", ttl: 1.0).add("b", ttl: 1.0).add("a", ttl: 1.0) expect(expirable_set).to contain_exactly("a", "b") end + + it "uses longest sunset" do + monotonic_time = 0.0 + allow(Process).to receive(:clock_gettime).with(Process::CLOCK_MONOTONIC) { monotonic_time } + + expirable_set.add("a", ttl: 1.0).add("b", ttl: 42.0).add("b", ttl: 1.0).add("a", ttl: 2.0) + + monotonic_time += 0.5 + expect(expirable_set).to contain_exactly("a", "b") + + monotonic_time += 1.0 + expect(expirable_set).to contain_exactly("a", "b") + + monotonic_time += 0.5 + expect(expirable_set).to contain_exactly("b") + end end describe "#each" do @@ -37,16 +51,16 @@ allow(Process).to receive(:clock_gettime).with(Process::CLOCK_MONOTONIC) { monotonic_time } - expirable_set.add("lorem") - expirable_set.add("ipsum") + expirable_set.add("lorem", ttl: 1.0) + expirable_set.add("ipsum", ttl: 1.0) - monotonic_time += 1 + monotonic_time += 0.5 - expirable_set.add("ipsum") + expirable_set.add("ipsum", ttl: 1.0) - monotonic_time += 1 + monotonic_time += 0.5 - expirable_set.add("dolor") + expirable_set.add("dolor", ttl: 1.0) end it { is_expected.to be_an(Enumerator) }