Skip to content

Commit

Permalink
Merge branch 'max-lifetime' of github.com:kdubb/vertx-sql-client into…
Browse files Browse the repository at this point in the history
… kdubb-max-lifetime
  • Loading branch information
tsegismont committed Apr 17, 2023
2 parents 76afd3d + d839f24 commit 06ab6db
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 20 deletions.
56 changes: 50 additions & 6 deletions vertx-pg-client/src/test/java/io/vertx/pgclient/PgPoolTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ public void testPipeliningDistribution(TestContext ctx) {
public void testPoolIdleTimeout(TestContext ctx) {
ProxyServer proxy = ProxyServer.create(vertx, options.getPort(), options.getHost());
AtomicReference<ProxyServer.Connection> proxyConn = new AtomicReference<>();
int pooleCleanerPeriod = 100;
int poolCleanerPeriod = 100;
int idleTimeout = 3000;
Async latch = ctx.async();
proxy.proxyHandler(conn -> {
Expand All @@ -393,8 +393,8 @@ public void testPoolIdleTimeout(TestContext ctx) {
conn.clientCloseHandler(v -> {
long lifetime = System.currentTimeMillis() - now;
int delta = 500;
int lowerBound = idleTimeout - pooleCleanerPeriod - delta;
int upperBound = idleTimeout + pooleCleanerPeriod + delta;
int lowerBound = idleTimeout - poolCleanerPeriod - delta;
int upperBound = idleTimeout + poolCleanerPeriod + delta;
ctx.assertTrue(lifetime >= lowerBound, "Was expecting connection to be closed in more than " + lowerBound + ": " + lifetime);
ctx.assertTrue(lifetime <= upperBound, "Was expecting connection to be closed in less than " + upperBound + ": "+ lifetime);
latch.complete();
Expand All @@ -408,7 +408,8 @@ public void testPoolIdleTimeout(TestContext ctx) {
listenLatch.awaitSuccess(20_000);

poolOptions
.setPoolCleanerPeriod(pooleCleanerPeriod)
.setPoolCleanerPeriod(poolCleanerPeriod)
.setMaxLifetime(0)
.setIdleTimeout(idleTimeout)
.setIdleTimeoutUnit(TimeUnit.MILLISECONDS);
options.setPort(8080);
Expand All @@ -422,6 +423,49 @@ public void testPoolIdleTimeout(TestContext ctx) {
.onComplete(ctx.asyncAssertSuccess());
}

@Test
public void testPoolMaxLifetime(TestContext ctx) {
ProxyServer proxy = ProxyServer.create(vertx, options.getPort(), options.getHost());
AtomicReference<ProxyServer.Connection> proxyConn = new AtomicReference<>();
int poolCleanerPeriod = 100;
int maxLifetime = 3000;
Async latch = ctx.async();
proxy.proxyHandler(conn -> {
proxyConn.set(conn);
long now = System.currentTimeMillis();
conn.clientCloseHandler(v -> {
long lifetime = System.currentTimeMillis() - now;
int delta = 500;
int lowerBound = maxLifetime - poolCleanerPeriod - delta;
int upperBound = maxLifetime + poolCleanerPeriod + delta;
ctx.assertTrue(lifetime >= lowerBound, "Was expecting connection to be closed in more than " + lowerBound + ": " + lifetime);
ctx.assertTrue(lifetime <= upperBound, "Was expecting connection to be closed in less than " + upperBound + ": "+ lifetime);
latch.complete();
});
conn.connect();
});

// Start proxy
Async listenLatch = ctx.async();
proxy.listen(8080, "localhost", ctx.asyncAssertSuccess(res -> listenLatch.complete()));
listenLatch.awaitSuccess(20_000);

poolOptions
.setPoolCleanerPeriod(poolCleanerPeriod)
.setIdleTimeout(0)
.setMaxLifetime(maxLifetime)
.setMaxLifetimeUnit(TimeUnit.MILLISECONDS);
options.setPort(8080);
options.setHost("localhost");
PgPool pool = createPool(options, poolOptions);

// Create a connection that remains in the pool
pool
.getConnection()
.flatMap(SqlClient::close)
.onComplete(ctx.asyncAssertSuccess());
}

@Test
public void testPoolConnectTimeout(TestContext ctx) {
Async async = ctx.async(2);
Expand Down Expand Up @@ -463,9 +507,9 @@ public void testPoolConnectTimeout(TestContext ctx) {
public void testNoConnectionLeaks(TestContext ctx) {
Async killConnections = ctx.async();
PgConnection.connect(vertx, options).onComplete(ctx.asyncAssertSuccess(conn -> {
Collector<Row, ?, List<Integer>> collector = mapping(row -> row.getInteger(0), toList());
Collector<Row, ?, List<Boolean>> collector = mapping(row -> row.getBoolean(0), toList());
String sql = "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE pid <> pg_backend_pid() AND datname = $1";
PreparedQuery<SqlResult<List<Integer>>> preparedQuery = conn.preparedQuery(sql).collecting(collector);
PreparedQuery<SqlResult<List<Boolean>>> preparedQuery = conn.preparedQuery(sql).collecting(collector);
Tuple params = Tuple.of(options.getDatabase());
preparedQuery.execute(params).compose(cf -> conn.close()).onComplete(ctx.asyncAssertSuccess(v -> killConnections.complete()));
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ public static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json,
obj.setIdleTimeoutUnit(java.util.concurrent.TimeUnit.valueOf((String)member.getValue()));
}
break;
case "maxLifetime":
if (member.getValue() instanceof Number) {
obj.setMaxLifetime(((Number)member.getValue()).intValue());
}
break;
case "maxLifetimeUnit":
if (member.getValue() instanceof String) {
obj.setMaxLifetimeUnit(java.util.concurrent.TimeUnit.valueOf((String)member.getValue()));
}
break;
case "maxSize":
if (member.getValue() instanceof Number) {
obj.setMaxSize(((Number)member.getValue()).intValue());
Expand Down Expand Up @@ -88,6 +98,10 @@ public static void toJson(PoolOptions obj, java.util.Map<String, Object> json) {
if (obj.getIdleTimeoutUnit() != null) {
json.put("idleTimeoutUnit", obj.getIdleTimeoutUnit().name());
}
json.put("maxLifetime", obj.getMaxLifetime());
if (obj.getMaxLifetimeUnit() != null) {
json.put("maxLifetimeUnit", obj.getMaxLifetimeUnit().name());
}
json.put("maxSize", obj.getMaxSize());
json.put("maxWaitQueueSize", obj.getMaxWaitQueueSize());
if (obj.getName() != null) {
Expand Down
55 changes: 52 additions & 3 deletions vertx-sql-client/src/main/java/io/vertx/sqlclient/PoolOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package io.vertx.sqlclient;

import io.vertx.codegen.annotations.DataObject;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.impl.Arguments;
import io.vertx.core.json.JsonObject;

Expand Down Expand Up @@ -48,11 +47,21 @@ public class PoolOptions {
*/
public static final int DEFAULT_IDLE_TIMEOUT = 0;

/**
* Default maximum pooled connection lifetime = 0 (no maximum)
*/
public static final int DEFAULT_MAXIMUM_LIFETIME = 0;

/**
* Default connection idle time unit in the pool = seconds
*/
public static final TimeUnit DEFAULT_IDLE_TIMEOUT_TIME_UNIT = TimeUnit.SECONDS;

/**
* Default maximum pooled connection lifetime unit = seconds
*/
public static final TimeUnit DEFAULT_MAXIMUM_LIFETIME_TIME_UNIT = TimeUnit.SECONDS;

/**
* Default pool cleaner period = 1000 ms (1 second)
*/
Expand Down Expand Up @@ -87,6 +96,8 @@ public class PoolOptions {
private int maxWaitQueueSize = DEFAULT_MAX_WAIT_QUEUE_SIZE;
private int idleTimeout = DEFAULT_IDLE_TIMEOUT;
private TimeUnit idleTimeoutUnit = DEFAULT_IDLE_TIMEOUT_TIME_UNIT;
private int maxLifetime = DEFAULT_MAXIMUM_LIFETIME;
private TimeUnit maxLifetimeUnit = DEFAULT_MAXIMUM_LIFETIME_TIME_UNIT;
private int poolCleanerPeriod = DEFAULT_POOL_CLEANER_PERIOD;
private int connectionTimeout = DEFAULT_CONNECTION_TIMEOUT;
private TimeUnit connectionTimeoutUnit = DEFAULT_CONNECTION_TIMEOUT_TIME_UNIT;
Expand Down Expand Up @@ -177,16 +188,54 @@ public int getIdleTimeout() {
}

/**
* Establish an idle timeout for pooled connections.
* Establish an idle timeout for pooled connections, a value of zero disables the idle timeout.
*
* @param idleTimeout the pool connection idle time unitq
* @param idleTimeout the pool connection idle timeout
* @return a reference to this, so the API can be used fluently
*/
public PoolOptions setIdleTimeout(int idleTimeout) {
Arguments.require(idleTimeout >= 0, "idleTimeout must be >= 0");
this.idleTimeout = idleTimeout;
return this;
}

/**
* @return the pooled connection max lifetime unit
*/
public TimeUnit getMaxLifetimeUnit() {
return maxLifetimeUnit;
}

/**
* Establish a max lifetime unit for pooled connections.
*
* @param maxLifetimeUnit pooled connection max lifetime unit
* @return a reference to this, so the API can be used fluently
*/
public PoolOptions setMaxLifetimeUnit(TimeUnit maxLifetimeUnit) {
this.maxLifetimeUnit = maxLifetimeUnit;
return this;
}

/**
* @return pooled connection max lifetime
*/
public int getMaxLifetime() {
return maxLifetime;
}

/**
* Establish a max lifetime for pooled connections, a value of zero disables the maximum lifetime.
*
* @param maxLifetime the pool connection max lifetime
* @return a reference to this, so the API can be used fluently
*/
public PoolOptions setMaxLifetime(int maxLifetime) {
Arguments.require(maxLifetime >= 0, "maxLifetime must be >= 0");
this.maxLifetime = maxLifetime;
return this;
}

/**
* @return the connection pool cleaner period in ms.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class PoolImpl extends SqlClientBase implements Pool, Closeable {
private final CloseFuture closeFuture;
private final long idleTimeout;
private final long connectionTimeout;
private final long maxLifetime;
private final long cleanerPeriod;
private final boolean pipelined;
private volatile Handler<SqlConnectionPool.PooledConnection> connectionInitializer;
Expand All @@ -62,20 +63,21 @@ public PoolImpl(VertxInternal vertx,

this.idleTimeout = MILLISECONDS.convert(poolOptions.getIdleTimeout(), poolOptions.getIdleTimeoutUnit());
this.connectionTimeout = MILLISECONDS.convert(poolOptions.getConnectionTimeout(), poolOptions.getConnectionTimeoutUnit());
this.maxLifetime = MILLISECONDS.convert(poolOptions.getMaxLifetime(), poolOptions.getMaxLifetimeUnit());
this.cleanerPeriod = poolOptions.getPoolCleanerPeriod();
this.timerID = -1L;
this.pipelined = pipelined;
this.vertx = vertx;
this.pool = new SqlConnectionPool(ctx -> connectionProvider.apply(ctx), () -> connectionInitializer, afterAcquire, beforeRecycle, vertx, idleTimeout, poolOptions.getMaxSize(), pipelined, poolOptions.getMaxWaitQueueSize(), poolOptions.getEventLoopSize());
this.pool = new SqlConnectionPool(ctx -> connectionProvider.apply(ctx), () -> connectionInitializer, afterAcquire, beforeRecycle, vertx, idleTimeout, maxLifetime, poolOptions.getMaxSize(), pipelined, poolOptions.getMaxWaitQueueSize(), poolOptions.getEventLoopSize());
this.closeFuture = closeFuture;
}

public Pool init() {
closeFuture.add(this);
if (idleTimeout > 0 && cleanerPeriod > 0) {
if ((idleTimeout > 0 || maxLifetime > 0) && cleanerPeriod > 0) {
synchronized (this) {
timerID = vertx.setTimer(cleanerPeriod, id -> {
checkExpired();
runEviction();
});
}
}
Expand All @@ -90,17 +92,17 @@ public Pool connectionProvider(Function<Context, Future<SqlConnection>> connecti
return this;
}

private void checkExpired() {
private void runEviction() {
synchronized (this) {
if (timerID == -1) {
// Cancelled
return;
}
timerID = vertx.setTimer(cleanerPeriod, id -> {
checkExpired();
runEviction();
});
}
pool.checkExpired();
pool.evict();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class SqlConnectionPool {
private final Function<Connection, Future<Void>> beforeRecycle;
private final boolean pipelined;
private final long idleTimeout;
private final long maxLifetime;
private final int maxSize;

public SqlConnectionPool(Function<Context, Future<SqlConnection>> connectionProvider,
Expand All @@ -61,6 +62,7 @@ public SqlConnectionPool(Function<Context, Future<SqlConnection>> connectionProv
Function<Connection, Future<Void>> beforeRecycle,
VertxInternal vertx,
long idleTimeout,
long maxLifetime,
int maxSize,
boolean pipelined,
int maxWaitQueueSize,
Expand All @@ -75,6 +77,7 @@ public SqlConnectionPool(Function<Context, Future<SqlConnection>> connectionProv
this.vertx = vertx;
this.pipelined = pipelined;
this.idleTimeout = idleTimeout;
this.maxLifetime = maxLifetime;
this.maxSize = maxSize;
this.hook = hook;
this.connectionProvider = connectionProvider;
Expand Down Expand Up @@ -142,9 +145,9 @@ public int size() {
return pool.size();
}

public void checkExpired() {
public void evict() {
long now = System.currentTimeMillis();
pool.evict(conn -> conn.expirationTimestamp < now, ar -> {
pool.evict(conn -> conn.shouldEvict(now), ar -> {
if (ar.succeeded()) {
List<PooledConnection> res = ar.result();
for (PooledConnection conn : res) {
Expand All @@ -169,7 +172,7 @@ public <R> Future<R> execute(ContextInternal context, CommandBase<R> cmd) {
future = pooled.schedule(context, cmd);
}
return future.andThen(ar -> {
pooled.expirationTimestamp = System.currentTimeMillis() + idleTimeout;
pooled.refresh();
lease.recycle();
});
});
Expand Down Expand Up @@ -264,12 +267,15 @@ public class PooledConnection implements Connection, Connection.Holder {
private Holder holder;
private Promise<ConnectResult<PooledConnection>> poolCallback;
private Lease<PooledConnection> lease;
public long expirationTimestamp;
public long idleEvictionTimestamp;
public long lifetimeEvictionTimestamp;

PooledConnection(ConnectionFactory factory, Connection conn, PoolConnector.Listener listener) {
this.factory = factory;
this.conn = conn;
this.listener = listener;
this.lifetimeEvictionTimestamp = maxLifetime > 0 ? System.currentTimeMillis() + maxLifetime : Long.MAX_VALUE;
refresh();
}

@Override
Expand Down Expand Up @@ -346,6 +352,10 @@ private void close(Promise<Void> promise) {
conn.close(this, promise);
}

private void refresh() {
this.idleEvictionTimestamp = idleTimeout > 0 ? System.currentTimeMillis() + idleTimeout : Long.MAX_VALUE;
}

@Override
public void init(Holder holder) {
if (this.holder != null) {
Expand Down Expand Up @@ -389,7 +399,7 @@ private void doClose(Holder holder, Promise<Void> promise) {
private void cleanup(Promise<Void> promise) {
Lease<PooledConnection> l = this.lease;
this.lease = null;
this.expirationTimestamp = System.currentTimeMillis() + idleTimeout;
refresh();
l.recycle();
promise.complete();
}
Expand Down Expand Up @@ -435,5 +445,18 @@ public int getSecretKey() {
public Connection unwrap() {
return conn;
}

private boolean hasIdleExpired(long now) {
return idleEvictionTimestamp < now;
}

private boolean hasLifetimeExpired(long now) {
return lifetimeEvictionTimestamp < now;
}

private boolean shouldEvict(long now) {
return hasIdleExpired(now) || hasLifetimeExpired(now);
}

}
}

0 comments on commit 06ab6db

Please sign in to comment.