Skip to content

Commit

Permalink
Make Chunker closeable
Browse files Browse the repository at this point in the history
  • Loading branch information
fmeum committed Jan 13, 2025
1 parent 1151181 commit ada9cdb
Show file tree
Hide file tree
Showing 14 changed files with 58 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ final class ByteStreamUploader {
* performed. This is transparent to the user of this API.
*
* @param digest the {@link Digest} of the data to upload.
* @param chunker the data to upload.
* @param chunker the data to upload. Callers are responsible for closing the {@link Chunker}.
*/
public ListenableFuture<Void> uploadBlobAsync(
RemoteActionExecutionContext context, Digest digest, Chunker chunker) {
Expand Down
37 changes: 17 additions & 20 deletions src/main/java/com/google/devtools/build/lib/remote/Chunker.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.ByteStreams;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient.CloseableBlobSupplier;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient.Blob;
import com.google.devtools.build.lib.remote.zstd.ZstdCompressingInputStream;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import com.google.protobuf.ByteString;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
Expand All @@ -44,7 +43,7 @@
*
* <p>This class should not be extended - it's only non-final for testing.
*/
public class Chunker {
public class Chunker implements AutoCloseable {

private static int defaultChunkSize = 1024 * 16;

Expand Down Expand Up @@ -98,7 +97,7 @@ public int hashCode() {
}
}

private final CloseableBlobSupplier dataSupplier;
private final Blob blob;
private final long uncompressedSize;
private final int chunkSize;
private final Chunk emptyChunk;
Expand All @@ -113,12 +112,8 @@ public int hashCode() {
// lazily on the first call to next(), as opposed to opening it in the constructor or on reset().
private boolean initialized;

Chunker(
CloseableBlobSupplier dataSupplier,
long uncompressedSize,
int chunkSize,
boolean compressed) {
this.dataSupplier = checkNotNull(dataSupplier);
Chunker(Blob blob, long uncompressedSize, int chunkSize, boolean compressed) {
this.blob = checkNotNull(blob);
this.uncompressedSize = uncompressedSize;
this.chunkSize = chunkSize;
this.emptyChunk = new Chunk(ByteString.EMPTY, 0);
Expand All @@ -139,7 +134,7 @@ public long getUncompressedSize() {
* <p>Closes any open resources (file handles, ...).
*/
public void reset() throws IOException {
close();
closeInput();
offset = 0;
initialized = false;
}
Expand All @@ -164,7 +159,7 @@ public void seek(long toOffset) throws IOException {
initialize(toOffset);
}
if (uncompressedSize > 0 && data.finished()) {
close();
closeInput();
}
}

Expand All @@ -176,16 +171,18 @@ public boolean hasNext() {
}

/** Closes the input stream and resets the chunk cache. */
private void close() throws IOException {
private void closeInput() throws IOException {
if (data != null) {
data.close();
data = null;
}
chunkCache = null;
}

public void closeQuietly() {
dataSupplier.close();
/**
 * Releases all resources held by this chunker: the currently open input stream (if any) and the
 * underlying {@link Blob}.
 *
 * <p>The blob is closed even if resetting the chunker fails, so that a failure to close the input
 * stream cannot leak the blob's resources.
 *
 * @throws IOException if closing the input stream fails
 */
@Override
public void close() throws IOException {
  try {
    reset();
  } finally {
    // Always release the blob, even when reset() throws while closing the input stream.
    blob.close();
  }
}

/** Attempts reading at most a full chunk and stores it in the chunkCache buffer */
Expand Down Expand Up @@ -217,7 +214,7 @@ public Chunk next() throws IOException {
maybeInitialize();

if (uncompressedSize == 0) {
close();
closeInput();
return emptyChunk;
}

Expand Down Expand Up @@ -249,7 +246,7 @@ public Chunk next() throws IOException {
// or the guard in getActualSize won't work.
offset += bytesRead;
if (data.finished()) {
close();
closeInput();
}

return new Chunk(blob, offsetBefore);
Expand All @@ -268,7 +265,7 @@ private void initialize(long srcPos) throws IOException {
checkState(offset == 0);
checkState(chunkCache == null);
try {
var src = dataSupplier.get();
var src = blob.get();
ByteStreams.skipFully(src, srcPos);
data =
compressed
Expand All @@ -294,10 +291,10 @@ public static class Builder {
private int chunkSize = getDefaultChunkSize();
protected long size;
private boolean compressed;
protected CloseableBlobSupplier inputStream;
protected Blob inputStream;

@CanIgnoreReturnValue
public Builder setInput(long size, CloseableBlobSupplier in) {
public Builder setInput(long size, Blob in) {
checkState(inputStream == null);
checkNotNull(in);
this.size = size;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ private void releaseOut() {

@Override
public ListenableFuture<Void> uploadBlob(
RemoteActionExecutionContext context, Digest digest, CloseableBlobSupplier data) {
RemoteActionExecutionContext context, Digest digest, Blob data) {
return uploadChunker(
context,
digest,
Expand All @@ -500,11 +500,10 @@ ListenableFuture<Void> uploadChunker(
f.addListener(
() -> {
try {
chunker.reset();
chunker.closeQuietly();
chunker.close();
} catch (IOException e) {
logger.atWarning().withCause(e).log(
"failed to reset chunker uploading %s/%d", digest.getHash(), digest.getSizeBytes());
"failed to close chunker uploading %s/%d", digest.getHash(), digest.getSizeBytes());
}
},
MoreExecutors.directExecutor());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import com.google.devtools.build.lib.remote.common.LostInputsEvent;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient.CloseableBlobSupplier;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient.Blob;
import com.google.devtools.build.lib.remote.disk.DiskCacheClient;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree.ContentSource;
Expand Down Expand Up @@ -177,11 +177,13 @@ public void ensureInputsPresent(
}
}

private static final class VirtualActionInputDataSupplier implements CloseableBlobSupplier {
private static final class VirtualActionInputBlob implements Blob {
private VirtualActionInput virtualActionInput;
// Can be large compared to the retained size of the VirtualActionInput and thus shouldn't be
// kept in memory for an extended period of time.
private volatile ByteString data;

VirtualActionInputDataSupplier(VirtualActionInput virtualActionInput) {
VirtualActionInputBlob(VirtualActionInput virtualActionInput) {
this.virtualActionInput = Preconditions.checkNotNull(virtualActionInput);
}

Expand Down Expand Up @@ -220,7 +222,7 @@ private ListenableFuture<Void> uploadBlob(
return switch (file) {
case ContentSource.VirtualActionInputSource(VirtualActionInput virtualActionInput) ->
remoteCacheClient.uploadBlob(
context, digest, new VirtualActionInputDataSupplier(virtualActionInput));
context, digest, new VirtualActionInputBlob(virtualActionInput));
case ContentSource.PathSource(Path path) -> {
try {
if (remotePathChecker.isRemote(context, path)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/concurrent",
"//src/main/java/com/google/devtools/build/lib/exec:spawn_input_expander",
"//src/main/java/com/google/devtools/build/lib/exec:spawn_runner",
"//src/main/java/com/google/devtools/build/lib/profiler",
"//src/main/java/com/google/devtools/build/lib/vfs",
"//src/main/java/com/google/devtools/build/lib/vfs:pathfragment",
"//third_party:guava",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import build.bazel.remote.execution.v2.ServerCapabilities;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.profiler.SilentCloseable;
import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.ByteString;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand Down Expand Up @@ -113,9 +113,16 @@ ListenableFuture<Void> uploadActionResult(
ListenableFuture<Void> downloadBlob(
RemoteActionExecutionContext context, Digest digest, OutputStream out);

/**
* A supplier for the data comprising a BLOB.
*
* <p>As blobs can be large and may need to be kept in memory, consumers should call {@link #get}
* as late as possible and close the blob as soon as they are done with it.
*/
@FunctionalInterface
interface CloseableBlobSupplier extends SilentCloseable {
InputStream get();
interface Blob extends Closeable {
/** Get an input stream for the blob's data. Can be called multiple times. */
InputStream get() throws IOException;

@Override
default void close() {}
Expand All @@ -129,11 +136,10 @@ default void close() {}
* @param data A supplier for the data to upload. May be called multiple times.
* @return A future representing pending completion of the upload.
*/
ListenableFuture<Void> uploadBlob(
RemoteActionExecutionContext context, Digest digest, CloseableBlobSupplier data);
ListenableFuture<Void> uploadBlob(RemoteActionExecutionContext context, Digest digest, Blob data);

/**
* Uploads a {@code file} to the CAS.
* Uploads a {@code file} BLOB to the CAS.
*
* @param context the context for the action.
* @param digest The digest of the file.
Expand All @@ -146,7 +152,7 @@ default ListenableFuture<Void> uploadFile(
}

/**
* Uploads a BLOB to the CAS.
* Uploads an in-memory BLOB to the CAS.
*
* @param context the context for the action.
* @param digest The digest of the blob.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,7 @@ private void uploadAfterCredentialRefresh(UploadCommand upload, SettableFuture<V

@Override
public ListenableFuture<Void> uploadBlob(
RemoteActionExecutionContext context, Digest digest, CloseableBlobSupplier in) {
RemoteActionExecutionContext context, Digest digest, Blob in) {
return retrier.executeAsync(
() -> {
var result =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
public class MerkleTree {
private static final String BAZEL_TOOL_INPUT_MARKER = "bazel_tool_input";

/** A source of a file's content */
public sealed interface ContentSource {
record PathSource(Path path) implements ContentSource {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import com.google.devtools.build.lib.analysis.BlazeVersionInfo;
import com.google.devtools.build.lib.authandtls.CallCredentialsProvider;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient.CloseableBlobSupplier;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient.Blob;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.TestUtils;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
Expand Down Expand Up @@ -1815,7 +1815,7 @@ public void queryWriteStatus(
/* Custom Chunker used to track the number of open files */
private static class TestChunker extends Chunker {

TestChunker(CloseableBlobSupplier dataSupplier, long size, int chunkSize, boolean compressed) {
TestChunker(Blob dataSupplier, long size, int chunkSize, boolean compressed) {
super(dataSupplier, size, chunkSize, compressed);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public void resourcesShouldBeReleased() throws IOException {

byte[] data = new byte[] {1, 2};
final AtomicReference<InputStream> in = new AtomicReference<>();
RemoteCacheClient.CloseableBlobSupplier supplier =
RemoteCacheClient.Blob supplier =
() -> {
in.set(Mockito.spy(new ByteArrayInputStream(data)));
return in.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
import com.google.devtools.build.lib.remote.common.LostInputsEvent;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient.CloseableBlobSupplier;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient.Blob;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.util.DigestUtil;
Expand All @@ -74,7 +74,6 @@
import com.google.devtools.common.options.Options;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Deque;
Expand All @@ -90,7 +89,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
Expand Down Expand Up @@ -400,7 +398,7 @@ public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUpl
return future;
})
.when(cacheProtocol)
.uploadBlob(any(), any(), (CloseableBlobSupplier) any());
.uploadBlob(any(), any(), (Blob) any());
doAnswer(
invocationOnMock -> {
SettableFuture<Void> future = SettableFuture.create();
Expand Down Expand Up @@ -475,7 +473,7 @@ public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUpl
return future;
})
.when(cacheProtocol)
.uploadBlob(any(), any(), (CloseableBlobSupplier) any());
.uploadBlob(any(), any(), (Blob) any());
doAnswer(
invocationOnMock -> {
SettableFuture<Void> future = SettableFuture.create();
Expand Down Expand Up @@ -556,7 +554,7 @@ public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUpl
return future;
})
.when(cacheProtocol)
.uploadBlob(any(), any(), (CloseableBlobSupplier) any());
.uploadBlob(any(), any(), (Blob) any());
doAnswer(
invocationOnMock -> {
Path file = invocationOnMock.getArgument(2, Path.class);
Expand Down Expand Up @@ -655,7 +653,7 @@ public void ensureInputsPresent_uploadFailed_propagateErrors() throws Exception
RemoteCacheClient cacheProtocol = spy(new InMemoryCacheClient());
doAnswer(invocationOnMock -> Futures.immediateFailedFuture(new IOException("upload failed")))
.when(cacheProtocol)
.uploadBlob(any(), any(), (CloseableBlobSupplier) any());
.uploadBlob(any(), any(), (Blob) any());
doAnswer(invocationOnMock -> Futures.immediateFailedFuture(new IOException("upload failed")))
.when(cacheProtocol)
.uploadFile(any(), any(), any());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@
import com.google.devtools.build.lib.remote.RemoteScrubbing.Config;
import com.google.devtools.build.lib.remote.common.BulkTransferException;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient.CloseableBlobSupplier;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient.Blob;
import com.google.devtools.build.lib.remote.common.RemoteExecutionClient;
import com.google.devtools.build.lib.remote.common.RemotePathResolver;
import com.google.devtools.build.lib.remote.common.RemotePathResolver.DefaultRemotePathResolver;
Expand Down Expand Up @@ -2163,7 +2163,7 @@ public void uploadInputsIfNotPresent_interrupted_requestCancelled() throws Excep
return future;
})
.when(cache.remoteCacheClient)
.uploadBlob(any(), any(), (CloseableBlobSupplier) any());
.uploadBlob(any(), any(), (Blob) any());
ActionInput input = ActionInputHelper.fromPath("inputs/foo");
fakeFileCache.createScratchInput(input, "input-foo");
RemoteExecutionService service = newRemoteExecutionService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public ListenableFuture<Void> uploadActionResult(

@Override
public ListenableFuture<Void> uploadBlob(
RemoteActionExecutionContext context, Digest digest, CloseableBlobSupplier data) {
RemoteActionExecutionContext context, Digest digest, Blob data) {
try {
cas.put(digest, data.get().readAllBytes());
} catch (IOException e) {
Expand Down
Loading

0 comments on commit ada9cdb

Please sign in to comment.