Skip to content

Commit

Permalink
Move methods into tests
Browse files Browse the repository at this point in the history
  • Loading branch information
fmeum committed Jan 13, 2025
1 parent 560b5b2 commit 1151181
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static com.google.devtools.build.lib.remote.util.DigestUtil.isOldStyleDigestFunction;
import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
import static com.google.devtools.build.lib.remote.util.Utils.waitForBulkTransfer;
import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.SECONDS;

Expand Down Expand Up @@ -50,9 +48,6 @@
import io.grpc.stub.ClientResponseObserver;
import io.netty.util.ReferenceCounted;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Semaphore;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -104,54 +99,6 @@ final class ByteStreamUploader {
this.digestFunction = digestFunction;
}

@VisibleForTesting
ReferenceCountedChannel getChannel() {
return channel;
}

@VisibleForTesting
RemoteRetrier getRetrier() {
return retrier;
}

/**
* Uploads a BLOB, as provided by the {@link Chunker}, to the remote {@code ByteStream} service.
* The call blocks until the upload is complete, or throws an {@link Exception} in case of error.
*
* <p>Uploads are retried according to the specified {@link RemoteRetrier}. Retrying is
* transparent to the user of this API.
*
* @param digest the digest of the data to upload.
* @param chunker the data to upload.
* @throws IOException when reading of the {@link Chunker}s input source fails
*/
public void uploadBlob(RemoteActionExecutionContext context, Digest digest, Chunker chunker)
throws IOException, InterruptedException {
getFromFuture(uploadBlobAsync(context, digest, chunker));
}

/**
* Uploads a list of BLOBs concurrently to the remote {@code ByteStream} service. The call blocks
* until the upload of all BLOBs is complete, or throws an {@link
* com.google.devtools.build.lib.remote.common.BulkTransferException} if there are errors.
*
* <p>Uploads are retried according to the specified {@link RemoteRetrier}. Retrying is
* transparent to the user of this API.
*
* @param chunkers the data to upload.
* @throws IOException when reading of the {@link Chunker}s input source or uploading fails
*/
public void uploadBlobs(RemoteActionExecutionContext context, Map<Digest, Chunker> chunkers)
throws IOException, InterruptedException {
List<ListenableFuture<Void>> uploads = new ArrayList<>();

for (Map.Entry<Digest, Chunker> chunkerEntry : chunkers.entrySet()) {
uploads.add(uploadBlobAsync(context, chunkerEntry.getKey(), chunkerEntry.getValue()));
}

waitForBulkTransfer(uploads, /* cancelRemainingOnInterrupt= */ true);
}

