From 1151181638c1edb1d32f223236a6337f465c1d48 Mon Sep 17 00:00:00 2001 From: Fabian Meumertzheim Date: Mon, 13 Jan 2025 11:29:07 +0100 Subject: [PATCH] Move methods into tests --- .../build/lib/remote/ByteStreamUploader.java | 53 ---------- .../lib/remote/ByteStreamUploaderTest.java | 98 ++++++++++++++----- 2 files changed, 74 insertions(+), 77 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java index aea5383cb2a6e3..c7602543032709 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java @@ -16,8 +16,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.util.concurrent.Futures.immediateVoidFuture; import static com.google.devtools.build.lib.remote.util.DigestUtil.isOldStyleDigestFunction; -import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture; -import static com.google.devtools.build.lib.remote.util.Utils.waitForBulkTransfer; import static java.lang.String.format; import static java.util.concurrent.TimeUnit.SECONDS; @@ -50,9 +48,6 @@ import io.grpc.stub.ClientResponseObserver; import io.netty.util.ReferenceCounted; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; import java.util.UUID; import java.util.concurrent.Semaphore; import javax.annotation.Nullable; @@ -104,54 +99,6 @@ final class ByteStreamUploader { this.digestFunction = digestFunction; } - @VisibleForTesting - ReferenceCountedChannel getChannel() { - return channel; - } - - @VisibleForTesting - RemoteRetrier getRetrier() { - return retrier; - } - - /** - * Uploads a BLOB, as provided by the {@link Chunker}, to the remote {@code ByteStream} service. - * The call blocks until the upload is complete, or throws an {@link Exception} in case of error. - * - *

Uploads are retried according to the specified {@link RemoteRetrier}. Retrying is - * transparent to the user of this API. - * - * @param digest the digest of the data to upload. - * @param chunker the data to upload. - * @throws IOException when reading of the {@link Chunker}s input source fails - */ - public void uploadBlob(RemoteActionExecutionContext context, Digest digest, Chunker chunker) - throws IOException, InterruptedException { - getFromFuture(uploadBlobAsync(context, digest, chunker)); - } - - /** - * Uploads a list of BLOBs concurrently to the remote {@code ByteStream} service. The call blocks - * until the upload of all BLOBs is complete, or throws an {@link - * com.google.devtools.build.lib.remote.common.BulkTransferException} if there are errors. - * - *

Uploads are retried according to the specified {@link RemoteRetrier}. Retrying is - * transparent to the user of this API. - * - * @param chunkers the data to upload. - * @throws IOException when reading of the {@link Chunker}s input source or uploading fails - */ - public void uploadBlobs(RemoteActionExecutionContext context, Map chunkers) - throws IOException, InterruptedException { - List> uploads = new ArrayList<>(); - - for (Map.Entry chunkerEntry : chunkers.entrySet()) { - uploads.add(uploadBlobAsync(context, chunkerEntry.getKey(), chunkerEntry.getValue())); - } - - waitForBulkTransfer(uploads, /* cancelRemainingOnInterrupt= */ true); - } - /** * Uploads a BLOB asynchronously to the remote {@code ByteStream} service. The call returns * immediately and one can listen to the returned future for the success/failure of the upload. diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java index d0f95accb257ba..0986a00ce2ec19 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java @@ -15,6 +15,8 @@ import static com.google.common.base.Preconditions.checkState; import static com.google.common.truth.Truth.assertThat; +import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture; +import static com.google.devtools.build.lib.remote.util.Utils.waitForBulkTransfer; import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.Assert.assertThrows; import static org.junit.Assert.fail; @@ -180,7 +182,7 @@ public void singleBlobUploadShouldWork() throws Exception { serviceRegistry.addService(TestUtils.newNoErrorByteStreamService(blob)); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); // This test should not have triggered any retries. Mockito.verifyNoInteractions(mockBackoff); @@ -243,7 +245,7 @@ public void onCompleted() {} } }); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); // This test should not have triggered any retries. Mockito.verifyNoInteractions(mockBackoff); @@ -358,7 +360,7 @@ public void queryWriteStatus( } }); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); // This test triggers one retry. Mockito.verify(mockBackoff, Mockito.times(1)) @@ -474,7 +476,7 @@ public void queryWriteStatus( } }); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); byte[] decompressed = Zstd.decompress(output.toByteArray(), blob.length - skipSize); assertThat(Arrays.equals(decompressed, 0, decompressed.length, blob, skipSize, blob.length)) .isTrue(); @@ -541,7 +543,7 @@ public void queryWriteStatus( } }); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); } @Test @@ -598,7 +600,7 @@ public void queryWriteStatus( } }); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); // This test should not have triggered any retries. assertThat(numWriteCalls.get()).isEqualTo(1); @@ -669,7 +671,7 @@ public void queryWriteStatus( } }); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); // This test should have triggered a single retry, because it made // no progress. @@ -708,7 +710,7 @@ public StreamObserver write(StreamObserver streamOb } }); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); // This test should not have triggered any retries. Mockito.verifyNoInteractions(mockBackoff); @@ -759,7 +761,7 @@ public void onCompleted() { }); try { - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); fail("Should have thrown an exception."); } catch (IOException e) { // expected @@ -798,7 +800,7 @@ public StreamObserver write(StreamObserver streamOb } }); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); } @Test @@ -831,7 +833,7 @@ public void multipleBlobsUploadShouldWork() throws Exception { serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash)); - uploader.uploadBlobs(context, chunkers); + uploadBlobs(uploader, context, chunkers); } @Test @@ -859,7 +861,8 @@ public void tooManyFilesIOException_adviseMaximumOpenFilesFlag() throws Exceptio + " --bep_maximum_open_remote_upload_files flag to a number lower than your system" + " default (run 'ulimit -a' for *nix-based operating systems). Original error message:" + " Too many open files"; - assertThat(assertThrows(IOException.class, () -> uploader.uploadBlob(context, digest, chunker))) + assertThat( + assertThrows(IOException.class, () -> uploadBlob(uploader, context, digest, chunker))) .hasMessageThat() .isEqualTo(newMessage); } @@ -901,7 +904,7 @@ public void availablePermitsOpenFileSemaphore_fewerPermitsThanUploads_endWithAll serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash)); - uploader.uploadBlobs(context, chunkers); + uploadBlobs(uploader, context, chunkers); assertThat(uploader.getOpenedFilePermits().availablePermits()).isEqualTo(maximumOpenFiles); } @@ -937,7 +940,7 @@ public void noMaximumOpenFilesFlags_nullSemaphore() throws Exception { serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash)); - uploader.uploadBlobs(context, chunkers); + uploadBlobs(uploader, context, chunkers); assertThat(uploader.getOpenedFilePermits()).isNull(); } @@ -1134,7 +1137,7 @@ public ServerCall.Listener interceptCall( } })); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); } @Test @@ -1165,7 +1168,7 @@ public StreamObserver write(StreamObserver response }); try { - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); fail("Should have thrown an exception."); } catch (IOException e) { assertThat(RemoteRetrierUtils.causedByStatus(e, Code.INTERNAL)).isTrue(); @@ -1207,7 +1210,7 @@ public StreamObserver write(StreamObserver response Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build(); Digest digest = DIGEST_UTIL.compute(blob); try { - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); fail("Should have thrown an exception."); } catch (IOException e) { assertThat(e).hasCauseThat().isInstanceOf(RejectedExecutionException.class); @@ -1255,7 +1258,7 @@ public void onCompleted() { Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build(); Digest digest = DIGEST_UTIL.compute(blob); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); } @Test @@ -1299,7 +1302,7 @@ public void onCompleted() { Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build(); Digest digest = DIGEST_UTIL.compute(blob); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); } @Test @@ -1334,7 +1337,7 @@ public StreamObserver write(StreamObserver response Digest digest = DIGEST_UTIL.compute(blob); try { - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); fail("Should have thrown an exception."); } catch (IOException e) { assertThat(numCalls.get()).isEqualTo(1); @@ -1389,7 +1392,7 @@ public StreamObserver write(StreamObserver streamOb } }); - assertThrows(IOException.class, () -> uploader.uploadBlob(context, digest, chunker)); + assertThrows(IOException.class, () -> uploadBlob(uploader, context, digest, chunker)); assertThat(refreshTimes.get()).isEqualTo(1); assertThat(numUploads.get()).isEqualTo(2); @@ -1474,7 +1477,7 @@ public void onCompleted() { } }); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); assertThat(refreshTimes.get()).isEqualTo(1); assertThat(numUploads.get()).isEqualTo(2); @@ -1540,7 +1543,7 @@ public void queryWriteStatus( Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build(); Digest digest = DIGEST_UTIL.compute(blob); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); assertThat(numUploads.get()).isEqualTo(1); } @@ -1630,7 +1633,7 @@ public void onCompleted() { Chunker.builder().setInput(blob).setCompressed(true).setChunkSize(CHUNK_SIZE).build(); Digest digest = DIGEST_UTIL.compute(blob); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); // This test should not have triggered any retries. Mockito.verifyNoInteractions(mockBackoff); @@ -1638,6 +1641,53 @@ public void onCompleted() { assertThat(numUploads.get()).isEqualTo(1); } + /** + * Uploads a BLOB, as provided by the {@link Chunker}, to the remote {@code ByteStream} service. + * The call blocks until the upload is complete, or throws an {@link Exception} in case of error. + * + *

Uploads are retried according to the specified {@link RemoteRetrier}. Retrying is + * transparent to the user of this API. + * + * @param digest the digest of the data to upload. + * @param chunker the data to upload. + * @throws IOException when reading of the {@link Chunker}s input source fails + */ + private static void uploadBlob( + ByteStreamUploader byteStreamUploader, + RemoteActionExecutionContext context, + Digest digest, + Chunker chunker) + throws IOException, InterruptedException { + getFromFuture(byteStreamUploader.uploadBlobAsync(context, digest, chunker)); + } + + /** + * Uploads a list of BLOBs concurrently to the remote {@code ByteStream} service. The call blocks + * until the upload of all BLOBs is complete, or throws an {@link + * com.google.devtools.build.lib.remote.common.BulkTransferException} if there are errors. + * + *

Uploads are retried according to the specified {@link RemoteRetrier}. Retrying is + * transparent to the user of this API. + * + * @param chunkers the data to upload. + * @throws IOException when reading of the {@link Chunker}s input source or uploading fails + */ + private static void uploadBlobs( + ByteStreamUploader byteStreamUploader, + RemoteActionExecutionContext context, + Map chunkers) + throws IOException, InterruptedException { + List> uploads = new ArrayList<>(); + + for (Map.Entry chunkerEntry : chunkers.entrySet()) { + uploads.add( + byteStreamUploader.uploadBlobAsync( + context, chunkerEntry.getKey(), chunkerEntry.getValue())); + } + + waitForBulkTransfer(uploads, /* cancelRemainingOnInterrupt= */ true); + } + private static class NoopStreamObserver implements StreamObserver { @Override public void onNext(WriteRequest writeRequest) {}