Skip to content

Commit

Permalink
Support memory overflow protection.
Browse files Browse the repository at this point in the history
  • Loading branch information
Sycamore-M committed Dec 30, 2024
1 parent 143584b commit 29c5dcc
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class ApnsChannelPool {

private final Set<Future<Channel>> pendingCreateChannelFutures = new HashSet<>();
private final Queue<Promise<Channel>> pendingAcquisitionPromises = new ArrayDeque<>();
private final int maxPendingAcquisition;

private boolean isClosed = false;

Expand Down Expand Up @@ -91,12 +92,14 @@ public void handleConnectionCreationFailed() {
*
* @param channelFactory the factory to be used to create new channels
* @param capacity the maximum number of channels that may be held in this pool
* @param maxPendingAcquisition the maximum number of pending acquisition promises
* @param executor the executor on which listeners for acquisition/release promises will be called
* @param metricsListener an optional listener for metrics describing the performance and behavior of the pool
*/
ApnsChannelPool(final PooledObjectFactory<Channel> channelFactory, final int capacity, final OrderedEventExecutor executor, final ApnsChannelPoolMetricsListener metricsListener) {
ApnsChannelPool(final PooledObjectFactory<Channel> channelFactory, final int capacity, final int maxPendingAcquisition, final OrderedEventExecutor executor, final ApnsChannelPoolMetricsListener metricsListener) {
this.channelFactory = channelFactory;
this.capacity = capacity;
this.maxPendingAcquisition = maxPendingAcquisition;
this.executor = executor;

this.metricsListener = metricsListener != null ? metricsListener : new NoopChannelPoolMetricsListener();
Expand Down Expand Up @@ -180,7 +183,11 @@ private void acquireWithinEventExecutor(final Promise<Channel> acquirePromise) {
} else {
// We don't have any connections ready to go, and don't have any more capacity to create new
// channels. Add this acquisition to the queue waiting for channels to become available.
pendingAcquisitionPromises.add(acquirePromise);
if (pendingAcquisitionPromises.size() < maxPendingAcquisition) {
pendingAcquisitionPromises.add(acquirePromise);
} else {
acquirePromise.tryFailure(new RejectedAcquisitionException(maxPendingAcquisition));
}
}
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ public void handleConnectionCreationFailed() {

this.channelPool = new ApnsChannelPool(channelFactory,
clientConfiguration.getConcurrentConnections(),
clientConfiguration.getMaxPendingAcquisition(),
this.clientResources.getEventLoopGroup().next(),
channelPoolMetricsListener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public class ApnsClientBuilder {
private ApnsClientResources apnsClientResources;

private int concurrentConnections = 1;
private int maxPendingAcquisition = Integer.MAX_VALUE;

private ApnsClientMetricsListener metricsListener;

Expand Down Expand Up @@ -450,6 +451,22 @@ public ApnsClientBuilder setConcurrentConnections(final int concurrentConnection
return this;
}

/**
* <p>Sets the maximum number of the pending acquisition promises.
* Under certain conditions, the pending acquisition queue may continue to expand and cause memory overflow.
*
* <p>By default, clients will not attempt to check the size of the pending acquisition queue.
* Provide an appropriate value to avoid memory overflow.
*
* @param maxPendingAcquisition the maximum number of pending acquisition promises
*
* @return a reference to this builder
*/
public ApnsClientBuilder setMaxPendingAcquisition(final int maxPendingAcquisition) {
this.maxPendingAcquisition = maxPendingAcquisition;
return this;
}

/**
* Sets the metrics listener for the client under construction. Metrics listeners gather information that describes
* the performance and behavior of a client, and are completely optional.
Expand Down Expand Up @@ -654,6 +671,7 @@ public ApnsClient build() throws SSLException {
this.closeAfterIdleDuration,
this.gracefulShutdownTimeout,
this.concurrentConnections,
this.maxPendingAcquisition,
this.metricsListener,
this.frameLogger);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class ApnsClientConfiguration {
private final Duration closeAfterIdleDuration;
private final Duration gracefulShutdownTimeout;
private final int concurrentConnections;
private final int maxPendingAcquisition;
private final ApnsClientMetricsListener metricsListener;
private final Http2FrameLogger frameLogger;

Expand All @@ -62,6 +63,7 @@ public ApnsClientConfiguration(final InetSocketAddress apnsServerAddress,
final Duration closeAfterIdleDuration,
final Duration gracefulShutdownTimeout,
final int concurrentConnections,
final int maxPendingAcquisition,
final ApnsClientMetricsListener metricsListener,
final Http2FrameLogger frameLogger) {

Expand All @@ -75,6 +77,7 @@ public ApnsClientConfiguration(final InetSocketAddress apnsServerAddress,
this.closeAfterIdleDuration = closeAfterIdleDuration;
this.gracefulShutdownTimeout = gracefulShutdownTimeout;
this.concurrentConnections = concurrentConnections;
this.maxPendingAcquisition = maxPendingAcquisition;
this.metricsListener = metricsListener;
this.frameLogger = frameLogger;
}
Expand Down Expand Up @@ -119,6 +122,10 @@ public int getConcurrentConnections() {
return concurrentConnections;
}

public int getMaxPendingAcquisition() {
return maxPendingAcquisition;
}

public Optional<ApnsClientMetricsListener> getMetricsListener() {
return Optional.ofNullable(metricsListener);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.eatthepath.pushy.apns;

/**
* An exception thrown to indicate that a push notification should be rejected by the client
* to avoid a large number of pending acquisitions.
*/
public class RejectedAcquisitionException extends Exception {

public RejectedAcquisitionException(int maxPendingAcquisition) {
super("The number of pending acquisitions has reached the upper limit [" + maxPendingAcquisition + "]");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public static void setUpBeforeClass() {
@BeforeEach
public void setUp() {
this.metricsListener = new TestChannelPoolMetricListener();
this.pool = new ApnsChannelPool(new TestChannelFactory(), 1, EVENT_EXECUTOR, this.metricsListener);
this.pool = new ApnsChannelPool(new TestChannelFactory(), 1, Integer.MAX_VALUE, EVENT_EXECUTOR, this.metricsListener);
}

@AfterAll
Expand Down Expand Up @@ -183,7 +183,7 @@ public Future<Void> destroy(final Channel channel, final Promise<Void> promise)
}
};

final ApnsChannelPool pool = new ApnsChannelPool(factory, 1, EVENT_EXECUTOR, this.metricsListener);
final ApnsChannelPool pool = new ApnsChannelPool(factory, 1, Integer.MAX_VALUE, EVENT_EXECUTOR, this.metricsListener);

assertFalse(pool.acquire().await().isSuccess());

Expand Down Expand Up @@ -259,7 +259,7 @@ public Future<Void> destroy(final Channel channel, final Promise<Void> promise)
}
};

final ApnsChannelPool pool = new ApnsChannelPool(factory, 1, EVENT_EXECUTOR, this.metricsListener);
final ApnsChannelPool pool = new ApnsChannelPool(factory, 1, Integer.MAX_VALUE, EVENT_EXECUTOR, this.metricsListener);

final Future<Channel> acquireNewChannelFuture = pool.acquire();
final Future<Channel> acquireReturnedChannelFuture = pool.acquire();
Expand Down

0 comments on commit 29c5dcc

Please sign in to comment.