diff --git a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgPoolTest.java b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgPoolTest.java index ad41c0d59..2c09f892b 100644 --- a/vertx-pg-client/src/test/java/io/vertx/pgclient/PgPoolTest.java +++ b/vertx-pg-client/src/test/java/io/vertx/pgclient/PgPoolTest.java @@ -384,7 +384,7 @@ public void testPipeliningDistribution(TestContext ctx) { public void testPoolIdleTimeout(TestContext ctx) { ProxyServer proxy = ProxyServer.create(vertx, options.getPort(), options.getHost()); AtomicReference proxyConn = new AtomicReference<>(); - int pooleCleanerPeriod = 100; + int poolCleanerPeriod = 100; int idleTimeout = 3000; Async latch = ctx.async(); proxy.proxyHandler(conn -> { @@ -393,8 +393,8 @@ public void testPoolIdleTimeout(TestContext ctx) { conn.clientCloseHandler(v -> { long lifetime = System.currentTimeMillis() - now; int delta = 500; - int lowerBound = idleTimeout - pooleCleanerPeriod - delta; - int upperBound = idleTimeout + pooleCleanerPeriod + delta; + int lowerBound = idleTimeout - poolCleanerPeriod - delta; + int upperBound = idleTimeout + poolCleanerPeriod + delta; ctx.assertTrue(lifetime >= lowerBound, "Was expecting connection to be closed in more than " + lowerBound + ": " + lifetime); ctx.assertTrue(lifetime <= upperBound, "Was expecting connection to be closed in less than " + upperBound + ": "+ lifetime); latch.complete(); @@ -408,7 +408,8 @@ public void testPoolIdleTimeout(TestContext ctx) { listenLatch.awaitSuccess(20_000); poolOptions - .setPoolCleanerPeriod(pooleCleanerPeriod) + .setPoolCleanerPeriod(poolCleanerPeriod) + .setMaxLifetime(0) .setIdleTimeout(idleTimeout) .setIdleTimeoutUnit(TimeUnit.MILLISECONDS); options.setPort(8080); @@ -422,6 +423,49 @@ public void testPoolIdleTimeout(TestContext ctx) { .onComplete(ctx.asyncAssertSuccess()); } + @Test + public void testPoolMaxLifetime(TestContext ctx) { + ProxyServer proxy = ProxyServer.create(vertx, options.getPort(), options.getHost()); + AtomicReference proxyConn = new AtomicReference<>(); + int poolCleanerPeriod = 100; + int maxLifetime = 3000; + Async latch = ctx.async(); + proxy.proxyHandler(conn -> { + proxyConn.set(conn); + long now = System.currentTimeMillis(); + conn.clientCloseHandler(v -> { + long lifetime = System.currentTimeMillis() - now; + int delta = 500; + int lowerBound = maxLifetime - poolCleanerPeriod - delta; + int upperBound = maxLifetime + poolCleanerPeriod + delta; + ctx.assertTrue(lifetime >= lowerBound, "Was expecting connection to be closed in more than " + lowerBound + ": " + lifetime); + ctx.assertTrue(lifetime <= upperBound, "Was expecting connection to be closed in less than " + upperBound + ": "+ lifetime); + latch.complete(); + }); + conn.connect(); + }); + + // Start proxy + Async listenLatch = ctx.async(); + proxy.listen(8080, "localhost", ctx.asyncAssertSuccess(res -> listenLatch.complete())); + listenLatch.awaitSuccess(20_000); + + poolOptions + .setPoolCleanerPeriod(poolCleanerPeriod) + .setIdleTimeout(0) + .setMaxLifetime(maxLifetime) + .setMaxLifetimeUnit(TimeUnit.MILLISECONDS); + options.setPort(8080); + options.setHost("localhost"); + PgPool pool = createPool(options, poolOptions); + + // Create a connection that remains in the pool + pool + .getConnection() + .flatMap(SqlClient::close) + .onComplete(ctx.asyncAssertSuccess()); + } + @Test public void testPoolConnectTimeout(TestContext ctx) { Async async = ctx.async(2); @@ -463,9 +507,9 @@ public void testPoolConnectTimeout(TestContext ctx) { public void testNoConnectionLeaks(TestContext ctx) { Async killConnections = ctx.async(); PgConnection.connect(vertx, options).onComplete(ctx.asyncAssertSuccess(conn -> { - Collector> collector = mapping(row -> row.getInteger(0), toList()); + Collector> collector = mapping(row -> row.getBoolean(0), toList()); String sql = "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE pid <> pg_backend_pid() AND datname = $1"; - PreparedQuery>> preparedQuery = conn.preparedQuery(sql).collecting(collector); + PreparedQuery>> preparedQuery = conn.preparedQuery(sql).collecting(collector); Tuple params = Tuple.of(options.getDatabase()); preparedQuery.execute(params).compose(cf -> conn.close()).onComplete(ctx.asyncAssertSuccess(v -> killConnections.complete())); })); diff --git a/vertx-sql-client/src/main/generated/io/vertx/sqlclient/PoolOptionsConverter.java b/vertx-sql-client/src/main/generated/io/vertx/sqlclient/PoolOptionsConverter.java index 799daec9c..88cda6b73 100644 --- a/vertx-sql-client/src/main/generated/io/vertx/sqlclient/PoolOptionsConverter.java +++ b/vertx-sql-client/src/main/generated/io/vertx/sqlclient/PoolOptionsConverter.java @@ -45,6 +45,16 @@ public static void fromJson(Iterable> json, obj.setIdleTimeoutUnit(java.util.concurrent.TimeUnit.valueOf((String)member.getValue())); } break; + case "maxLifetime": + if (member.getValue() instanceof Number) { + obj.setMaxLifetime(((Number)member.getValue()).intValue()); + } + break; + case "maxLifetimeUnit": + if (member.getValue() instanceof String) { + obj.setMaxLifetimeUnit(java.util.concurrent.TimeUnit.valueOf((String)member.getValue())); + } + break; case "maxSize": if (member.getValue() instanceof Number) { obj.setMaxSize(((Number)member.getValue()).intValue()); @@ -88,6 +98,10 @@ public static void toJson(PoolOptions obj, java.util.Map json) { if (obj.getIdleTimeoutUnit() != null) { json.put("idleTimeoutUnit", obj.getIdleTimeoutUnit().name()); } + json.put("maxLifetime", obj.getMaxLifetime()); + if (obj.getMaxLifetimeUnit() != null) { + json.put("maxLifetimeUnit", obj.getMaxLifetimeUnit().name()); + } json.put("maxSize", obj.getMaxSize()); json.put("maxWaitQueueSize", obj.getMaxWaitQueueSize()); if (obj.getName() != null) { diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java index 9af1d615e..0239dae8d 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java @@ -18,7 +18,6 @@ package io.vertx.sqlclient; import io.vertx.codegen.annotations.DataObject; -import io.vertx.core.http.HttpClientOptions; import io.vertx.core.impl.Arguments; import io.vertx.core.json.JsonObject; @@ -48,11 +47,21 @@ public class PoolOptions { */ public static final int DEFAULT_IDLE_TIMEOUT = 0; + /** + * Default maximum pooled connection lifetime = 0 (no maximum) + */ + public static final int DEFAULT_MAXIMUM_LIFETIME = 0; + /** * Default connection idle time unit in the pool = seconds */ public static final TimeUnit DEFAULT_IDLE_TIMEOUT_TIME_UNIT = TimeUnit.SECONDS; + /** + * Default maximum pooled connection lifetime unit = seconds + */ + public static final TimeUnit DEFAULT_MAXIMUM_LIFETIME_TIME_UNIT = TimeUnit.SECONDS; + /** * Default pool cleaner period = 1000 ms (1 second) */ @@ -87,6 +96,8 @@ public class PoolOptions { private int maxWaitQueueSize = DEFAULT_MAX_WAIT_QUEUE_SIZE; private int idleTimeout = DEFAULT_IDLE_TIMEOUT; private TimeUnit idleTimeoutUnit = DEFAULT_IDLE_TIMEOUT_TIME_UNIT; + private int maxLifetime = DEFAULT_MAXIMUM_LIFETIME; + private TimeUnit maxLifetimeUnit = DEFAULT_MAXIMUM_LIFETIME_TIME_UNIT; private int poolCleanerPeriod = DEFAULT_POOL_CLEANER_PERIOD; private int connectionTimeout = DEFAULT_CONNECTION_TIMEOUT; private TimeUnit connectionTimeoutUnit = DEFAULT_CONNECTION_TIMEOUT_TIME_UNIT; @@ -177,16 +188,54 @@ public int getIdleTimeout() { } /** - * Establish an idle timeout for pooled connections. + * Establish an idle timeout for pooled connections, a value of zero disables the idle timeout. * - * @param idleTimeout the pool connection idle time unitq + * @param idleTimeout the pool connection idle timeout * @return a reference to this, so the API can be used fluently */ public PoolOptions setIdleTimeout(int idleTimeout) { + Arguments.require(idleTimeout >= 0, "idleTimeout must be >= 0"); this.idleTimeout = idleTimeout; return this; } + /** + * @return the pooled connection max lifetime unit + */ + public TimeUnit getMaxLifetimeUnit() { + return maxLifetimeUnit; + } + + /** + * Establish a max lifetime unit for pooled connections. + * + * @param maxLifetimeUnit pooled connection max lifetime unit + * @return a reference to this, so the API can be used fluently + */ + public PoolOptions setMaxLifetimeUnit(TimeUnit maxLifetimeUnit) { + this.maxLifetimeUnit = maxLifetimeUnit; + return this; + } + + /** + * @return pooled connection max lifetime + */ + public int getMaxLifetime() { + return maxLifetime; + } + + /** + * Establish a max lifetime for pooled connections, a value of zero disables the maximum lifetime. + * + * @param maxLifetime the pool connection max lifetime + * @return a reference to this, so the API can be used fluently + */ + public PoolOptions setMaxLifetime(int maxLifetime) { + Arguments.require(maxLifetime >= 0, "maxLifetime must be >= 0"); + this.maxLifetime = maxLifetime; + return this; + } + /** * @return the connection pool cleaner period in ms. */ diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java index d905c764a..7b28da062 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java @@ -43,6 +43,7 @@ public class PoolImpl extends SqlClientBase implements Pool, Closeable { private final CloseFuture closeFuture; private final long idleTimeout; private final long connectionTimeout; + private final long maxLifetime; private final long cleanerPeriod; private final boolean pipelined; private volatile Handler connectionInitializer; @@ -62,20 +63,21 @@ public PoolImpl(VertxInternal vertx, this.idleTimeout = MILLISECONDS.convert(poolOptions.getIdleTimeout(), poolOptions.getIdleTimeoutUnit()); this.connectionTimeout = MILLISECONDS.convert(poolOptions.getConnectionTimeout(), poolOptions.getConnectionTimeoutUnit()); + this.maxLifetime = MILLISECONDS.convert(poolOptions.getMaxLifetime(), poolOptions.getMaxLifetimeUnit()); this.cleanerPeriod = poolOptions.getPoolCleanerPeriod(); this.timerID = -1L; this.pipelined = pipelined; this.vertx = vertx; - this.pool = new SqlConnectionPool(ctx -> connectionProvider.apply(ctx), () -> connectionInitializer, afterAcquire, beforeRecycle, vertx, idleTimeout, poolOptions.getMaxSize(), pipelined, poolOptions.getMaxWaitQueueSize(), poolOptions.getEventLoopSize()); + this.pool = new SqlConnectionPool(ctx -> connectionProvider.apply(ctx), () -> connectionInitializer, afterAcquire, beforeRecycle, vertx, idleTimeout, maxLifetime, poolOptions.getMaxSize(), pipelined, poolOptions.getMaxWaitQueueSize(), poolOptions.getEventLoopSize()); this.closeFuture = closeFuture; } public Pool init() { closeFuture.add(this); - if (idleTimeout > 0 && cleanerPeriod > 0) { + if ((idleTimeout > 0 || maxLifetime > 0) && cleanerPeriod > 0) { synchronized (this) { timerID = vertx.setTimer(cleanerPeriod, id -> { - checkExpired(); + runEviction(); }); } } @@ -90,17 +92,17 @@ public Pool connectionProvider(Function> connecti return this; } - private void checkExpired() { + private void runEviction() { synchronized (this) { if (timerID == -1) { // Cancelled return; } timerID = vertx.setTimer(cleanerPeriod, id -> { - checkExpired(); + runEviction(); }); } - pool.checkExpired(); + pool.evict(); } @Override diff --git a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java index c125ab9fd..f8f3d8690 100644 --- a/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java +++ b/vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java @@ -53,6 +53,7 @@ public class SqlConnectionPool { private final Function> beforeRecycle; private final boolean pipelined; private final long idleTimeout; + private final long maxLifetime; private final int maxSize; public SqlConnectionPool(Function> connectionProvider, @@ -61,6 +62,7 @@ public SqlConnectionPool(Function> connectionProv Function> beforeRecycle, VertxInternal vertx, long idleTimeout, + long maxLifetime, int maxSize, boolean pipelined, int maxWaitQueueSize, @@ -75,6 +77,7 @@ public SqlConnectionPool(Function> connectionProv this.vertx = vertx; this.pipelined = pipelined; this.idleTimeout = idleTimeout; + this.maxLifetime = maxLifetime; this.maxSize = maxSize; this.hook = hook; this.connectionProvider = connectionProvider; @@ -142,9 +145,9 @@ public int size() { return pool.size(); } - public void checkExpired() { + public void evict() { long now = System.currentTimeMillis(); - pool.evict(conn -> conn.expirationTimestamp < now, ar -> { + pool.evict(conn -> conn.shouldEvict(now), ar -> { if (ar.succeeded()) { List res = ar.result(); for (PooledConnection conn : res) { @@ -169,7 +172,7 @@ public Future execute(ContextInternal context, CommandBase cmd) { future = pooled.schedule(context, cmd); } return future.andThen(ar -> { - pooled.expirationTimestamp = System.currentTimeMillis() + idleTimeout; + pooled.refresh(); lease.recycle(); }); }); @@ -264,12 +267,15 @@ public class PooledConnection implements Connection, Connection.Holder { private Holder holder; private Promise> poolCallback; private Lease lease; - public long expirationTimestamp; + public long idleEvictionTimestamp; + public long lifetimeEvictionTimestamp; PooledConnection(ConnectionFactory factory, Connection conn, PoolConnector.Listener listener) { this.factory = factory; this.conn = conn; this.listener = listener; + this.lifetimeEvictionTimestamp = maxLifetime > 0 ? System.currentTimeMillis() + maxLifetime : Long.MAX_VALUE; + refresh(); } @Override @@ -346,6 +352,10 @@ private void close(Promise promise) { conn.close(this, promise); } + private void refresh() { + this.idleEvictionTimestamp = idleTimeout > 0 ? System.currentTimeMillis() + idleTimeout : Long.MAX_VALUE; + } + @Override public void init(Holder holder) { if (this.holder != null) { @@ -389,7 +399,7 @@ private void doClose(Holder holder, Promise promise) { private void cleanup(Promise promise) { Lease l = this.lease; this.lease = null; - this.expirationTimestamp = System.currentTimeMillis() + idleTimeout; + refresh(); l.recycle(); promise.complete(); } @@ -435,5 +445,18 @@ public int getSecretKey() { public Connection unwrap() { return conn; } + + private boolean hasIdleExpired(long now) { + return idleEvictionTimestamp < now; + } + + private boolean hasLifetimeExpired(long now) { + return lifetimeEvictionTimestamp < now; + } + + private boolean shouldEvict(long now) { + return hasIdleExpired(now) || hasLifetimeExpired(now); + } + } }