diff --git a/.github/workflows/pr-build-and-test.yml b/.github/workflows/pr-build-and-test.yml index 5824356a..55ec50a5 100644 --- a/.github/workflows/pr-build-and-test.yml +++ b/.github/workflows/pr-build-and-test.yml @@ -43,7 +43,7 @@ jobs: find . -type d -name "*surefire*" -exec cp --parents -R {} test-results/ \; zip -r test-results.zip test-results - name: Upload test results - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 if: failure() with: name: test-results diff --git a/client/src/main/java/io/streamnative/oxia/client/AsyncOxiaClientImpl.java b/client/src/main/java/io/streamnative/oxia/client/AsyncOxiaClientImpl.java index 83aa796d..cf956cd7 100644 --- a/client/src/main/java/io/streamnative/oxia/client/AsyncOxiaClientImpl.java +++ b/client/src/main/java/io/streamnative/oxia/client/AsyncOxiaClientImpl.java @@ -51,6 +51,7 @@ import io.streamnative.oxia.proto.ListResponse; import io.streamnative.oxia.proto.RangeScanRequest; import io.streamnative.oxia.proto.RangeScanResponse; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -62,6 +63,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import lombok.NonNull; @@ -105,8 +107,8 @@ class AsyncOxiaClientImpl implements AsyncOxiaClient { notificationManager, readBatchManager, writeBatchManager, - sessionManager); - + sessionManager, + config.requestTimeout()); return shardManager.start().thenApply(v -> client); } @@ -118,6 +120,7 @@ class AsyncOxiaClientImpl implements AsyncOxiaClient { private final @NonNull BatchManager readBatchManager; private final @NonNull BatchManager writeBatchManager; private final @NonNull SessionManager sessionManager; + private final long requestTimeoutMs; private volatile boolean closed; private final Counter counterPutBytes; @@ -152,7 +155,8 @@ class AsyncOxiaClientImpl implements AsyncOxiaClient { @NonNull NotificationManager notificationManager, @NonNull BatchManager readBatchManager, @NonNull BatchManager writeBatchManager, - @NonNull SessionManager sessionManager) { + @NonNull SessionManager sessionManager, + Duration requestTimeout) { this.clientIdentifier = clientIdentifier; this.instrumentProvider = instrumentProvider; this.stubManager = stubManager; @@ -162,6 +166,7 @@ class AsyncOxiaClientImpl implements AsyncOxiaClient { this.writeBatchManager = writeBatchManager; this.sessionManager = sessionManager; this.scheduledExecutor = scheduledExecutor; + this.requestTimeoutMs = requestTimeout.toMillis(); counterPutBytes = instrumentProvider.newCounter( @@ -288,18 +293,20 @@ class AsyncOxiaClientImpl implements AsyncOxiaClient { } catch (RuntimeException e) { callback = CompletableFuture.failedFuture(e); } - return callback.whenComplete( - (putResult, throwable) -> { - gaugePendingPutRequests.decrement(); - gaugePendingPutBytes.add(-value.length); - - if (throwable == null) { - counterPutBytes.add(value.length); - histogramPutLatency.recordSuccess(System.nanoTime() - startTime); - } else { - histogramPutLatency.recordFailure(System.nanoTime() - startTime); - } - }); + return callback + .orTimeout(requestTimeoutMs, TimeUnit.MILLISECONDS) + .whenComplete( + (putResult, throwable) -> { + gaugePendingPutRequests.decrement(); + gaugePendingPutBytes.add(-value.length); + + if (throwable == null) { + counterPutBytes.add(value.length); + histogramPutLatency.recordSuccess(System.nanoTime() - startTime); + } else { + histogramPutLatency.recordFailure(System.nanoTime() - startTime); + } + }); } private CompletableFuture internalPut( @@ -355,7 +362,7 @@ private CompletableFuture internalPut( }); } - return future; + return future.orTimeout(requestTimeoutMs, TimeUnit.MILLISECONDS); } @Override @@ -381,15 +388,17 @@ private CompletableFuture internalPut( } catch (RuntimeException e) { callback.completeExceptionally(e); } - return callback.whenComplete( - (putResult, throwable) -> { - gaugePendingDeleteRequests.decrement(); - if (throwable == null) { - histogramDeleteLatency.recordSuccess(System.nanoTime() - startTime); - } else { - histogramDeleteLatency.recordFailure(System.nanoTime() - startTime); - } - }); + return callback + .orTimeout(requestTimeoutMs, TimeUnit.MILLISECONDS) + .whenComplete( + (putResult, throwable) -> { + gaugePendingDeleteRequests.decrement(); + if (throwable == null) { + histogramDeleteLatency.recordSuccess(System.nanoTime() - startTime); + } else { + histogramDeleteLatency.recordFailure(System.nanoTime() - startTime); + } + }); } @Override @@ -436,15 +445,17 @@ private CompletableFuture internalPut( } catch (RuntimeException e) { callback = CompletableFuture.failedFuture(e); } - return callback.whenComplete( - (putResult, throwable) -> { - gaugePendingDeleteRangeRequests.decrement(); - if (throwable == null) { - histogramDeleteRangeLatency.recordSuccess(System.nanoTime() - startTime); - } else { - histogramDeleteRangeLatency.recordFailure(System.nanoTime() - startTime); - } - }); + return callback + .orTimeout(requestTimeoutMs, TimeUnit.MILLISECONDS) + .whenComplete( + (putResult, throwable) -> { + gaugePendingDeleteRangeRequests.decrement(); + if (throwable == null) { + histogramDeleteRangeLatency.recordSuccess(System.nanoTime() - startTime); + } else { + histogramDeleteRangeLatency.recordFailure(System.nanoTime() - startTime); + } + }); } @Override @@ -464,18 +475,20 @@ private CompletableFuture internalPut( } catch (RuntimeException e) { callback.completeExceptionally(e); } - return callback.whenComplete( - (getResult, throwable) -> { - gaugePendingGetRequests.decrement(); - if (throwable == null) { - if (getResult != null) { - counterGetBytes.add(getResult.getValue().length); - } - histogramGetLatency.recordSuccess(System.nanoTime() - startTime); - } else { - histogramGetLatency.recordFailure(System.nanoTime() - startTime); - } - }); + return callback + .orTimeout(requestTimeoutMs, TimeUnit.MILLISECONDS) + .whenComplete( + (getResult, throwable) -> { + gaugePendingGetRequests.decrement(); + if (throwable == null) { + if (getResult != null) { + counterGetBytes.add(getResult.getValue().length); + } + histogramGetLatency.recordSuccess(System.nanoTime() - startTime); + } else { + histogramGetLatency.recordFailure(System.nanoTime() - startTime); + } + }); } private void internalGet( @@ -566,16 +579,18 @@ private void internalGetFloorCeiling( } catch (Exception e) { callback = CompletableFuture.failedFuture(e); } - return callback.whenComplete( - (listResult, throwable) -> { - gaugePendingListRequests.decrement(); - if (throwable == null) { - counterListBytes.add(listResult.stream().mapToInt(String::length).sum()); - histogramListLatency.recordSuccess(System.nanoTime() - startTime); - } else { - histogramListLatency.recordFailure(System.nanoTime() - startTime); - } - }); + return callback + .orTimeout(requestTimeoutMs, TimeUnit.MILLISECONDS) + .whenComplete( + (listResult, throwable) -> { + gaugePendingListRequests.decrement(); + if (throwable == null) { + counterListBytes.add(listResult.stream().mapToInt(String::length).sum()); + histogramListLatency.recordSuccess(System.nanoTime() - startTime); + } else { + histogramListLatency.recordFailure(System.nanoTime() - startTime); + } + }); } @Override diff --git a/client/src/test/java/io/streamnative/oxia/client/AsyncOxiaClientImplTest.java b/client/src/test/java/io/streamnative/oxia/client/AsyncOxiaClientImplTest.java index b45b40c1..0f4bd322 100644 --- a/client/src/test/java/io/streamnative/oxia/client/AsyncOxiaClientImplTest.java +++ b/client/src/test/java/io/streamnative/oxia/client/AsyncOxiaClientImplTest.java @@ -1,5 +1,5 @@ /* - * Copyright © 2022-2024 StreamNative Inc. + * Copyright © 2022-2025 StreamNative Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Optional.empty; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; @@ -47,9 +48,10 @@ import io.streamnative.oxia.proto.ListRequest; import io.streamnative.oxia.proto.ListResponse; import io.streamnative.oxia.proto.OxiaClientGrpc; +import java.time.Duration; import java.util.List; import java.util.Set; -import java.util.concurrent.Executors; +import java.util.concurrent.*; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -70,6 +72,8 @@ class AsyncOxiaClientImplTest { AsyncOxiaClientImpl client; + private final Duration requestTimeout = Duration.ofSeconds(1); + @BeforeEach void setUp() { client = @@ -82,7 +86,8 @@ void setUp() { notificationManager, readBatchManager, writeBatchManager, - sessionManager); + sessionManager, + requestTimeout); } @AfterEach @@ -114,6 +119,25 @@ void put() { }); } + @Test + void putWithTimeout() { + var opCaptor = ArgumentCaptor.forClass(PutOperation.class); + var shardId = 1L; + var key = "key"; + var value = "hello".getBytes(UTF_8); + when(shardManager.getShardForKey(key)).thenReturn(shardId); + when(writeBatchManager.getBatcher(shardId)).thenReturn(batcher); + doNothing().when(batcher).add(opCaptor.capture()); + var result = client.put(key, value); + try { + result.join(); + fail("unexpected"); + } catch (Throwable ex) { + assertThat(ex).isInstanceOf(CompletionException.class); + assertThat(ex.getCause()).isInstanceOf(TimeoutException.class); + } + } + @Test void putFails() { var opCaptor = ArgumentCaptor.forClass(PutOperation.class); @@ -209,6 +233,22 @@ void deleteFails() { assertThat(result).isNotCompleted(); } + @Test + void deleteWithTimeout() { + var shardId = 1L; + var key = "key"; + when(shardManager.getShardForKey(key)).thenReturn(shardId); + when(writeBatchManager.getBatcher(shardId)).thenReturn(batcher); + var result = client.delete(key); + try { + result.join(); + fail("unexpected"); + } catch (Throwable ex) { + assertThat(ex).isInstanceOf(CompletionException.class); + assertThat(ex.getCause()).isInstanceOf(TimeoutException.class); + } + } + @Test void deleteClosed() throws Exception { client.close(); @@ -299,6 +339,58 @@ void deleteRange() { assertThat(result).isCompleted(); } + @Test + void deleteRangeWithTimeout() { + var batcher1 = mock(Batcher.class); + var batcher2 = mock(Batcher.class); + var batcher3 = mock(Batcher.class); + var opCaptor1 = ArgumentCaptor.forClass(DeleteRangeOperation.class); + var opCaptor2 = ArgumentCaptor.forClass(DeleteRangeOperation.class); + var opCaptor3 = ArgumentCaptor.forClass(DeleteRangeOperation.class); + var startInclusive = "a-startInclusive"; + var endExclusive = "z-endExclusive"; + when(shardManager.allShardIds()).thenReturn(Set.of(1L, 2L, 3L)); + when(writeBatchManager.getBatcher(1L)).thenReturn(batcher1); + when(writeBatchManager.getBatcher(2L)).thenReturn(batcher2); + when(writeBatchManager.getBatcher(3L)).thenReturn(batcher3); + doNothing().when(batcher1).add(opCaptor1.capture()); + doNothing().when(batcher2).add(opCaptor2.capture()); + doNothing().when(batcher3).add(opCaptor3.capture()); + var result = client.deleteRange(startInclusive, endExclusive); + assertThat(result).isNotCompleted(); + + assertThat(opCaptor1.getValue()) + .satisfies( + o -> { + assertThat(o.startKeyInclusive()).isEqualTo(startInclusive); + assertThat(o.endKeyExclusive()).isEqualTo(endExclusive); + assertThat(o.callback()).isNotCompleted(); + }); + + assertThat(opCaptor2.getValue()) + .satisfies( + o -> { + assertThat(o.startKeyInclusive()).isEqualTo(startInclusive); + assertThat(o.endKeyExclusive()).isEqualTo(endExclusive); + assertThat(o.callback()).isNotCompleted(); + }); + + assertThat(opCaptor3.getValue()) + .satisfies( + o -> { + assertThat(o.startKeyInclusive()).isEqualTo(startInclusive); + assertThat(o.endKeyExclusive()).isEqualTo(endExclusive); + assertThat(o.callback()).isNotCompleted(); + }); + try { + result.join(); + fail("unexpected"); + } catch (Throwable ex) { + assertThat(ex).isInstanceOf(CompletionException.class); + assertThat(ex.getCause()).isInstanceOf(TimeoutException.class); + } + } + @Test void deleteRangeClosed() throws Exception { client.close(); @@ -352,6 +444,22 @@ void getFails() { assertThat(result).isCompletedExceptionally(); } + @Test + void getWithTimeout() { + var shardId = 1L; + var key = "key"; + when(shardManager.getShardForKey(key)).thenReturn(shardId); + when(readBatchManager.getBatcher(shardId)).thenReturn(batcher); + var result = client.get(key); + try { + result.join(); + fail("unexpected"); + } catch (Throwable ex) { + assertThat(ex).isInstanceOf(CompletionException.class); + assertThat(ex.getCause()).isInstanceOf(TimeoutException.class); + } + } + @Test void getClosed() throws Exception { client.close(); @@ -377,6 +485,21 @@ void list(@Mock OxiaStub stub0, @Mock OxiaStub stub1) { .containsExactlyInAnyOrder("0-a", "0-b", "0-c", "0-d", "1-a", "1-b", "1-c", "1-d"); } + @Test + void listWithTimeout(@Mock OxiaStub stub0, @Mock OxiaStub stub1) { + when(shardManager.allShardIds()).thenReturn(Set.of(0L, 1L)); + setupTimeoutStub(0L, "leader0", stub0); + setupTimeoutStub(1L, "leader1", stub1); + final var result = client.list("a", "e"); + try { + result.join(); + fail("unexpected"); + } catch (Throwable ex) { + assertThat(ex).isInstanceOf(CompletionException.class); + assertThat(ex.getCause()).isInstanceOf(TimeoutException.class); + } + } + @Test void listClosed() throws Exception { @@ -396,6 +519,15 @@ void listNullEnd() throws Exception { assertThat(client.list("a", null)).isCompletedExceptionally(); } + private void setupTimeoutStub(long shardId, String leader, OxiaStub stub) { + when(shardManager.leader(shardId)).thenReturn(leader); + when(stubManager.getStub(leader)).thenReturn(stub); + + var async = mock(OxiaClientGrpc.OxiaClientStub.class); + when(stub.async()).thenReturn(async); + doNothing().when(async).list(any(ListRequest.class), any(StreamObserver.class)); + } + private void setupListStub(long shardId, String leader, OxiaStub stub) { when(shardManager.leader(shardId)).thenReturn(leader); when(stubManager.getStub(leader)).thenReturn(stub);