Skip to content

Commit

Permalink
Implement minimum connection pool and thread pool sizes
Browse files Browse the repository at this point in the history
  • Loading branch information
catlee committed Dec 4, 2023
1 parent 04f29cb commit b99cfaf
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 8 deletions.
29 changes: 23 additions & 6 deletions activerecord/lib/active_record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -267,12 +267,15 @@ def self.legacy_connection_handling=(_)

def self.global_thread_pool_async_query_executor # :nodoc:
concurrency = global_executor_concurrency || 4
@global_thread_pool_async_query_executor ||= Concurrent::ThreadPoolExecutor.new(
min_threads: 0,
max_threads: concurrency,
max_queue: concurrency * 4,
fallback_policy: :caller_runs
)
min_threads = global_executor_min_threads || 0
@global_thread_pool_async_query_executor ||= begin
Concurrent::ThreadPoolExecutor.new(
min_threads: min_threads,
max_threads: concurrency,
max_queue: concurrency * 4,
fallback_policy: :caller_runs
)
end
end

# Set the +global_executor_concurrency+. This configuration value can only be used
Expand All @@ -289,6 +292,20 @@ def self.global_executor_concurrency # :nodoc:
@global_executor_concurrency ||= nil
end

# Set the +global_executor_min_threads+. This configuration value can only be used
# with the global thread pool async query executor.
def self.global_executor_min_threads=(global_executor_min_threads)
if self.async_query_executor.nil? || self.async_query_executor == :multi_thread_pool
raise ArgumentError, "`global_executor_min_threads` cannot be set when using the executor is nil or set to multi_thead_pool. For multiple thread pools, please set the concurrency in your database configuration."
end

@global_executor_min_threads = global_executor_min_threads
end

def self.global_executor_min_threads # :nodoc:
@global_executor_min_threads ||= nil
end

singleton_class.attr_accessor :index_nested_attribute_errors
self.index_nested_attribute_errors = false

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ def initialize(pool_config)
@checkout_timeout = db_config.checkout_timeout
@idle_timeout = db_config.idle_timeout
@size = db_config.pool
@min_size = db_config.min_size

# This variable tracks the cache of threads mapped to reserved connections, with the
# sole purpose of speeding up the +connection+ method. It is not the authoritative
Expand Down Expand Up @@ -171,6 +172,10 @@ def initialize(pool_config)

@reaper = Reaper.new(self, db_config.reaping_frequency)
@reaper.run

if @min_size > 0
ensure_minimum_connections
end
end

def lock_thread=(lock_thread)
Expand Down Expand Up @@ -449,9 +454,24 @@ def flush(minimum_idle = @idle_timeout)

idle_connections = synchronize do
return if self.discarded?
@connections.select do |conn|

to_close = @connections.select do |conn|
!conn.in_use? && conn.seconds_idle >= minimum_idle
end.each do |conn|
end.sort_by { |conn| -conn.seconds_idle } # sort by longest idle first

# Ensure that we have at least @min_size connections available.
# If the pool is at capacity, then we will remove the longest-idle
# connections until @min_size is reached.

# we're going to remove to_close.size connections, leaving us with connections.size - to_close.size
# connections. If that number is less than @min_size, then we need reduce to_close by
# to_close.size - (@min_size - connections.size)
if @connections.size - to_close.size < @min_size
n = (@min_size - (@connections.size - to_close.size))
to_close = to_close[n..] || []
end

to_close.each do |conn|
conn.lease

@available.delete conn
Expand All @@ -471,6 +491,26 @@ def flush!
flush(-1)
end

# Ensure that we have a minimum number of connections established.
def ensure_minimum_connections
return if @connections.size >= @min_size

new_count = @min_size - @connections.size
new_conns = []

begin
new_count.times do
conn = checkout
new_conns << conn
conn.connect!
end
ensure
new_conns.each do |conn|
checkin(conn)
end
end
end

def num_waiting_in_queue # :nodoc:
@available.num_waiting
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def spawn_thread(frequency)
@pools[frequency].each do |p|
p.reap
p.flush
p.ensure_minimum_connections
rescue WeakRef::RefError
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ def pool
(configuration_hash[:pool] || 5).to_i
end