/**
* Uploads a BLOB asynchronously to the remote {@code ByteStream} service. The call returns
* immediately and one can listen to the returned future for the success/failure of the upload.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.truth.Truth.assertThat;
import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
import static com.google.devtools.build.lib.remote.util.Utils.waitForBulkTransfer;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;
Expand Down Expand Up @@ -180,7 +182,7 @@ public void singleBlobUploadShouldWork() throws Exception {

serviceRegistry.addService(TestUtils.newNoErrorByteStreamService(blob));

uploader.uploadBlob(context, digest, chunker);
uploadBlob(uploader, context, digest, chunker);

// This test should not have triggered any retries.
Mockito.verifyNoInteractions(mockBackoff);
Expand Down Expand Up @@ -243,7 +245,7 @@ public void onCompleted() {}
}
});

uploader.uploadBlob(context, digest, chunker);
uploadBlob(uploader, context, digest, chunker);

// This test should not have triggered any retries.
Mockito.verifyNoInteractions(mockBackoff);
Expand Down Expand Up @@ -358,7 +360,7 @@ public void queryWriteStatus(
}
});

uploader.uploadBlob(context, digest, chunker);
uploadBlob(uploader, context, digest, chunker);

// This test triggers one retry.
Mockito.verify(mockBackoff, Mockito.times(1))
Expand Down Expand Up @@ -474,7 +476,7 @@ public void queryWriteStatus(
}
});

uploader.uploadBlob(context, digest, chunker);
uploadBlob(uploader, context, digest, chunker);
byte[] decompressed = Zstd.decompress(output.toByteArray(), blob.length - skipSize);
assertThat(Arrays.equals(decompressed, 0, decompressed.length, blob, skipSize, blob.length))
.isTrue();
Expand Down Expand Up @@ -541,7 +543,7 @@ public void queryWriteStatus(
}
});

uploader.uploadBlob(context, digest, chunker);
uploadBlob(uploader, context, digest, chunker);
}

@Test
Expand Down Expand Up @@ -598,7 +600,7 @@ public void queryWriteStatus(
}
});

uploader.uploadBlob(context, digest, chunker);
uploadBlob(uploader, context, digest, chunker);

// This test should not have triggered any retries.
assertThat(numWriteCalls.get()).isEqualTo(1);
Expand Down Expand Up @@ -669,7 +671,7 @@ public void queryWriteStatus(
}
});

uploader.uploadBlob(context, digest, chunker);
uploadBlob(uploader, context, digest, chunker);

// This test should have triggered a single retry, because it made
// no progress.
Expand Down Expand Up @@ -708,7 +710,7 @@ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamOb
}
});

uploader.uploadBlob(context, digest, chunker);
uploadBlob(uploader, context, digest, chunker);

// This test should not have triggered any retries.
Mockito.verifyNoInteractions(mockBackoff);
Expand Down Expand Up @@ -759,7 +761,7 @@ public void onCompleted() {
});

try {
uploader.uploadBlob(context, digest, chunker);
uploadBlob(uploader, context, digest, chunker);
fail("Should have thrown an exception.");
} catch (IOException e) {
// expected
Expand Down Expand Up @@ -798,7 +800,7 @@ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamOb
}
});

uploader.uploadBlob(context, digest, chunker);
uploadBlob(uploader, context, digest, chunker);
}

@Test
Expand Down Expand Up @@ -831,7 +833,7 @@ public void multipleBlobsUploadShouldWork() throws Exception {

serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash));

uploader.uploadBlobs(context, chunkers);
uploadBlobs(uploader, context, chunkers);
}

@Test
Expand Down Expand Up @@ -859,7 +861,8 @@ public void tooManyFilesIOException_adviseMaximumOpenFilesFlag() throws Exceptio
+ " --bep_maximum_open_remote_upload_files flag to a number lower than your system"
+ " default (run 'ulimit -a' for *nix-based operating systems). Original error message:"
+ " Too many open files";
assertThat(assertThrows(IOException.class, () -> uploader.uploadBlob(context, digest, chunker)))
assertThat(
assertThrows(IOException.class, () -> uploadBlob(uploader, context, digest, chunker)))
.hasMessageThat()
.isEqualTo(newMessage);
}
Expand Down Expand Up @@ -901,7 +904,7 @@ public void availablePermitsOpenFileSemaphore_fewerPermitsThanUploads_endWithAll

serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash));

uploader.uploadBlobs(context, chunkers);
uploadBlobs(uploader, context, chunkers);

assertThat(uploader.getOpenedFilePermits().availablePermits()).isEqualTo(maximumOpenFiles);
}
Expand Down Expand Up @@ -937,7 +940,7 @@ public void noMaximumOpenFilesFlags_nullSemaphore() throws Exception {

serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash));

uploader.uploadBlobs(context, chunkers);
uploadBlobs(uploader, context, chunkers);
assertThat(uploader.getOpenedFilePermits()).isNull();
}

Expand Down Expand Up @@ -1134,7 +1137,7 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
}
}));

uploader.uploadBlob(context, digest, chunker);
uploadBlob(uploader, context, digest, chunker);
}

@Test
Expand Down Expand Up @@ -1165,7 +1168,7 @@ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response
});

try {
uploader.uploadBlob(context, digest, chunker);
uploadBlob(uploader, context, digest, chunker);
fail("Should have thrown an exception.");
} catch (IOException e) {
assertThat(RemoteRetrierUtils.causedByStatus(e, Code.INTERNAL)).isTrue();
Expand Down Expand Up @@ -1207,7 +1210,7 @@ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
Digest digest = DIGEST_UTIL.compute(blob);
try {
uploader.uploadBlob(context, digest, chunker);
uploadBlob(uploader, context, digest, chunker);
fail("Should have thrown an exception.");
} catch (IOException e) {
assertThat(e).hasCauseThat().isInstanceOf(RejectedExecutionException.class);
Expand Down Expand Up @@ -1255,7 +1258,7 @@ public void onCompleted() {
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
Digest digest = DIGEST_UTIL.compute(blob);

uploader.uploadBlob(context, digest, chunker);
uploadBlob(uploader, context, digest, chunker);
}

@Test
Expand Down Expand Up @@ -1299,7 +1302,7 @@ public void onCompleted() {
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
Digest digest = DIGEST_UTIL.compute(blob);

uploader.uploadBlob(context, digest, chunker);
uploadBlob(uploader, context, digest, chunker);
}

@Test
Expand Down Expand Up @@ -1334,7 +1337,7 @@ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> response
Digest digest = DIGEST_UTIL.compute(blob);

try {
uploader.uploadBlob(context, digest, chunker);
uploadBlob(uploader, context, digest, chunker);
fail("Should have thrown an exception.");
} catch (IOException e) {
assertThat(numCalls.get()).isEqualTo(1);
Expand Down Expand Up @@ -1389,7 +1392,7 @@ public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamOb
}
});

assertThrows(IOException.class, () -> uploader.uploadBlob(context, digest, chunker));
assertThrows(IOException.class, () -> uploadBlob(uploader, context, digest, chunker));

assertThat(refreshTimes.get()).isEqualTo(1);
assertThat(numUploads.get()).isEqualTo(2);
Expand Down Expand Up @@ -1474,7 +1477,7 @@ public void onCompleted() {
}
});

uploader.uploadBlob(context, digest, chunker);
uploadBlob(uploader, context, digest, chunker);

assertThat(refreshTimes.get()).isEqualTo(1);
assertThat(numUploads.get()).isEqualTo(2);
Expand Down Expand Up @@ -1540,7 +1543,7 @@ public void queryWriteStatus(
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
Digest digest = DIGEST_UTIL.compute(blob);

uploader.uploadBlob(context, digest, chunker);
uploadBlob(uploader, context, digest, chunker);

assertThat(numUploads.get()).isEqualTo(1);
}
Expand Down Expand Up @@ -1630,14 +1633,61 @@ public void onCompleted() {
Chunker.builder().setInput(blob).setCompressed(true).setChunkSize(CHUNK_SIZE).build();
Digest digest = DIGEST_UTIL.compute(blob);

uploader.uploadBlob(context, digest, chunker);
uploadBlob(uploader, context, digest, chunker);

// This test should not have triggered any retries.
Mockito.verifyNoInteractions(mockBackoff);

assertThat(numUploads.get()).isEqualTo(1);
}

/**
 * Uploads a BLOB, as provided by the {@link Chunker}, to the remote {@code ByteStream} service.
 * The call blocks until the upload is complete, or throws an {@link Exception} in case of error.
 *
 * <p>Uploads are retried according to the retrier configured on the given {@link
 * ByteStreamUploader}. Retrying is transparent to the user of this API.
 *
 * @param byteStreamUploader the uploader whose async API performs the actual upload.
 * @param context the remote action execution context to associate with the upload.
 * @param digest the digest of the data to upload.
 * @param chunker the data to upload.
 * @throws IOException when reading of the {@link Chunker}s input source fails
 * @throws InterruptedException if the current thread is interrupted while blocking on the upload
 */
