Skip to content

Commit

Permalink
Add min_size to connection pool and hash config
Browse files Browse the repository at this point in the history
Fix rails#50989

In `ConnectionPool`, rename :size to :max_size, and add :min_size as a
configuration option. :min_size is initialized from
`db_config.min_size`, where I've also added a new configuration option.

`ConnectionPool#initialize` calls a new method,
`ensure_minimum_connections` when `min_size` is > 0.

In `ConnectionPool#flush`, we need to pay attention to the minimum pool
size, and make sure that we don't close too many connections.

Finally, `ConnectionPool::Reaper` also calls into
`ensure_minimum_connections` after reaping/flushing to ensure that we
always have the desired number of established connections active.
  • Loading branch information
catlee committed Feb 6, 2024
1 parent d91f0f5 commit 8f979dc
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 7 deletions.
6 changes: 6 additions & 0 deletions activerecord/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
* Add an option to `ActiveRecord::ConnectionAdapters::ConnectionPool` and
`ActiveRecord::DatabaseConfigurations::HashConfig` to specify a minimum number
of connections to keep alive in the database connection pool.

*Chris AtLee*

* Support `:source_location` tag option for query log tags

```ruby
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class ConnectionPool
include ConnectionAdapters::AbstractPool

attr_accessor :automatic_reconnect, :checkout_timeout
attr_reader :db_config, :max_size, :reaper, :pool_config, :async_executor, :role, :shard
attr_reader :db_config, :max_size, :min_size, :reaper, :pool_config, :async_executor, :role, :shard
alias :size :max_size

delegate :schema_reflection, :schema_reflection=, :server_version, to: :pool_config
Expand All @@ -139,6 +139,7 @@ def initialize(pool_config)
@checkout_timeout = db_config.checkout_timeout
@idle_timeout = db_config.idle_timeout
@max_size = db_config.pool
@min_size = db_config.min_size

# This variable tracks the cache of threads mapped to reserved connections, with the
# sole purpose of speeding up the +connection+ method. It is not the authoritative
Expand Down Expand Up @@ -170,6 +171,10 @@ def initialize(pool_config)

@reaper = Reaper.new(self, db_config.reaping_frequency)
@reaper.run

if @min_size > 0
ensure_minimum_connections
end
end

def lock_thread=(lock_thread)
Expand Down Expand Up @@ -448,9 +453,24 @@ def flush(minimum_idle = @idle_timeout)

idle_connections = synchronize do
return if self.discarded?
@connections.select do |conn|

to_close = @connections.select do |conn|
!conn.in_use? && conn.seconds_idle >= minimum_idle
end.each do |conn|
end.sort_by { |conn| -conn.seconds_idle } # sort by longest idle first

# Ensure that we have at least @min_size connections available.
# If the pool is at capacity, then we will remove the longest-idle
# connections until @min_size is reached.

# we're going to remove to_close.size connections, leaving us with connections.size - to_close.size
# connections. If that number is less than @min_size, then we need reduce to_close by
# to_close.size - (@min_size - connections.size)
if @connections.size - to_close.size < @min_size
n = (@min_size - (@connections.size - to_close.size))
to_close = to_close[n..] || []
end

to_close.each do |conn|
conn.lease

@available.delete conn
Expand All @@ -470,6 +490,26 @@ def flush!
flush(-1)
end

# Ensure that we have a minimum number of connections established.
def ensure_minimum_connections
return if @connections.size >= @min_size

new_count = @min_size - @connections.size
new_conns = []

begin
new_count.times do
conn = checkout
new_conns << conn
conn.connect!
end
ensure
new_conns.each do |conn|
checkin(conn)
end
end
end

def num_waiting_in_queue # :nodoc:
@available.num_waiting
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def spawn_thread(frequency)
@pools[frequency].each do |p|
p.reap
p.flush
p.ensure_minimum_connections
rescue WeakRef::RefError
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ def pool
(configuration_hash[:pool] || 5).to_i
end

def min_size
(configuration_hash[:min_size] || 0).to_i
end

def min_threads
(configuration_hash[:min_threads] || 0).to_i
end
Expand Down
8 changes: 4 additions & 4 deletions activerecord/test/cases/asynchronous_queries_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,12 @@ def test_one_global_thread_pool_is_used_when_set_with_default_concurrency
assert async_pool1.is_a?(Concurrent::ThreadPoolExecutor)
assert async_pool2.is_a?(Concurrent::ThreadPoolExecutor)

assert_equal 0, async_pool1.min_length
assert_equal 4, async_pool1.min_length
assert_equal 4, async_pool1.max_length
assert_equal 16, async_pool1.max_queue
assert_equal :caller_runs, async_pool1.fallback_policy

assert_equal 0, async_pool2.min_length
assert_equal 4, async_pool2.min_length
assert_equal 4, async_pool2.max_length
assert_equal 16, async_pool2.max_queue
assert_equal :caller_runs, async_pool2.fallback_policy
Expand Down Expand Up @@ -215,12 +215,12 @@ def test_concurrency_can_be_set_on_global_thread_pool
assert async_pool1.is_a?(Concurrent::ThreadPoolExecutor)
assert async_pool2.is_a?(Concurrent::ThreadPoolExecutor)

assert_equal 0, async_pool1.min_length
assert_equal 8, async_pool1.min_length
assert_equal 8, async_pool1.max_length
assert_equal 32, async_pool1.max_queue
assert_equal :caller_runs, async_pool1.fallback_policy

assert_equal 0, async_pool2.min_length
assert_equal 8, async_pool2.min_length
assert_equal 8, async_pool2.max_length
assert_equal 32, async_pool2.max_queue
assert_equal :caller_runs, async_pool2.fallback_policy
Expand Down
45 changes: 45 additions & 0 deletions activerecord/test/cases/connection_pool_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,51 @@ def test_idle_timeout_configuration
assert_equal 0, @pool.connections.length
end

def test_min_size_configuration
@pool.disconnect!

config = @db_config.configuration_hash.merge(min_size: 1)
db_config = ActiveRecord::DatabaseConfigurations::HashConfig.new(@db_config.env_name, @db_config.name, config)

pool_config = ActiveRecord::ConnectionAdapters::PoolConfig.new(ActiveRecord::Base, db_config, :writing, :default)
@pool = ConnectionPool.new(pool_config)

@pool.flush
assert_equal 1, @pool.connections.length
end

def test_idle_timeout_configuration_with_min_size
@pool.disconnect!

config = @db_config.configuration_hash.merge(idle_timeout: "0.02", min_size: 1)
db_config = ActiveRecord::DatabaseConfigurations::HashConfig.new(@db_config.env_name, @db_config.name, config)

pool_config = ActiveRecord::ConnectionAdapters::PoolConfig.new(ActiveRecord::Base, db_config, :writing, :default)
@pool = ConnectionPool.new(pool_config)
connections = 2.times.map { @pool.checkout }
connections.each { |conn| @pool.checkin(conn) }

connections.each do |conn|
conn.instance_variable_set(
:@idle_since,
Process.clock_gettime(Process::CLOCK_MONOTONIC) - 0.01
)
end

@pool.flush
assert_equal 2, @pool.connections.length

connections.each do |conn|
conn.instance_variable_set(
:@idle_since,
Process.clock_gettime(Process::CLOCK_MONOTONIC) - 0.02
)
end

@pool.flush
assert_equal 1, @pool.connections.length
end

def test_disable_flush
@pool.disconnect!

Expand Down
3 changes: 3 additions & 0 deletions activerecord/test/cases/reaper_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ def discard!
def discarded?
@discarded
end

def ensure_minimum_connections
end
end

# A reaper with nil time should never reap connections
Expand Down

0 comments on commit 8f979dc

Please sign in to comment.