def min_size
(configuration_hash[:min_size] || 0).to_i
end

def min_threads
(configuration_hash[:min_threads] || 0).to_i
end
Expand Down
33 changes: 33 additions & 0 deletions activerecord/test/cases/asynchronous_queries_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,39 @@ def test_concurrency_can_be_set_on_global_thread_pool
ActiveRecord.instance_variable_set(:@global_thread_pool_async_query_executor, old_global_thread_pool_async_query_executor)
end

def test_min_threads_can_be_set_on_global_thread_pool
old_value = ActiveRecord.async_query_executor
ActiveRecord.async_query_executor = :global_thread_pool
old_threads = ActiveRecord.global_executor_min_threads
old_global_thread_pool_async_query_executor = ActiveRecord.instance_variable_get(:@global_thread_pool_async_query_executor)
ActiveRecord.instance_variable_set(:@global_thread_pool_async_query_executor, nil)
ActiveRecord.global_executor_min_threads = 1

handler = ActiveRecord::ConnectionAdapters::ConnectionHandler.new
db_config = ActiveRecord::Base.configurations.configs_for(env_name: "arunit", name: "primary")
db_config2 = ActiveRecord::Base.configurations.configs_for(env_name: "arunit2", name: "primary")
pool1 = handler.establish_connection(db_config)
pool2 = handler.establish_connection(db_config2, owner_name: ARUnit2Model)

async_pool1 = pool1.instance_variable_get(:@async_executor)
async_pool2 = pool2.instance_variable_get(:@async_executor)

assert async_pool1.is_a?(Concurrent::ThreadPoolExecutor)
assert async_pool2.is_a?(Concurrent::ThreadPoolExecutor)

assert_equal 1, async_pool1.min_length

assert_equal 1, async_pool2.min_length

assert_equal 2, handler.connection_pool_list(:all).count
assert_equal async_pool1, async_pool2
ensure
clean_up_connection_handler
ActiveRecord.async_query_executor = old_value
ActiveRecord.global_executor_min_threads = old_threads
ActiveRecord.instance_variable_set(:@global_thread_pool_async_query_executor, old_global_thread_pool_async_query_executor)
end

def test_concurrency_cannot_be_set_with_null_executor_or_multi_thread_pool
old_value = ActiveRecord.async_query_executor
ActiveRecord.async_query_executor = nil
Expand Down
45 changes: 45 additions & 0 deletions activerecord/test/cases/connection_pool_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,51 @@ def test_idle_timeout_configuration
assert_equal 0, @pool.connections.length
end

def test_min_size_configuration
@pool.disconnect!

config = @db_config.configuration_hash.merge(min_size: 1)
db_config = ActiveRecord::DatabaseConfigurations::HashConfig.new(@db_config.env_name, @db_config.name, config)

pool_config = ActiveRecord::ConnectionAdapters::PoolConfig.new(ActiveRecord::Base, db_config, :writing, :default)
@pool = ConnectionPool.new(pool_config)

@pool.flush
assert_equal 1, @pool.connections.length
end

def test_idle_timeout_configuration_with_min_size
@pool.disconnect!

config = @db_config.configuration_hash.merge(idle_timeout: "0.02", min_size: 1)
db_config = ActiveRecord::DatabaseConfigurations::HashConfig.new(@db_config.env_name, @db_config.name, config)

pool_config = ActiveRecord::ConnectionAdapters::PoolConfig.new(ActiveRecord::Base, db_config, :writing, :default)
@pool = ConnectionPool.new(pool_config)
connections = 2.times.map { @pool.checkout }
connections.each { |conn| @pool.checkin(conn) }

connections.each do |conn|
conn.instance_variable_set(
:@idle_since,
Process.clock_gettime(Process::CLOCK_MONOTONIC) - 0.01
)
end

@pool.flush
assert_equal 2, @pool.connections.length

connections.each do |conn|
conn.instance_variable_set(
:@idle_since,
Process.clock_gettime(Process::CLOCK_MONOTONIC) - 0.02
)
end

@pool.flush
assert_equal 1, @pool.connections.length
end

def test_disable_flush
@pool.disconnect!

Expand Down

0 comments on commit b99cfaf

Please sign in to comment.