Skip to content

Commit

Permalink
Return a CompletableFuture from BulkIngester.flush method
Browse files Browse the repository at this point in the history
  • Loading branch information
aidin36 committed May 10, 2024
1 parent 64ec619 commit 7216ae1
Showing 1 changed file with 16 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
Expand Down Expand Up @@ -80,9 +80,9 @@ private static class RequestExecution<Context> {
public final long id;
public final BulkRequest request;
public final List<Context> contexts;
public final CompletionStage<BulkResponse> futureResponse;
public final CompletableFuture<BulkResponse> futureResponse;

RequestExecution(long id, BulkRequest request, List<Context> contexts, CompletionStage<BulkResponse> futureResponse) {
RequestExecution(long id, BulkRequest request, List<Context> contexts, CompletableFuture<BulkResponse> futureResponse) {
this.id = id;
this.request = request;
this.contexts = contexts;
Expand All @@ -99,7 +99,7 @@ private BulkIngester(Builder<Context> builder) {
this.maxOperations = builder.bulkOperations < 0 ? Integer.MAX_VALUE : builder.bulkOperations;
this.listener = builder.listener;
this.flushIntervalMillis = builder.flushIntervalMillis;

if (flushIntervalMillis != null) {
long flushInterval = flushIntervalMillis;

Expand All @@ -119,7 +119,7 @@ private BulkIngester(Builder<Context> builder) {
// It's not ours, we will not close it.
scheduler = builder.scheduler;
}

this.flushTask = scheduler.scheduleWithFixedDelay(
this::failsafeFlush,
flushInterval, flushInterval,
Expand Down Expand Up @@ -271,7 +271,11 @@ private void failsafeFlush() {
}
}

public void flush() {
/**
* @return A future of the response, or null if there was no operations to execute.
*/
@Nullable
public CompletableFuture<BulkResponse> flush() {
RequestExecution<Context> exec = sendRequestCondition.whenReadyIf(
() -> {
// May happen on manual and periodic flushes
Expand All @@ -294,7 +298,7 @@ public void flush() {
listener.beforeBulk(id, request, requestContexts);
}

CompletionStage<BulkResponse> result = client.bulk(request);
CompletableFuture<BulkResponse> result = client.bulk(request);
requestsInFlightCount++;

if (listener == null) {
Expand Down Expand Up @@ -327,7 +331,11 @@ public void flush() {
}
return null;
});

return exec.futureResponse;
}

return null;
}

public void add(BulkOperation operation, Context context) {
Expand Down Expand Up @@ -483,7 +491,7 @@ public Builder<Context> flushInterval(long value, TimeUnit unit) {
/**
* Sets an interval flushing any bulk actions pending if the interval passes. Defaults to not set.
* <p>
* Flushing is still subject to the maximum number of requests set with {@link #maxConcurrentRequests}.
* Flushing is still subject to the maximum number of requests set with {@link #maxConcurrentRequests}.
*/
public Builder<Context> flushInterval(long value, TimeUnit unit, ScheduledExecutorService scheduler) {
this.scheduler = scheduler;
Expand Down Expand Up @@ -528,4 +536,3 @@ public BulkIngester<Context> build() {
}
}
}

0 comments on commit 7216ae1

Please sign in to comment.