private static void uploadBlob(
    ByteStreamUploader byteStreamUploader,
    RemoteActionExecutionContext context,
    Digest digest,
    Chunker chunker)
    throws IOException, InterruptedException {
  // Blocking wrapper around the async API; getFromFuture propagates upload failures as
  // IOException and honors interruption.
  getFromFuture(byteStreamUploader.uploadBlobAsync(context, digest, chunker));
}

/**
 * Uploads a list of BLOBs concurrently to the remote {@code ByteStream} service. The call blocks
 * until the upload of all BLOBs is complete, or throws an {@link
 * com.google.devtools.build.lib.remote.common.BulkTransferException} if there are errors.
 *
 * <p>Uploads are retried according to the retrier configured on the given {@link
 * ByteStreamUploader}. Retrying is transparent to the user of this API.
 *
 * @param byteStreamUploader the uploader whose async API performs the actual uploads.
 * @param context the remote action execution context to associate with the uploads.
 * @param chunkers the data to upload, keyed by digest.
 * @throws IOException when reading of the {@link Chunker}s input source or uploading fails
 * @throws InterruptedException if the current thread is interrupted while waiting; remaining
 *     in-flight uploads are cancelled (see {@code cancelRemainingOnInterrupt} below)
 */
private static void uploadBlobs(
    ByteStreamUploader byteStreamUploader,
    RemoteActionExecutionContext context,
    Map<Digest, Chunker> chunkers)
    throws IOException, InterruptedException {
  List<ListenableFuture<Void>> uploads = new ArrayList<>();

  // Kick off all uploads concurrently; each future completes independently.
  for (Map.Entry<Digest, Chunker> chunkerEntry : chunkers.entrySet()) {
    uploads.add(
        byteStreamUploader.uploadBlobAsync(
            context, chunkerEntry.getKey(), chunkerEntry.getValue()));
  }

  // Block until every upload finishes; aggregates individual failures into a
  // BulkTransferException.
  waitForBulkTransfer(uploads, /* cancelRemainingOnInterrupt= */ true);
}

private static class NoopStreamObserver implements StreamObserver<WriteRequest> {
@Override
public void onNext(WriteRequest writeRequest) {}
Expand Down

0 comments on commit 1151181

Please sign in to comment.