diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
index aea5383cb2a6e3..c7602543032709 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
@@ -16,8 +16,6 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static com.google.devtools.build.lib.remote.util.DigestUtil.isOldStyleDigestFunction;
-import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
-import static com.google.devtools.build.lib.remote.util.Utils.waitForBulkTransfer;
import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -50,9 +48,6 @@
import io.grpc.stub.ClientResponseObserver;
import io.netty.util.ReferenceCounted;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Semaphore;
import javax.annotation.Nullable;
@@ -104,54 +99,6 @@ final class ByteStreamUploader {
this.digestFunction = digestFunction;
}
- @VisibleForTesting
- ReferenceCountedChannel getChannel() {
- return channel;
- }
-
- @VisibleForTesting
- RemoteRetrier getRetrier() {
- return retrier;
- }
-
- /**
- * Uploads a BLOB, as provided by the {@link Chunker}, to the remote {@code ByteStream} service.
- * The call blocks until the upload is complete, or throws an {@link Exception} in case of error.
- *
- *
- * <p>Uploads are retried according to the specified {@link RemoteRetrier}. Retrying is
- * transparent to the user of this API.
- *
- * @param digest the digest of the data to upload.
- * @param chunker the data to upload.
- * @throws IOException when reading of the {@link Chunker}s input source fails
- */
- public void uploadBlob(RemoteActionExecutionContext context, Digest digest, Chunker chunker)
- throws IOException, InterruptedException {
- getFromFuture(uploadBlobAsync(context, digest, chunker));
- }
-
- /**
- * Uploads a list of BLOBs concurrently to the remote {@code ByteStream} service. The call blocks
- * until the upload of all BLOBs is complete, or throws an {@link
- * com.google.devtools.build.lib.remote.common.BulkTransferException} if there are errors.
- *
- *
- * <p>Uploads are retried according to the specified {@link RemoteRetrier}. Retrying is
- * transparent to the user of this API.
- *
- * @param chunkers the data to upload.
- * @throws IOException when reading of the {@link Chunker}s input source or uploading fails
- */
- public void uploadBlobs(RemoteActionExecutionContext context, Map<Digest, Chunker> chunkers)
- throws IOException, InterruptedException {
- List<ListenableFuture<Void>> uploads = new ArrayList<>();
-
- for (Map.Entry<Digest, Chunker> chunkerEntry : chunkers.entrySet()) {
- uploads.add(uploadBlobAsync(context, chunkerEntry.getKey(), chunkerEntry.getValue()));
- }
-
- waitForBulkTransfer(uploads, /* cancelRemainingOnInterrupt= */ true);
- }
-
/**
* Uploads a BLOB asynchronously to the remote {@code ByteStream} service. The call returns
* immediately and one can listen to the returned future for the success/failure of the upload.
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
index d0f95accb257ba..0986a00ce2ec19 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
@@ -15,6 +15,8 @@
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.truth.Truth.assertThat;
+import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
+import static com.google.devtools.build.lib.remote.util.Utils.waitForBulkTransfer;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;
@@ -180,7 +182,7 @@ public void singleBlobUploadShouldWork() throws Exception {
serviceRegistry.addService(TestUtils.newNoErrorByteStreamService(blob));
- uploader.uploadBlob(context, digest, chunker);
+ uploadBlob(uploader, context, digest, chunker);
// This test should not have triggered any retries.
Mockito.verifyNoInteractions(mockBackoff);
@@ -243,7 +245,7 @@ public void onCompleted() {}
}
});
- uploader.uploadBlob(context, digest, chunker);
+ uploadBlob(uploader, context, digest, chunker);
// This test should not have triggered any retries.
Mockito.verifyNoInteractions(mockBackoff);
@@ -358,7 +360,7 @@ public void queryWriteStatus(
}
});
- uploader.uploadBlob(context, digest, chunker);
+ uploadBlob(uploader, context, digest, chunker);
// This test triggers one retry.
Mockito.verify(mockBackoff, Mockito.times(1))
@@ -474,7 +476,7 @@ public void queryWriteStatus(
}
});
- uploader.uploadBlob(context, digest, chunker);
+ uploadBlob(uploader, context, digest, chunker);
byte[] decompressed = Zstd.decompress(output.toByteArray(), blob.length - skipSize);
assertThat(Arrays.equals(decompressed, 0, decompressed.length, blob, skipSize, blob.length))
.isTrue();
@@ -541,7 +543,7 @@ public void queryWriteStatus(
}
});
- uploader.uploadBlob(context, digest, chunker);
+ uploadBlob(uploader, context, digest, chunker);
}
@Test
@@ -598,7 +600,7 @@ public void queryWriteStatus(
}
});
- uploader.uploadBlob(context, digest, chunker);
+ uploadBlob(uploader, context, digest, chunker);
// This test should not have triggered any retries.
assertThat(numWriteCalls.get()).isEqualTo(1);
@@ -669,7 +671,7 @@ public void queryWriteStatus(
}
});
- uploader.uploadBlob(context, digest, chunker);
+ uploadBlob(uploader, context, digest, chunker);
// This test should have triggered a single retry, because it made
// no progress.
@@ -708,7 +710,7 @@ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamOb
}
});
- uploader.uploadBlob(context, digest, chunker);
+ uploadBlob(uploader, context, digest, chunker);
// This test should not have triggered any retries.
Mockito.verifyNoInteractions(mockBackoff);
@@ -759,7 +761,7 @@ public void onCompleted() {
});
try {
- uploader.uploadBlob(context, digest, chunker);
+ uploadBlob(uploader, context, digest, chunker);
fail("Should have thrown an exception.");
} catch (IOException e) {
// expected
@@ -798,7 +800,7 @@ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamOb
}
});
- uploader.uploadBlob(context, digest, chunker);
+ uploadBlob(uploader, context, digest, chunker);
}
@Test
@@ -831,7 +833,7 @@ public void multipleBlobsUploadShouldWork() throws Exception {
serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash));
- uploader.uploadBlobs(context, chunkers);
+ uploadBlobs(uploader, context, chunkers);
}
@Test
@@ -859,7 +861,8 @@ public void tooManyFilesIOException_adviseMaximumOpenFilesFlag() throws Exceptio
+ " --bep_maximum_open_remote_upload_files flag to a number lower than your system"
+ " default (run 'ulimit -a' for *nix-based operating systems). Original error message:"
+ " Too many open files";
- assertThat(assertThrows(IOException.class, () -> uploader.uploadBlob(context, digest, chunker)))
+ assertThat(
+ assertThrows(IOException.class, () -> uploadBlob(uploader, context, digest, chunker)))
.hasMessageThat()
.isEqualTo(newMessage);
}
@@ -901,7 +904,7 @@ public void availablePermitsOpenFileSemaphore_fewerPermitsThanUploads_endWithAll
serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash));
- uploader.uploadBlobs(context, chunkers);
+ uploadBlobs(uploader, context, chunkers);
assertThat(uploader.getOpenedFilePermits().availablePermits()).isEqualTo(maximumOpenFiles);
}
@@ -937,7 +940,7 @@ public void noMaximumOpenFilesFlags_nullSemaphore() throws Exception {
serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash));
- uploader.uploadBlobs(context, chunkers);
+ uploadBlobs(uploader, context, chunkers);
assertThat(uploader.getOpenedFilePermits()).isNull();
}
@@ -1134,7 +1137,7 @@ public ServerCall.Listener<ReqT> interceptCall(
}
}));
- uploader.uploadBlob(context, digest, chunker);
+ uploadBlob(uploader, context, digest, chunker);
}
@Test
@@ -1165,7 +1168,7 @@ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response
});
try {
- uploader.uploadBlob(context, digest, chunker);
+ uploadBlob(uploader, context, digest, chunker);
fail("Should have thrown an exception.");
} catch (IOException e) {
assertThat(RemoteRetrierUtils.causedByStatus(e, Code.INTERNAL)).isTrue();
@@ -1207,7 +1210,7 @@ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
Digest digest = DIGEST_UTIL.compute(blob);
try {
- uploader.uploadBlob(context, digest, chunker);
+ uploadBlob(uploader, context, digest, chunker);
fail("Should have thrown an exception.");
} catch (IOException e) {
assertThat(e).hasCauseThat().isInstanceOf(RejectedExecutionException.class);
@@ -1255,7 +1258,7 @@ public void onCompleted() {
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
Digest digest = DIGEST_UTIL.compute(blob);
- uploader.uploadBlob(context, digest, chunker);
+ uploadBlob(uploader, context, digest, chunker);
}
@Test
@@ -1299,7 +1302,7 @@ public void onCompleted() {
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
Digest digest = DIGEST_UTIL.compute(blob);
- uploader.uploadBlob(context, digest, chunker);
+ uploadBlob(uploader, context, digest, chunker);
}
@Test
@@ -1334,7 +1337,7 @@ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response
Digest digest = DIGEST_UTIL.compute(blob);
try {
- uploader.uploadBlob(context, digest, chunker);
+ uploadBlob(uploader, context, digest, chunker);
fail("Should have thrown an exception.");
} catch (IOException e) {
assertThat(numCalls.get()).isEqualTo(1);
@@ -1389,7 +1392,7 @@ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamOb
}
});
- assertThrows(IOException.class, () -> uploader.uploadBlob(context, digest, chunker));
+ assertThrows(IOException.class, () -> uploadBlob(uploader, context, digest, chunker));
assertThat(refreshTimes.get()).isEqualTo(1);
assertThat(numUploads.get()).isEqualTo(2);
@@ -1474,7 +1477,7 @@ public void onCompleted() {
}
});
- uploader.uploadBlob(context, digest, chunker);
+ uploadBlob(uploader, context, digest, chunker);
assertThat(refreshTimes.get()).isEqualTo(1);
assertThat(numUploads.get()).isEqualTo(2);
@@ -1540,7 +1543,7 @@ public void queryWriteStatus(
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
Digest digest = DIGEST_UTIL.compute(blob);
- uploader.uploadBlob(context, digest, chunker);
+ uploadBlob(uploader, context, digest, chunker);
assertThat(numUploads.get()).isEqualTo(1);
}
@@ -1630,7 +1633,7 @@ public void onCompleted() {
Chunker.builder().setInput(blob).setCompressed(true).setChunkSize(CHUNK_SIZE).build();
Digest digest = DIGEST_UTIL.compute(blob);
- uploader.uploadBlob(context, digest, chunker);
+ uploadBlob(uploader, context, digest, chunker);
// This test should not have triggered any retries.
Mockito.verifyNoInteractions(mockBackoff);
@@ -1638,6 +1641,53 @@ public void onCompleted() {
assertThat(numUploads.get()).isEqualTo(1);
}
+ /**
+ * Uploads a BLOB, as provided by the {@link Chunker}, to the remote {@code ByteStream} service.
+ * The call blocks until the upload is complete, or throws an {@link Exception} in case of error.
+ *
+ * <p>Uploads are retried according to the specified {@link RemoteRetrier}. Retrying is
+ * transparent to the user of this API.
+ *
+ * @param digest the digest of the data to upload.
+ * @param chunker the data to upload.
+ * @throws IOException when reading of the {@link Chunker}s input source fails
+ */
+ private static void uploadBlob(
+ ByteStreamUploader byteStreamUploader,
+ RemoteActionExecutionContext context,
+ Digest digest,
+ Chunker chunker)
+ throws IOException, InterruptedException {
+ getFromFuture(byteStreamUploader.uploadBlobAsync(context, digest, chunker));
+ }
+
+ /**
+ * Uploads a list of BLOBs concurrently to the remote {@code ByteStream} service. The call blocks
+ * until the upload of all BLOBs is complete, or throws an {@link
+ * com.google.devtools.build.lib.remote.common.BulkTransferException} if there are errors.
+ *
+ *
+ * <p>Uploads are retried according to the specified {@link RemoteRetrier}. Retrying is
+ * transparent to the user of this API.
+ *
+ * @param chunkers the data to upload.
+ * @throws IOException when reading of the {@link Chunker}s input source or uploading fails
+ */
+ private static void uploadBlobs(
+ ByteStreamUploader byteStreamUploader,
+ RemoteActionExecutionContext context,
+ Map<Digest, Chunker> chunkers)
+ throws IOException, InterruptedException {
+ List<ListenableFuture<Void>> uploads = new ArrayList<>();
+
+ for (Map.Entry<Digest, Chunker> chunkerEntry : chunkers.entrySet()) {
+ uploads.add(
+ byteStreamUploader.uploadBlobAsync(
+ context, chunkerEntry.getKey(), chunkerEntry.getValue()));
+ }
+
+ waitForBulkTransfer(uploads, /* cancelRemainingOnInterrupt= */ true);
+ }
+
private static class NoopStreamObserver implements StreamObserver<WriteRequest> {
@Override
public void onNext(WriteRequest writeRequest) {}