From c02292f82b38872efd7220e25595af15e4ccb38d Mon Sep 17 00:00:00 2001
From: Nick Tindall
Date: Mon, 20 Jan 2025 12:28:34 +1100
Subject: [PATCH] Retry internally when CAS upload is throttled [GCS] (#120250)

Fixes #116546
---
 docs/changelog/120250.yaml                     |  6 ++
 .../AzureBlobStoreRepositoryMetricsTests.java  | 48 +++++++++++----
 .../azure/AzureBlobContainerStatsTests.java    |  7 ++-
 ...eCloudStorageBlobStoreRepositoryTests.java  |  4 +-
 .../gcs/GoogleCloudStorageBlobStore.java       | 59 +++++++++++++------
 .../gcs/GoogleCloudStorageRepository.java      | 38 +++++++++++-
 ...CloudStorageBlobContainerRetriesTests.java  | 45 +++++++++++++-
 ...leCloudStorageBlobStoreContainerTests.java  |  4 +-
 .../http/ResponseInjectingHttpHandler.java     | 55 ++++-------------
 9 files changed, 186 insertions(+), 80 deletions(-)
 create mode 100644 docs/changelog/120250.yaml
 rename modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/ResponseInjectingAzureHttpHandler.java => test/framework/src/main/java/org/elasticsearch/http/ResponseInjectingHttpHandler.java (57%)

diff --git a/docs/changelog/120250.yaml b/docs/changelog/120250.yaml
new file mode 100644
index 0000000000000..5df5bfa7d04ed
--- /dev/null
+++ b/docs/changelog/120250.yaml
@@ -0,0 +1,6 @@
+pr: 120250
+summary: "Retry internally when CAS upload is throttled [GCS]"
+area: Snapshot/Restore
+type: enhancement
+issues:
+ - 116546
diff --git a/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryMetricsTests.java b/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryMetricsTests.java
index 7848422b869df..a9082e5373e90 100644
--- a/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryMetricsTests.java
+++ b/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryMetricsTests.java
@@ -9,6 +9,7 @@
 package org.elasticsearch.repositories.azure;

+import com.sun.net.httpserver.Headers;
 import com.sun.net.httpserver.HttpExchange;
 import com.sun.net.httpserver.HttpHandler;

@@ -21,6 +22,7 @@
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.SuppressForbidden;
+import org.elasticsearch.http.ResponseInjectingHttpHandler;
 import org.elasticsearch.plugins.PluginsService;
 import org.elasticsearch.repositories.RepositoriesMetrics;
 import org.elasticsearch.repositories.RepositoriesService;
@@ -46,7 +48,6 @@
 import java.util.stream.IntStream;

 import static org.elasticsearch.repositories.azure.AbstractAzureServerTestCase.randomBlobContent;
-import static org.elasticsearch.repositories.azure.ResponseInjectingAzureHttpHandler.createFailNRequestsHandler;
 import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasSize;
@@ -60,7 +61,7 @@ public class AzureBlobStoreRepositoryMetricsTests extends AzureBlobStoreReposito
     );
     private static final int MAX_RETRIES = 3;

-    private final Queue<ResponseInjectingAzureHttpHandler.RequestHandler> requestHandlers = new ConcurrentLinkedQueue<>();
+    private final Queue<ResponseInjectingHttpHandler.RequestHandler> requestHandlers = new ConcurrentLinkedQueue<>();

@@ -68,7 +69,7 @@ protected Map<String, HttpHandler> createHttpHandlers() {
         assert httpHandlers.size() == 1 : "This assumes there's a single handler";
         return httpHandlers.entrySet()
             .stream()
-            .collect(Collectors.toMap(Map.Entry::getKey, e -> new ResponseInjectingAzureHttpHandler(requestHandlers, e.getValue())));
+            .collect(Collectors.toMap(Map.Entry::getKey, e -> new ResponseInjectingHttpHandler(requestHandlers, e.getValue())));
     }

     /**
@@ -106,7 +107,7 @@ public void testThrottleResponsesAreCountedInMetrics() throws IOException {
         // Queue up some throttle responses
         final int numThrottles = randomIntBetween(1, MAX_RETRIES);
         IntStream.range(0, numThrottles)
-            .forEach(i -> requestHandlers.offer(new ResponseInjectingAzureHttpHandler.FixedRequestHandler(RestStatus.TOO_MANY_REQUESTS)));
+            .forEach(i -> requestHandlers.offer(new ResponseInjectingHttpHandler.FixedRequestHandler(RestStatus.TOO_MANY_REQUESTS)));

         // Check that the blob exists
         blobContainer.blobExists(purpose, blobName);
@@ -132,11 +133,7 @@ public void testRangeNotSatisfiedAreCountedInMetrics() throws IOException {

         // Queue up a range-not-satisfied error
         requestHandlers.offer(
-            new ResponseInjectingAzureHttpHandler.FixedRequestHandler(
-                RestStatus.REQUESTED_RANGE_NOT_SATISFIED,
-                null,
-                GET_BLOB_REQUEST_PREDICATE
-            )
+            new ResponseInjectingHttpHandler.FixedRequestHandler(RestStatus.REQUESTED_RANGE_NOT_SATISFIED, null, GET_BLOB_REQUEST_PREDICATE)
         );

         // Attempt to read the blob
@@ -169,7 +166,7 @@ public void testErrorResponsesAreCountedInMetrics() throws IOException {
             if (status == RestStatus.TOO_MANY_REQUESTS) {
                 throttles.incrementAndGet();
             }
-            requestHandlers.offer(new ResponseInjectingAzureHttpHandler.FixedRequestHandler(status));
+            requestHandlers.offer(new ResponseInjectingHttpHandler.FixedRequestHandler(status));
         });

         // Check that the blob exists
@@ -265,7 +262,7 @@ public void testBatchDeleteFailure() throws IOException {
         clearMetrics(dataNodeName);

         // Handler will fail one or more of the batch requests
-        final ResponseInjectingAzureHttpHandler.RequestHandler failNRequestRequestHandler = createFailNRequestsHandler(failedBatches);
+        final ResponseInjectingHttpHandler.RequestHandler failNRequestRequestHandler = createFailNRequestsHandler(failedBatches);

         // Exhaust the retries
         IntStream.range(0, (numberOfBatches - failedBatches) + (failedBatches * (MAX_RETRIES + 1)))
@@ -308,6 +305,35 @@ private MetricsAsserter metricsAsserter(
         return new MetricsAsserter(dataNodeName, operationPurpose, operation, repository);
     }

+    /**
+     * Creates a {@link ResponseInjectingHttpHandler.RequestHandler} that will persistently fail the first numberToFail
+     * distinct requests it sees. Any other requests are passed through to the delegate.
+     *
+     * @param numberToFail The number of requests to fail
+     * @return the handler
+     */
+    private static ResponseInjectingHttpHandler.RequestHandler createFailNRequestsHandler(int numberToFail) {
+        final List<String> requestsToFail = new ArrayList<>(numberToFail);
+        return (exchange, delegate) -> {
+            final Headers requestHeaders = exchange.getRequestHeaders();
+            final String requestId = requestHeaders.get("X-ms-client-request-id").get(0);
+            boolean failRequest = false;
+            synchronized (requestsToFail) {
+                if (requestsToFail.contains(requestId)) {
+                    failRequest = true;
+                } else if (requestsToFail.size() < numberToFail) {
+                    requestsToFail.add(requestId);
+                    failRequest = true;
+                }
+            }
+            if (failRequest) {
+                exchange.sendResponseHeaders(500, -1);
+            } else {
+                delegate.handle(exchange);
+            }
+        };
+    }
+
     private class MetricsAsserter {
         private final String dataNodeName;
         private final OperationPurpose purpose;
diff --git a/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerStatsTests.java b/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerStatsTests.java
index 8979507230bdd..56f7ee123a10f 100644
--- a/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerStatsTests.java
+++ b/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerStatsTests.java
@@ -16,6 +16,7 @@
 import org.elasticsearch.common.blobstore.OperationPurpose;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.core.SuppressForbidden;
+import org.elasticsearch.http.ResponseInjectingHttpHandler;
 import org.elasticsearch.rest.RestStatus;
 import org.junit.Before;

@@ -34,14 +35,14 @@ public class AzureBlobContainerStatsTests extends AbstractAzureServerTestCase {
-    private final Queue<ResponseInjectingAzureHttpHandler.RequestHandler> requestHandlers = new ConcurrentLinkedQueue<>();
+    private final Queue<ResponseInjectingHttpHandler.RequestHandler> requestHandlers = new ConcurrentLinkedQueue<>();

     @SuppressForbidden(reason = "use a http server")
     @Before
     public void configureAzureHandler() {
         httpServer.createContext(
             "/",
-            new ResponseInjectingAzureHttpHandler(
+            new ResponseInjectingHttpHandler(
                 requestHandlers,
                 new AzureHttpHandler(ACCOUNT, CONTAINER, null, MockAzureBlobStore.LeaseExpiryPredicate.NEVER_EXPIRE)
             )
         )
@@ -61,7 +62,7 @@ public void testRetriesAndOperationsAreTrackedSeparately() throws IOException {
         for (int i = 0; i < randomIntBetween(10, 50); i++) {
             final boolean triggerRetry = randomBoolean();
             if (triggerRetry) {
-                requestHandlers.offer(new ResponseInjectingAzureHttpHandler.FixedRequestHandler(RestStatus.TOO_MANY_REQUESTS));
+                requestHandlers.offer(new ResponseInjectingHttpHandler.FixedRequestHandler(RestStatus.TOO_MANY_REQUESTS));
             }
             final AzureBlobStore.Operation operation = randomFrom(supportedOperations);
             switch (operation) {
diff --git a/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java b/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java
index 6505b7234966b..1adc380216529 100644
--- a/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java
+++ b/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java
@@ -25,6 +25,7 @@
 import org.apache.lucene.util.BytesRefBuilder;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.BackoffPolicy;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStore;
@@ -268,7 +269,8 @@ protected GoogleCloudStorageBlobStore createBlobStore() {
             metadata.name(),
             storageService,
             bigArrays,
-            randomIntBetween(1, 8) * 1024
+            randomIntBetween(1, 8) * 1024,
+            BackoffPolicy.noBackoff()
         ) {
             @Override
             long getLargeBlobThresholdInBytes() {
diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java
index 6284129c0825c..48192e9173ffa 100644
--- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java
+++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java
@@ -24,6 +24,7 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.common.BackoffPolicy;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStore;
@@ -41,6 +42,7 @@
 import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.Streams;
 import org.elasticsearch.core.SuppressForbidden;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.rest.RestStatus;

 import java.io.ByteArrayInputStream;
@@ -105,6 +107,7 @@ class GoogleCloudStorageBlobStore implements BlobStore {
     private final GoogleCloudStorageOperationsStats stats;
     private final int bufferSize;
     private final BigArrays bigArrays;
+    private final BackoffPolicy casBackoffPolicy;

     GoogleCloudStorageBlobStore(
         String bucketName,
@@ -112,7 +115,8 @@ class GoogleCloudStorageBlobStore implements BlobStore {
         String repositoryName,
         GoogleCloudStorageService storageService,
         BigArrays bigArrays,
-        int bufferSize
+        int bufferSize,
+        BackoffPolicy casBackoffPolicy
     ) {
         this.bucketName = bucketName;
         this.clientName = clientName;
@@ -121,6 +125,7 @@ class GoogleCloudStorageBlobStore implements BlobStore {
         this.bigArrays = bigArrays;
         this.stats = new GoogleCloudStorageOperationsStats(bucketName);
         this.bufferSize = bufferSize;
+        this.casBackoffPolicy = casBackoffPolicy;
     }

     private Storage client() throws IOException {
@@ -691,28 +696,46 @@ OptionalBytesReference compareAndExchangeRegister(
             .setMd5(Base64.getEncoder().encodeToString(MessageDigests.digest(updated, MessageDigests.md5())))
             .build();
         final var bytesRef = updated.toBytesRef();
-        try {
-            SocketAccess.doPrivilegedVoidIOException(
-                () -> client().create(
-                    blobInfo,
-                    bytesRef.bytes,
-                    bytesRef.offset,
-                    bytesRef.length,
-                    Storage.BlobTargetOption.generationMatch()
-                )
-            );
-        } catch (Exception e) {
-            final var serviceException = unwrapServiceException(e);
-            if (serviceException != null) {
+
+        final Iterator<TimeValue> retries = casBackoffPolicy.iterator();
+        BaseServiceException finalException = null;
+        while (true) {
+            try {
+                SocketAccess.doPrivilegedVoidIOException(
+                    () -> client().create(
+                        blobInfo,
+                        bytesRef.bytes,
+                        bytesRef.offset,
+                        bytesRef.length,
+                        Storage.BlobTargetOption.generationMatch()
+                    )
+                );
+                return OptionalBytesReference.of(expected);
+            } catch (Exception e) {
+                final var serviceException = unwrapServiceException(e);
+                if (serviceException == null) {
+                    throw e;
+                }
                 final var statusCode = serviceException.getCode();
-                if (statusCode == RestStatus.PRECONDITION_FAILED.getStatus() || statusCode == RestStatus.TOO_MANY_REQUESTS.getStatus()) {
+                if (statusCode == RestStatus.PRECONDITION_FAILED.getStatus()) {
                     return OptionalBytesReference.MISSING;
                 }
+                if (statusCode == RestStatus.TOO_MANY_REQUESTS.getStatus()) {
+                    finalException = ExceptionsHelper.useOrSuppress(finalException, serviceException);
+                    if (retries.hasNext()) {
+                        try {
+                            // noinspection BusyWait
+                            Thread.sleep(retries.next().millis());
+                        } catch (InterruptedException iex) {
+                            Thread.currentThread().interrupt();
+                            finalException.addSuppressed(iex);
+                        }
+                    } else {
+                        throw finalException;
+                    }
+                }
             }
-            throw e;
         }
-
-        return OptionalBytesReference.of(expected);
     }

     private static BaseServiceException unwrapServiceException(Throwable t) {
diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java
index 36944e61d9c18..16233d3b391d7 100644
--- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java
+++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRepository.java
@@ -13,12 +13,14 @@
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.BackoffPolicy;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.indices.recovery.RecoverySettings;
 import org.elasticsearch.repositories.RepositoryException;
 import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository;
@@ -56,10 +58,33 @@ class GoogleCloudStorageRepository extends MeteredBlobStoreRepository {
     );

     static final Setting<String> CLIENT_NAME = Setting.simpleString("client", "default");

+    /**
+     * We will retry CASes that fail due to throttling.
+     * We use an {@link BackoffPolicy#linearBackoff(TimeValue, int, TimeValue)}
+     * with the following parameters
+     */
+    static final Setting<TimeValue> RETRY_THROTTLED_CAS_DELAY_INCREMENT = Setting.timeSetting(
+        "throttled_cas_retry.delay_increment",
+        TimeValue.timeValueMillis(100),
+        TimeValue.ZERO
+    );
+    static final Setting<Integer> RETRY_THROTTLED_CAS_MAX_NUMBER_OF_RETRIES = Setting.intSetting(
+        "throttled_cas_retry.maximum_number_of_retries",
+        2,
+        0
+    );
+    static final Setting<TimeValue> RETRY_THROTTLED_CAS_MAXIMUM_DELAY = Setting.timeSetting(
+        "throttled_cas_retry.maximum_delay",
+        TimeValue.timeValueSeconds(5),
+        TimeValue.ZERO
+    );
+
     private final GoogleCloudStorageService storageService;
     private final ByteSizeValue chunkSize;
     private final String bucket;
     private final String clientName;
+    private final TimeValue retryThrottledCasDelayIncrement;
+    private final int retryThrottledCasMaxNumberOfRetries;
+    private final TimeValue retryThrottledCasMaxDelay;

     GoogleCloudStorageRepository(
         final RepositoryMetadata metadata,
@@ -83,6 +108,9 @@ class GoogleCloudStorageRepository extends MeteredBlobStoreRepository {
         this.chunkSize = getSetting(CHUNK_SIZE, metadata);
         this.bucket = getSetting(BUCKET, metadata);
         this.clientName = CLIENT_NAME.get(metadata.settings());
+        this.retryThrottledCasDelayIncrement = RETRY_THROTTLED_CAS_DELAY_INCREMENT.get(metadata.settings());
+        this.retryThrottledCasMaxNumberOfRetries = RETRY_THROTTLED_CAS_MAX_NUMBER_OF_RETRIES.get(metadata.settings());
+        this.retryThrottledCasMaxDelay = RETRY_THROTTLED_CAS_MAXIMUM_DELAY.get(metadata.settings());

         logger.debug("using bucket [{}], base_path [{}], chunk_size [{}], compress [{}]", bucket, basePath(), chunkSize, isCompress());
     }
@@ -105,7 +133,15 @@ private static Map<String, String> buildLocation(RepositoryMetadata metadata) {

     @Override
     protected GoogleCloudStorageBlobStore createBlobStore() {
-        return new GoogleCloudStorageBlobStore(bucket, clientName, metadata.name(), storageService, bigArrays, bufferSize);
+        return new GoogleCloudStorageBlobStore(
+            bucket,
+            clientName,
+            metadata.name(),
+            storageService,
+            bigArrays,
+            bufferSize,
+            BackoffPolicy.linearBackoff(retryThrottledCasDelayIncrement, retryThrottledCasMaxNumberOfRetries, retryThrottledCasMaxDelay)
+        );
     }

     @Override
diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java
index 5700fa6de63fa..96db51a060f4c 100644
--- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java
+++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java
@@ -9,6 +9,7 @@
 package org.elasticsearch.repositories.gcs;

 import fixture.gcs.FakeOAuth2HttpHandler;
+import fixture.gcs.GoogleCloudStorageHttpHandler;

 import com.google.api.gax.retrying.RetrySettings;
 import com.google.cloud.http.HttpTransportOptions;
@@ -18,10 +19,13 @@
 import com.sun.net.httpserver.HttpHandler;

 import org.apache.http.HttpStatus;
+import org.elasticsearch.common.BackoffPolicy;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
+import org.elasticsearch.common.blobstore.OptionalBytesReference;
+import org.elasticsearch.common.blobstore.support.BlobContainerUtils;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.Streams;
@@ -37,6 +41,7 @@
 import org.elasticsearch.core.SuppressForbidden;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.Tuple;
+import org.elasticsearch.http.ResponseInjectingHttpHandler;
 import org.elasticsearch.repositories.blobstore.AbstractBlobContainerRetriesTestCase;
 import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
 import org.elasticsearch.rest.RestStatus;
@@ -55,6 +60,8 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -158,7 +165,8 @@ StorageOptions createStorageOptions(
             "repo",
             service,
             BigArrays.NON_RECYCLING_INSTANCE,
-            randomIntBetween(1, 8) * 1024
+            randomIntBetween(1, 8) * 1024,
+            BackoffPolicy.linearBackoff(TimeValue.timeValueMillis(1), 3, TimeValue.timeValueSeconds(1))
         );

         return new GoogleCloudStorageBlobContainer(randomBoolean() ? BlobPath.EMPTY : BlobPath.EMPTY.add("foo"), blobStore);
@@ -463,6 +471,41 @@ public String next() {
         }
     }

+    public void testCompareAndExchangeWhenThrottled() throws IOException {
+        final Queue<ResponseInjectingHttpHandler.RequestHandler> requestHandlers = new ConcurrentLinkedQueue<>();
+        httpServer.createContext("/", new ResponseInjectingHttpHandler(requestHandlers, new GoogleCloudStorageHttpHandler("bucket")));
+
+        final int maxRetries = randomIntBetween(1, 3);
+        final BlobContainer container = createBlobContainer(maxRetries, null, null, null);
+        final byte[] data = randomBytes(randomIntBetween(1, BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH));
+        final String key = randomIdentifier();
+
+        final OptionalBytesReference createResult = safeAwait(
+            l -> container.compareAndExchangeRegister(randomPurpose(), key, BytesArray.EMPTY, new BytesArray(data), l)
+        );
+        assertEquals(createResult, OptionalBytesReference.EMPTY);
+
+        final byte[] updatedData = randomBytes(randomIntBetween(1, BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH));
+        final int failuresToExhaustAttempts = maxRetries + 1;
+        final int numberOfThrottles = randomIntBetween(failuresToExhaustAttempts, (4 * failuresToExhaustAttempts) - 1);
+        for (int i = 0; i < numberOfThrottles; i++) {
+            requestHandlers.offer(
+                new ResponseInjectingHttpHandler.FixedRequestHandler(
+                    RestStatus.TOO_MANY_REQUESTS,
+                    null,
+                    ex -> ex.getRequestURI().getPath().equals("/upload/storage/v1/b/bucket/o") && ex.getRequestMethod().equals("POST")
+                )
+            );
+        }
+        final OptionalBytesReference updateResult = safeAwait(
+            l -> container.compareAndExchangeRegister(randomPurpose(), key, new BytesArray(data), new BytesArray(updatedData), l)
+        );
+        assertEquals(new BytesArray(data), updateResult.bytesReference());
+
+        assertEquals(0, requestHandlers.size());
+        container.delete(randomPurpose());
+    }
+
     private HttpHandler safeHandler(HttpHandler handler) {
         final HttpHandler loggingHandler = ESMockAPIBasedRepositoryIntegTestCase.wrap(handler, logger);
         return exchange -> {
diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java
index 43724538aabea..81509c7f2183b 100644
--- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java
+++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java
@@ -17,6 +17,7 @@
 import com.google.cloud.storage.StorageBatchResult;
 import com.google.cloud.storage.StorageException;

+import org.elasticsearch.common.BackoffPolicy;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStore;
@@ -87,7 +88,8 @@ public void testDeleteBlobsIgnoringIfNotExistsThrowsIOException() throws Excepti
             "repo",
             storageService,
             BigArrays.NON_RECYCLING_INSTANCE,
-            randomIntBetween(1, 8) * 1024
+            randomIntBetween(1, 8) * 1024,
+            BackoffPolicy.noBackoff()
         )
     ) {
         final BlobContainer container = store.blobContainer(BlobPath.EMPTY);
diff --git a/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/ResponseInjectingAzureHttpHandler.java b/test/framework/src/main/java/org/elasticsearch/http/ResponseInjectingHttpHandler.java
similarity index 57%
rename from modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/ResponseInjectingAzureHttpHandler.java
rename to test/framework/src/main/java/org/elasticsearch/http/ResponseInjectingHttpHandler.java
index 108d8bc286972..d626401755eb2 100644
--- a/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/ResponseInjectingAzureHttpHandler.java
+++ b/test/framework/src/main/java/org/elasticsearch/http/ResponseInjectingHttpHandler.java
@@ -7,9 +7,8 @@
  * License v3.0 only", or the "Server Side Public License, v 1".
  */

-package org.elasticsearch.repositories.azure;
+package org.elasticsearch.http;

-import com.sun.net.httpserver.Headers;
 import com.sun.net.httpserver.HttpExchange;
 import com.sun.net.httpserver.HttpHandler;

@@ -19,21 +18,18 @@
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Queue;
 import java.util.function.Predicate;

-@SuppressForbidden(reason = "we use a HttpServer to emulate Azure")
-class ResponseInjectingAzureHttpHandler implements ESMockAPIBasedRepositoryIntegTestCase.DelegatingHttpHandler {
+@SuppressForbidden(reason = "We use HttpServer for the fixtures")
+public class ResponseInjectingHttpHandler implements ESMockAPIBasedRepositoryIntegTestCase.DelegatingHttpHandler {

     private final HttpHandler delegate;
     private final Queue<RequestHandler> requestHandlerQueue;

-    ResponseInjectingAzureHttpHandler(Queue<RequestHandler> requestHandlerQueue, HttpHandler delegate) {
+    public ResponseInjectingHttpHandler(Queue<RequestHandler> requestHandlerQueue, HttpHandler delegate) {
         this.delegate = delegate;
         this.requestHandlerQueue = requestHandlerQueue;
-        AzureBlobContainerStatsTests test = new AzureBlobContainerStatsTests();
     }

     @Override
@@ -51,38 +47,9 @@ public HttpHandler getDelegate() {
         return delegate;
     }

-    /**
-     * Creates a {@link ResponseInjectingAzureHttpHandler.RequestHandler} that will persistently fail the first numberToFail
-     * distinct requests it sees. Any other requests are passed through to the delegate.
-     *
-     * @param numberToFail The number of requests to fail
-     * @return the handler
-     */
-    static ResponseInjectingAzureHttpHandler.RequestHandler createFailNRequestsHandler(int numberToFail) {
-        final List<String> requestsToFail = new ArrayList<>(numberToFail);
-        return (exchange, delegate) -> {
-            final Headers requestHeaders = exchange.getRequestHeaders();
-            final String requestId = requestHeaders.get("X-ms-client-request-id").get(0);
-            boolean failRequest = false;
-            synchronized (requestsToFail) {
-                if (requestsToFail.contains(requestId)) {
-                    failRequest = true;
-                } else if (requestsToFail.size() < numberToFail) {
-                    requestsToFail.add(requestId);
-                    failRequest = true;
-                }
-            }
-            if (failRequest) {
-                exchange.sendResponseHeaders(500, -1);
-            } else {
-                delegate.handle(exchange);
-            }
-        };
-    }
-
-    @SuppressForbidden(reason = "we use a HttpServer to emulate Azure")
+    @SuppressForbidden(reason = "We use HttpServer for the fixtures")
     @FunctionalInterface
-    interface RequestHandler {
+    public interface RequestHandler {
         void writeResponse(HttpExchange exchange, HttpHandler delegate) throws IOException;

         default boolean matchesRequest(HttpExchange exchange) {
@@ -90,14 +57,14 @@ default boolean matchesRequest(HttpExchange exchange) {
         }
     }

-    @SuppressForbidden(reason = "we use a HttpServer to emulate Azure")
-    static class FixedRequestHandler implements RequestHandler {
+    @SuppressForbidden(reason = "We use HttpServer for the fixtures")
+    public static class FixedRequestHandler implements RequestHandler {

         private final RestStatus status;
         private final String responseBody;
         private final Predicate<HttpExchange> requestMatcher;

-        FixedRequestHandler(RestStatus status) {
+        public FixedRequestHandler(RestStatus status) {
             this(status, null, req -> true);
         }

@@ -106,7 +73,7 @@ static class FixedRequestHandler implements RequestHandler {
         * that because the errors are stored in a queue this will prevent any subsequently queued errors from
         * being returned until after it returns.
         */
-        FixedRequestHandler(RestStatus status, String responseBody, Predicate<HttpExchange> requestMatcher) {
+        public FixedRequestHandler(RestStatus status, String responseBody, Predicate<HttpExchange> requestMatcher) {
            this.status = status;
            this.responseBody = responseBody;
            this.requestMatcher = requestMatcher;
@@ -121,7 +88,7 @@ public boolean matchesRequest(HttpExchange exchange) {
        public void writeResponse(HttpExchange exchange, HttpHandler delegateHandler) throws IOException {
            if (responseBody != null) {
                byte[] responseBytes = responseBody.getBytes(StandardCharsets.UTF_8);
-               exchange.sendResponseHeaders(status.getStatus(), responseBytes.length);
+               exchange.sendResponseHeaders(status.getStatus(), responseBytes.length == 0 ? -1 : responseBytes.length);
                exchange.getResponseBody().write(responseBytes);
            } else {
                exchange.sendResponseHeaders(status.getStatus(), -1);
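
Editor's note (not part of the patch): a minimal sketch of how the three new repository settings compose into the BackoffPolicy that drives the CAS retry loop in GoogleCloudStorageBlobStore above. BackoffPolicy.linearBackoff, TimeValue and the setting defaults (100ms increment, 2 retries, 5s cap) come from the patch; the wrapping class, method and variable names are illustrative assumptions only.

    import java.util.Iterator;

    import org.elasticsearch.common.BackoffPolicy;
    import org.elasticsearch.core.TimeValue;

    public class ThrottledCasBackoffSketch {
        public static void main(String[] args) {
            // Defaults added by the patch: throttled_cas_retry.delay_increment = 100ms,
            // throttled_cas_retry.maximum_number_of_retries = 2, throttled_cas_retry.maximum_delay = 5s.
            BackoffPolicy casBackoffPolicy = BackoffPolicy.linearBackoff(
                TimeValue.timeValueMillis(100),
                2,
                TimeValue.timeValueSeconds(5)
            );
            // compareAndExchangeRegister sleeps for each delay in turn after a 429 response,
            // so with the defaults a throttled CAS waits 100ms, then 200ms, and then rethrows
            // the last TOO_MANY_REQUESTS exception once the iterator is exhausted.
            Iterator<TimeValue> delays = casBackoffPolicy.iterator();
            while (delays.hasNext()) {
                System.out.println("retry after " + delays.next());
            }
        }
    }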