
Reduce retention and early materialization of VirtualActionInput #24891

Open: wants to merge 13 commits into master

@@ -239,13 +239,6 @@ public byte[] atomicallyWriteTo(Path outputPath) throws IOException {
return super.atomicallyWriteTo(outputPath);
}

@Override
public ByteString getBytes() throws IOException {
ByteString.Output out = ByteString.newOutput();
writeTo(out);
return out.toByteString();
}

@Override
public String getExecPathString() {
return paramFileExecPath.getPathString();

@@ -117,8 +117,20 @@ protected byte[] writeTo(Path target) throws IOException {
/**
* Gets a {@link ByteString} representation of the fake file. Used to avoid copying if the fake
* file is internally represented as a {@link ByteString}.
*
* <p>Prefer {@link #writeTo} to this method to avoid materializing the entire file in memory. The
* return value should not be retained.
*/
public abstract ByteString getBytes() throws IOException;
public ByteString getBytes() {
ByteString.Output out = ByteString.newOutput();
try {
writeTo(out);
} catch (IOException e) {
// ByteString.Output doesn't throw IOExceptions.
throw new IllegalStateException(e);
}
return out.toByteString();
}

/**
* Returns the metadata for this input if available. Null otherwise.
@@ -174,7 +186,7 @@ public void writeTo(OutputStream out) throws IOException {
}

@Override
public ByteString getBytes() throws IOException {
public ByteString getBytes() {
return ByteString.EMPTY;
}

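For context on the VirtualActionInput change above: getBytes() is now a concrete fallback built on writeTo(OutputStream), and the updated Javadoc steers callers toward streaming. A minimal caller-side sketch under that reading (the helper class and method names are illustrative, not part of this PR):

```java
import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
import com.google.devtools.build.lib.vfs.Path;
import java.io.IOException;
import java.io.OutputStream;

/** Hypothetical caller-side helper; not part of this PR. */
final class VirtualInputWriter {
  private VirtualInputWriter() {}

  /** Prefers writeTo over getBytes, as the updated Javadoc recommends. */
  static void materialize(VirtualActionInput input, Path outputPath) throws IOException {
    try (OutputStream out = outputPath.getOutputStream()) {
      input.writeTo(out); // streams the virtual file; no full in-memory copy is built
    }
    // Discouraged: input.getBytes() would materialize (and potentially retain) the whole file.
  }
}
```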
2 changes: 2 additions & 0 deletions src/main/java/com/google/devtools/build/lib/exec/BUILD
@@ -287,6 +287,7 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/concurrent",
"//src/main/java/com/google/devtools/build/lib/profiler",
"//src/main/java/com/google/devtools/build/lib/remote/options",
"//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils",
"//src/main/java/com/google/devtools/build/lib/util/io",
"//src/main/java/com/google/devtools/build/lib/util/io:io-proto",
"//src/main/java/com/google/devtools/build/lib/vfs",
@@ -298,6 +299,7 @@ java_library(
"//third_party:jsr305",
"@com_google_protobuf//:protobuf_java",
"@com_google_protobuf//:protobuf_java_util",
"@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto",
"@zstd-jni",
],
)

@@ -201,13 +201,6 @@ protected byte[] atomicallyWriteTo(Path outputPath) throws IOException {
return digest;
}

@Override
public ByteString getBytes() throws IOException {
ByteString.Output out = ByteString.newOutput();
writeTo(out);
return out.toByteString();
}

@Override
public FileArtifactValue getMetadata() throws IOException {
// We intentionally delay hashing until it is necessary.

@@ -35,6 +35,7 @@
import com.google.devtools.build.lib.exec.Protos.EnvironmentVariable;
import com.google.devtools.build.lib.exec.Protos.Platform;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.vfs.DigestHashFunction;
import com.google.devtools.build.lib.vfs.DigestUtils;
import com.google.devtools.build.lib.vfs.FileStatus;
@@ -167,11 +168,9 @@ protected Digest computeDigest(

if (input != null) {
if (input instanceof VirtualActionInput virtualActionInput) {
byte[] blob = virtualActionInput.getBytes().toByteArray();
return builder
.setHash(digestHashFunction.getHashFunction().hashBytes(blob).toString())
.setSizeBytes(blob.length)
.build();
build.bazel.remote.execution.v2.Digest digest =
DigestUtil.compute(virtualActionInput, digestHashFunction.getHashFunction());
return builder.setHash(digest.getHash()).setSizeBytes(digest.getSizeBytes()).build();
}

// Try to obtain a digest from the input metadata.
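The computeDigest() change above replaces the getBytes()-then-hash pattern with DigestUtil.compute(virtualActionInput, hashFunction). That method's body is not shown in this diff; the sketch below only illustrates the streaming idea it presumably relies on, using Guava's hashing and counting streams (an assumption, not code from this PR):

```java
import build.bazel.remote.execution.v2.Digest;
import com.google.common.hash.HashFunction;
import com.google.common.hash.HashingOutputStream;
import com.google.common.io.ByteStreams;
import com.google.common.io.CountingOutputStream;
import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
import java.io.IOException;

/** Illustrative sketch only; not the actual DigestUtil.compute implementation. */
final class StreamingDigestSketch {
  static Digest compute(VirtualActionInput input, HashFunction hashFunction) throws IOException {
    CountingOutputStream counter = new CountingOutputStream(ByteStreams.nullOutputStream());
    HashingOutputStream hasher = new HashingOutputStream(hashFunction, counter);
    input.writeTo(hasher); // streams the virtual file; nothing is materialized or retained
    return Digest.newBuilder()
        .setHash(hasher.hash().toString())
        .setSizeBytes(counter.getCount())
        .build();
  }
}
```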
1 change: 1 addition & 0 deletions src/main/java/com/google/devtools/build/lib/remote/BUILD
@@ -108,6 +108,7 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/remote/merkletree",
"//src/main/java/com/google/devtools/build/lib/remote/options",
"//src/main/java/com/google/devtools/build/lib/remote/util",
"//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils",
"//src/main/java/com/google/devtools/build/lib/remote/zstd",
"//src/main/java/com/google/devtools/build/lib/skyframe:action_execution_value",
"//src/main/java/com/google/devtools/build/lib/skyframe:sky_functions",

@@ -16,8 +16,6 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static com.google.devtools.build.lib.remote.util.DigestUtil.isOldStyleDigestFunction;
import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
import static com.google.devtools.build.lib.remote.util.Utils.waitForBulkTransfer;
import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.SECONDS;

@@ -50,9 +48,6 @@
import io.grpc.stub.ClientResponseObserver;
import io.netty.util.ReferenceCounted;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Semaphore;
import javax.annotation.Nullable;
@@ -104,54 +99,6 @@ final class ByteStreamUploader {
this.digestFunction = digestFunction;
}

@VisibleForTesting
ReferenceCountedChannel getChannel() {
return channel;
}

@VisibleForTesting
RemoteRetrier getRetrier() {
return retrier;
}

/**
* Uploads a BLOB, as provided by the {@link Chunker}, to the remote {@code ByteStream} service.
* The call blocks until the upload is complete, or throws an {@link Exception} in case of error.
*
* <p>Uploads are retried according to the specified {@link RemoteRetrier}. Retrying is
* transparent to the user of this API.
*
* @param digest the digest of the data to upload.
* @param chunker the data to upload.
* @throws IOException when reading of the {@link Chunker}s input source fails
*/
public void uploadBlob(RemoteActionExecutionContext context, Digest digest, Chunker chunker)
throws IOException, InterruptedException {
getFromFuture(uploadBlobAsync(context, digest, chunker));
}

/**
* Uploads a list of BLOBs concurrently to the remote {@code ByteStream} service. The call blocks
* until the upload of all BLOBs is complete, or throws an {@link
* com.google.devtools.build.lib.remote.common.BulkTransferException} if there are errors.
*
* <p>Uploads are retried according to the specified {@link RemoteRetrier}. Retrying is
* transparent to the user of this API.
*
* @param chunkers the data to upload.
* @throws IOException when reading of the {@link Chunker}s input source or uploading fails
*/
public void uploadBlobs(RemoteActionExecutionContext context, Map<Digest, Chunker> chunkers)
throws IOException, InterruptedException {
List<ListenableFuture<Void>> uploads = new ArrayList<>();

for (Map.Entry<Digest, Chunker> chunkerEntry : chunkers.entrySet()) {
uploads.add(uploadBlobAsync(context, chunkerEntry.getKey(), chunkerEntry.getValue()));
}

waitForBulkTransfer(uploads, /* cancelRemainingOnInterrupt= */ true);
}

/**
* Uploads a BLOB asynchronously to the remote {@code ByteStream} service. The call returns
* immediately and one can listen to the returned future for the success/failure of the upload.
@@ -163,7 +110,7 @@ public void uploadBlobs(RemoteActionExecutionContext context, Map<Digest, Chunke
* performed. This is transparent to the user of this API.
*
* @param digest the {@link Digest} of the data to upload.
* @param chunker the data to upload.
* @param chunker the data to upload. Callers are responsible for closing the {@link Chunker}.
*/
public ListenableFuture<Void> uploadBlobAsync(
RemoteActionExecutionContext context, Digest digest, Chunker chunker) {
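With the blocking uploadBlob/uploadBlobs helpers removed and the uploadBlobAsync Javadoc now making the caller responsible for closing the Chunker, a caller might release it when the returned future completes. A sketch assuming `uploader`, `context`, `digest`, and `chunker` are already in scope (this is not code from the PR):

```java
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;

// Sketch only: assumes `uploader`, `context`, `digest`, and `chunker` are available in scope.
void uploadAndRelease() {
  ListenableFuture<Void> upload = uploader.uploadBlobAsync(context, digest, chunker);
  upload.addListener(
      () -> {
        try {
          chunker.close(); // the caller, not the uploader, now owns the Chunker's lifetime
        } catch (IOException e) {
          // Log and continue; the upload's own outcome is reported through the future.
        }
      },
      MoreExecutors.directExecutor());
}
```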
77 changes: 24 additions & 53 deletions src/main/java/com/google/devtools/build/lib/remote/Chunker.java
@@ -22,11 +22,8 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.ByteStreams;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputHelper;
import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient.Blob;
import com.google.devtools.build.lib.remote.zstd.ZstdCompressingInputStream;
import com.google.devtools.build.lib.vfs.Path;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import com.google.protobuf.ByteString;
import java.io.ByteArrayInputStream;
@@ -46,7 +43,7 @@
*
* <p>This class should not be extended - it's only non-final for testing.
*/
public class Chunker {
public class Chunker implements AutoCloseable {

private static int defaultChunkSize = 1024 * 16;

@@ -100,12 +97,7 @@ public int hashCode() {
}
}

/** A supplier that provide data as {@link InputStream}. */
public interface ChunkDataSupplier {
InputStream get() throws IOException;
}

private final ChunkDataSupplier dataSupplier;
private final Blob blob;
private final long uncompressedSize;
private final int chunkSize;
private final Chunk emptyChunk;
@@ -120,9 +112,8 @@ public interface ChunkDataSupplier {
// lazily on the first call to next(), as opposed to opening it in the constructor or on reset().
private boolean initialized;

Chunker(
ChunkDataSupplier dataSupplier, long uncompressedSize, int chunkSize, boolean compressed) {
this.dataSupplier = checkNotNull(dataSupplier);
Chunker(Blob blob, long uncompressedSize, int chunkSize, boolean compressed) {
this.blob = checkNotNull(blob);
this.uncompressedSize = uncompressedSize;
this.chunkSize = chunkSize;
this.emptyChunk = new Chunk(ByteString.EMPTY, 0);
@@ -143,7 +134,7 @@ public long getUncompressedSize() {
* <p>Closes any open resources (file handles, ...).
*/
public void reset() throws IOException {
close();
closeInput();
offset = 0;
initialized = false;
}
@@ -168,7 +159,7 @@ public void seek(long toOffset) throws IOException {
initialize(toOffset);
}
if (uncompressedSize > 0 && data.finished()) {
close();
closeInput();
}
}

@@ -180,14 +171,20 @@ public boolean hasNext() {
}

/** Closes the input stream and reset chunk cache */
private void close() throws IOException {
private void closeInput() throws IOException {
if (data != null) {
data.close();
data = null;
}
chunkCache = null;
}

@Override
public void close() throws IOException {
reset();
blob.close();
}

/** Attempts reading at most a full chunk and stores it in the chunkCache buffer */
private int read() throws IOException {
int count = 0;
Expand Down Expand Up @@ -217,7 +214,7 @@ public Chunk next() throws IOException {
maybeInitialize();

if (uncompressedSize == 0) {
close();
closeInput();
return emptyChunk;
}

@@ -249,7 +246,7 @@ public Chunk next() throws IOException {
// or the guard in getActualSize won't work.
offset += bytesRead;
if (data.finished()) {
close();
closeInput();
}

return new Chunk(blob, offsetBefore);
@@ -268,7 +265,7 @@ private void initialize(long srcPos) throws IOException {
checkState(offset == 0);
checkState(chunkCache == null);
try {
var src = dataSupplier.get();
var src = blob.get();
ByteStreams.skipFully(src, srcPos);
data =
compressed
Expand All @@ -294,49 +291,23 @@ public static class Builder {
private int chunkSize = getDefaultChunkSize();
protected long size;
private boolean compressed;
protected ChunkDataSupplier inputStream;

@CanIgnoreReturnValue
public Builder setInput(byte[] data) {
checkState(inputStream == null);
size = data.length;
setInputSupplier(() -> new ByteArrayInputStream(data));
return this;
}
protected Blob inputStream;

@CanIgnoreReturnValue
public Builder setInput(long size, InputStream in) {
public Builder setInput(long size, Blob in) {
checkState(inputStream == null);
checkNotNull(in);
this.size = size;
inputStream = () -> in;
return this;
}

@CanIgnoreReturnValue
public Builder setInput(long size, Path file) {
checkState(inputStream == null);
this.size = size;
inputStream = file::getInputStream;
return this;
}

@CanIgnoreReturnValue
public Builder setInput(long size, ActionInput actionInput, Path execRoot) {
checkState(inputStream == null);
this.size = size;
if (actionInput instanceof VirtualActionInput virtualActionInput) {
inputStream = () -> virtualActionInput.getBytes().newInput();
} else {
inputStream = () -> ActionInputHelper.toInputPath(actionInput, execRoot).getInputStream();
}
inputStream = in;
return this;
}

@CanIgnoreReturnValue
@VisibleForTesting
protected final Builder setInputSupplier(ChunkDataSupplier inputStream) {
this.inputStream = inputStream;
public Builder setInput(byte[] data) {
checkState(inputStream == null);
size = data.length;
this.inputStream = () -> new ByteArrayInputStream(data);
return this;
}

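Putting the Chunker changes together: the class now implements AutoCloseable, Builder.setInput accepts a byte array or a Blob, and close() releases the underlying Blob. A usage sketch assuming the existing Chunker.builder() factory, which is not shown in this diff (the class and chunk handling below are illustrative):

```java
import com.google.devtools.build.lib.remote.Chunker;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

final class ChunkerUsageSketch {
  // Assumes the existing Chunker.builder() factory (not shown in this diff).
  static void chunkAndSend(String contents) throws IOException {
    byte[] data = contents.getBytes(StandardCharsets.UTF_8);
    try (Chunker chunker = Chunker.builder().setInput(data).build()) {
      while (chunker.hasNext()) {
        Chunker.Chunk chunk = chunker.next(); // yields at most one chunk-size worth of data
        // ... wrap `chunk` in a ByteStream WriteRequest and send it ...
      }
    } // close() resets the chunker and also closes the Blob it wraps
  }
}
```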