From 7038d033e61c63252004e3a0c727e035c9cce798 Mon Sep 17 00:00:00 2001 From: Fabian Meumertzheim Date: Fri, 10 Jan 2025 10:21:21 +0100 Subject: [PATCH 01/13] Refactor --- .../google/devtools/build/lib/actions/CommandLines.java | 7 ------- .../build/lib/actions/cache/VirtualActionInput.java | 6 +++++- .../java/com/google/devtools/build/lib/exec/BinTools.java | 7 ------- .../devtools/build/lib/actions/util/ActionsTestUtil.java | 7 ------- 4 files changed, 5 insertions(+), 22 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/actions/CommandLines.java b/src/main/java/com/google/devtools/build/lib/actions/CommandLines.java index 91798cf668e7b1..ccbdbd6e9ed533 100644 --- a/src/main/java/com/google/devtools/build/lib/actions/CommandLines.java +++ b/src/main/java/com/google/devtools/build/lib/actions/CommandLines.java @@ -239,13 +239,6 @@ public byte[] atomicallyWriteTo(Path outputPath) throws IOException { return super.atomicallyWriteTo(outputPath); } - @Override - public ByteString getBytes() throws IOException { - ByteString.Output out = ByteString.newOutput(); - writeTo(out); - return out.toByteString(); - } - @Override public String getExecPathString() { return paramFileExecPath.getPathString(); diff --git a/src/main/java/com/google/devtools/build/lib/actions/cache/VirtualActionInput.java b/src/main/java/com/google/devtools/build/lib/actions/cache/VirtualActionInput.java index bd5c123fa8c5cc..eba7dfc769e25d 100644 --- a/src/main/java/com/google/devtools/build/lib/actions/cache/VirtualActionInput.java +++ b/src/main/java/com/google/devtools/build/lib/actions/cache/VirtualActionInput.java @@ -118,7 +118,11 @@ protected byte[] writeTo(Path target) throws IOException { * Gets a {@link ByteString} representation of the fake file. Used to avoid copying if the fake * file is internally represented as a {@link ByteString}. */ - public abstract ByteString getBytes() throws IOException; + public ByteString getBytes() throws IOException { + ByteString.Output out = ByteString.newOutput(); + writeTo(out); + return out.toByteString(); + } /** * Returns the metadata for this input if available. Null otherwise. diff --git a/src/main/java/com/google/devtools/build/lib/exec/BinTools.java b/src/main/java/com/google/devtools/build/lib/exec/BinTools.java index 655ef4d02fe29c..331d97b1aa05c3 100644 --- a/src/main/java/com/google/devtools/build/lib/exec/BinTools.java +++ b/src/main/java/com/google/devtools/build/lib/exec/BinTools.java @@ -201,13 +201,6 @@ protected byte[] atomicallyWriteTo(Path outputPath) throws IOException { return digest; } - @Override - public ByteString getBytes() throws IOException { - ByteString.Output out = ByteString.newOutput(); - writeTo(out); - return out.toByteString(); - } - @Override public FileArtifactValue getMetadata() throws IOException { // We intentionally delay hashing until it is necessary. diff --git a/src/test/java/com/google/devtools/build/lib/actions/util/ActionsTestUtil.java b/src/test/java/com/google/devtools/build/lib/actions/util/ActionsTestUtil.java index 2abbaa101be52e..378e643cd6274f 100644 --- a/src/test/java/com/google/devtools/build/lib/actions/util/ActionsTestUtil.java +++ b/src/test/java/com/google/devtools/build/lib/actions/util/ActionsTestUtil.java @@ -318,13 +318,6 @@ public static VirtualActionInput createVirtualActionInput(String relativePath, S /** Creates a {@link VirtualActionInput} with given string as contents and provided path. */ public static VirtualActionInput createVirtualActionInput(PathFragment path, String contents) { return new VirtualActionInput() { - @Override - public ByteString getBytes() throws IOException { - ByteString.Output out = ByteString.newOutput(); - writeTo(out); - return out.toByteString(); - } - @Override public String getExecPathString() { return path.getPathString(); From 944b94bd0ee1a0ab56480242cc874822accf2f1a Mon Sep 17 00:00:00 2001 From: Fabian Meumertzheim Date: Fri, 10 Jan 2025 12:01:31 +0100 Subject: [PATCH 02/13] Everything but MerkleTree --- .../lib/actions/cache/VirtualActionInput.java | 2 ++ .../com/google/devtools/build/lib/exec/BUILD | 2 ++ .../build/lib/exec/SpawnLogContext.java | 9 +++---- .../google/devtools/build/lib/remote/BUILD | 1 + .../devtools/build/lib/remote/Chunker.java | 15 ----------- .../devtools/build/lib/remote/disk/BUILD | 1 + .../devtools/build/lib/remote/http/BUILD | 1 + .../build/lib/remote/merkletree/BUILD | 1 + .../devtools/build/lib/remote/util/BUILD | 26 ++++++++++++++++++- .../build/lib/remote/util/DigestUtil.java | 17 +++++++++--- 10 files changed, 50 insertions(+), 25 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/actions/cache/VirtualActionInput.java b/src/main/java/com/google/devtools/build/lib/actions/cache/VirtualActionInput.java index eba7dfc769e25d..6e145f67d20862 100644 --- a/src/main/java/com/google/devtools/build/lib/actions/cache/VirtualActionInput.java +++ b/src/main/java/com/google/devtools/build/lib/actions/cache/VirtualActionInput.java @@ -117,6 +117,8 @@ protected byte[] writeTo(Path target) throws IOException { /** * Gets a {@link ByteString} representation of the fake file. Used to avoid copying if the fake * file is internally represented as a {@link ByteString}. + * + *

Prefer {@link #writeTo} to this method to avoid materializing the entire file in memory. */ public ByteString getBytes() throws IOException { ByteString.Output out = ByteString.newOutput(); diff --git a/src/main/java/com/google/devtools/build/lib/exec/BUILD b/src/main/java/com/google/devtools/build/lib/exec/BUILD index 44c2e51d933819..9562e58cd4d1df 100644 --- a/src/main/java/com/google/devtools/build/lib/exec/BUILD +++ b/src/main/java/com/google/devtools/build/lib/exec/BUILD @@ -287,6 +287,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/concurrent", "//src/main/java/com/google/devtools/build/lib/profiler", "//src/main/java/com/google/devtools/build/lib/remote/options", + "//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils", "//src/main/java/com/google/devtools/build/lib/util/io", "//src/main/java/com/google/devtools/build/lib/util/io:io-proto", "//src/main/java/com/google/devtools/build/lib/vfs", @@ -298,6 +299,7 @@ java_library( "//third_party:jsr305", "@com_google_protobuf//:protobuf_java", "@com_google_protobuf//:protobuf_java_util", + "@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto", "@zstd-jni", ], ) diff --git a/src/main/java/com/google/devtools/build/lib/exec/SpawnLogContext.java b/src/main/java/com/google/devtools/build/lib/exec/SpawnLogContext.java index 383d10948d8345..094fcca7f0f6f5 100644 --- a/src/main/java/com/google/devtools/build/lib/exec/SpawnLogContext.java +++ b/src/main/java/com/google/devtools/build/lib/exec/SpawnLogContext.java @@ -35,6 +35,7 @@ import com.google.devtools.build.lib.exec.Protos.EnvironmentVariable; import com.google.devtools.build.lib.exec.Protos.Platform; import com.google.devtools.build.lib.remote.options.RemoteOptions; +import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.vfs.DigestHashFunction; import com.google.devtools.build.lib.vfs.DigestUtils; import com.google.devtools.build.lib.vfs.FileStatus; @@ -167,11 +168,9 @@ protected Digest computeDigest( if (input != null) { if (input instanceof VirtualActionInput virtualActionInput) { - byte[] blob = virtualActionInput.getBytes().toByteArray(); - return builder - .setHash(digestHashFunction.getHashFunction().hashBytes(blob).toString()) - .setSizeBytes(blob.length) - .build(); + build.bazel.remote.execution.v2.Digest digest = + DigestUtil.compute(virtualActionInput, digestHashFunction.getHashFunction()); + return builder.setHash(digest.getHash()).setSizeBytes(digest.getSizeBytes()).build(); } // Try to obtain a digest from the input metadata. diff --git a/src/main/java/com/google/devtools/build/lib/remote/BUILD b/src/main/java/com/google/devtools/build/lib/remote/BUILD index e71133ad0a28f2..e0fc26f51cc5e3 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/BUILD +++ b/src/main/java/com/google/devtools/build/lib/remote/BUILD @@ -108,6 +108,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/remote/merkletree", "//src/main/java/com/google/devtools/build/lib/remote/options", "//src/main/java/com/google/devtools/build/lib/remote/util", + "//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils", "//src/main/java/com/google/devtools/build/lib/remote/zstd", "//src/main/java/com/google/devtools/build/lib/skyframe:action_execution_value", "//src/main/java/com/google/devtools/build/lib/skyframe:sky_functions", diff --git a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java index 12c43bb37969d1..edc23b8bce1342 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java +++ b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java @@ -22,9 +22,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.io.ByteStreams; -import com.google.devtools.build.lib.actions.ActionInput; -import com.google.devtools.build.lib.actions.ActionInputHelper; -import com.google.devtools.build.lib.actions.cache.VirtualActionInput; import com.google.devtools.build.lib.remote.zstd.ZstdCompressingInputStream; import com.google.devtools.build.lib.vfs.Path; import com.google.errorprone.annotations.CanIgnoreReturnValue; @@ -321,18 +318,6 @@ public Builder setInput(long size, Path file) { return this; } - @CanIgnoreReturnValue - public Builder setInput(long size, ActionInput actionInput, Path execRoot) { - checkState(inputStream == null); - this.size = size; - if (actionInput instanceof VirtualActionInput virtualActionInput) { - inputStream = () -> virtualActionInput.getBytes().newInput(); - } else { - inputStream = () -> ActionInputHelper.toInputPath(actionInput, execRoot).getInputStream(); - } - return this; - } - @CanIgnoreReturnValue @VisibleForTesting protected final Builder setInputSupplier(ChunkDataSupplier inputStream) { diff --git a/src/main/java/com/google/devtools/build/lib/remote/disk/BUILD b/src/main/java/com/google/devtools/build/lib/remote/disk/BUILD index 6b5d3d8d05e4fc..962a5388d0ddf2 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/disk/BUILD +++ b/src/main/java/com/google/devtools/build/lib/remote/disk/BUILD @@ -21,6 +21,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/remote/common:cache_not_found_exception", "//src/main/java/com/google/devtools/build/lib/remote/options", "//src/main/java/com/google/devtools/build/lib/remote/util", + "//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils", "//src/main/java/com/google/devtools/build/lib/server:idle_task", "//src/main/java/com/google/devtools/build/lib/util:file_system_lock", "//src/main/java/com/google/devtools/build/lib/vfs", diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/BUILD b/src/main/java/com/google/devtools/build/lib/remote/http/BUILD index 7ea7e0ef886e96..cb12bfb309a468 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/http/BUILD +++ b/src/main/java/com/google/devtools/build/lib/remote/http/BUILD @@ -26,6 +26,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/remote/common", "//src/main/java/com/google/devtools/build/lib/remote/common:cache_not_found_exception", "//src/main/java/com/google/devtools/build/lib/remote/util", + "//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils", "//src/main/java/com/google/devtools/build/lib/vfs", "//third_party:auth", "//third_party:flogger", diff --git a/src/main/java/com/google/devtools/build/lib/remote/merkletree/BUILD b/src/main/java/com/google/devtools/build/lib/remote/merkletree/BUILD index 63610c304eb536..6d67b52c79d326 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/merkletree/BUILD +++ b/src/main/java/com/google/devtools/build/lib/remote/merkletree/BUILD @@ -22,6 +22,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/profiler", "//src/main/java/com/google/devtools/build/lib/remote:scrubber", "//src/main/java/com/google/devtools/build/lib/remote/util", + "//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils", "//src/main/java/com/google/devtools/build/lib/util:string_encoding", "//src/main/java/com/google/devtools/build/lib/vfs", "//src/main/java/com/google/devtools/build/lib/vfs:pathfragment", diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/BUILD b/src/main/java/com/google/devtools/build/lib/remote/util/BUILD index 2d5c1f81dd1eef..480bd9c3c3aa59 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/util/BUILD +++ b/src/main/java/com/google/devtools/build/lib/remote/util/BUILD @@ -13,8 +13,15 @@ filegroup( java_library( name = "util", - srcs = glob(["*.java"]), + srcs = glob( + ["*.java"], + exclude = [ + "DigestOutputStream.java", + "DigestUtil.java", + ], + ), deps = [ + ":digest_utils", "//src/main/java/com/google/devtools/build/lib/actions", "//src/main/java/com/google/devtools/build/lib/actions:artifacts", "//src/main/java/com/google/devtools/build/lib/actions:execution_requirements", @@ -42,3 +49,20 @@ java_library( "@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto", ], ) + +java_library( + name = "digest_utils", + srcs = [ + "DigestOutputStream.java", + "DigestUtil.java", + ], + deps = [ + "//src/main/java/com/google/devtools/build/lib/actions", + "//src/main/java/com/google/devtools/build/lib/remote/common", + "//src/main/java/com/google/devtools/build/lib/vfs", + "//src/main/protobuf:spawn_java_proto", + "//third_party:guava", + "@com_google_protobuf//:protobuf_java", + "@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto", + ], +) diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/DigestUtil.java b/src/main/java/com/google/devtools/build/lib/remote/util/DigestUtil.java index a3332f8e5e73b2..0a6763e6a815c1 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/util/DigestUtil.java +++ b/src/main/java/com/google/devtools/build/lib/remote/util/DigestUtil.java @@ -21,6 +21,7 @@ import build.bazel.remote.execution.v2.DigestFunction; import com.google.common.collect.ImmutableSet; import com.google.common.hash.HashCode; +import com.google.common.hash.HashFunction; import com.google.common.io.BaseEncoding; import com.google.devtools.build.lib.actions.cache.VirtualActionInput; import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey; @@ -30,7 +31,6 @@ import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.build.lib.vfs.XattrProvider; import com.google.protobuf.Message; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; import java.util.Arrays; @@ -91,10 +91,19 @@ public Digest compute(Path path, FileStatus status) throws IOException { DigestUtils.getDigestWithManualFallback(path, xattrProvider, status), status.getSize()); } + public static Digest compute(VirtualActionInput input, HashFunction hashFunction) + throws IOException { + // Stream the virtual action input as parameter files, which can be very large, are lazily + // computed from the in-memory CommandLine object. This avoids allocating large byte arrays. + try (DigestOutputStream digestOutputStream = + new DigestOutputStream(hashFunction, OutputStream.nullOutputStream())) { + input.writeTo(digestOutputStream); + return digestOutputStream.digest(); + } + } + public Digest compute(VirtualActionInput input) throws IOException { - ByteArrayOutputStream buffer = new ByteArrayOutputStream(); - input.writeTo(buffer); - return compute(buffer.toByteArray()); + return compute(input, hashFn.getHashFunction()); } /** From 79d3a27d68bfbade509477f093bc2747e7637fdb Mon Sep 17 00:00:00 2001 From: Fabian Meumertzheim Date: Fri, 10 Jan 2025 13:07:49 +0100 Subject: [PATCH 03/13] Fix usage --- .../lib/actions/cache/VirtualActionInput.java | 11 ++-- .../lib/remote/RemoteExecutionCache.java | 41 ++++++++------- .../lib/remote/common/RemoteCacheClient.java | 1 + .../lib/remote/merkletree/DirectoryTree.java | 22 ++++---- .../merkletree/DirectoryTreeBuilder.java | 5 +- .../lib/remote/merkletree/MerkleTree.java | 52 ++++++------------- .../devtools/build/lib/util/StreamWriter.java | 2 + .../ActionInputDirectoryTreeTest.java | 2 +- .../build/lib/remote/merkletree/BUILD | 1 + .../lib/remote/merkletree/MerkleTreeTest.java | 13 +++-- 10 files changed, 73 insertions(+), 77 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/actions/cache/VirtualActionInput.java b/src/main/java/com/google/devtools/build/lib/actions/cache/VirtualActionInput.java index 6e145f67d20862..63a582b797fe90 100644 --- a/src/main/java/com/google/devtools/build/lib/actions/cache/VirtualActionInput.java +++ b/src/main/java/com/google/devtools/build/lib/actions/cache/VirtualActionInput.java @@ -120,9 +120,14 @@ protected byte[] writeTo(Path target) throws IOException { * *

Prefer {@link #writeTo} to this method to avoid materializing the entire file in memory. */ - public ByteString getBytes() throws IOException { + public ByteString getBytes() { ByteString.Output out = ByteString.newOutput(); - writeTo(out); + try { + writeTo(out); + } catch (IOException e) { + // ByteString.Output doesn't throw IOExceptions. + throw new IllegalStateException(e); + } return out.toByteString(); } @@ -180,7 +185,7 @@ public void writeTo(OutputStream out) throws IOException { } @Override - public ByteString getBytes() throws IOException { + public ByteString getBytes() { return ByteString.EMPTY; } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java index ccd8eba26b3d08..8e949bfe3cad07 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java @@ -33,6 +33,7 @@ import com.google.common.collect.Iterables; import com.google.common.flogger.GoogleLogger; import com.google.common.util.concurrent.ListenableFuture; +import com.google.devtools.build.lib.actions.cache.VirtualActionInput; import com.google.devtools.build.lib.events.Reporter; import com.google.devtools.build.lib.profiler.Profiler; import com.google.devtools.build.lib.profiler.SilentCloseable; @@ -42,7 +43,7 @@ import com.google.devtools.build.lib.remote.common.RemoteCacheClient; import com.google.devtools.build.lib.remote.disk.DiskCacheClient; import com.google.devtools.build.lib.remote.merkletree.MerkleTree; -import com.google.devtools.build.lib.remote.merkletree.MerkleTree.PathOrBytes; +import com.google.devtools.build.lib.remote.merkletree.MerkleTree.ContentSource; import com.google.devtools.build.lib.remote.options.RemoteOptions; import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.remote.util.RxUtils.TransferResult; @@ -183,25 +184,29 @@ private ListenableFuture uploadBlob( return remoteCacheClient.uploadBlob(context, digest, node.toByteString()); } - PathOrBytes file = merkleTree.getFileByDigest(digest); + ContentSource file = merkleTree.getFileByDigest(digest); if (file != null) { - if (file.getBytes() != null) { - return remoteCacheClient.uploadBlob(context, digest, file.getBytes()); - } - - var path = checkNotNull(file.getPath()); - try { - if (remotePathChecker.isRemote(context, path)) { - // If we get here, the remote input was determined to exist in the remote or disk cache at - // some point before action execution, but reported to be missing when querying the remote - // for missing action inputs; possibly because it was evicted in the interim. - reporter.post(new LostInputsEvent(digest)); - throw new CacheNotFoundException(digest, path.getPathString()); + return switch (file) { + case ContentSource.VirtualActionInput(VirtualActionInput virtualActionInput) -> + // TODO: Avoid materializing the entire file in memory. + remoteCacheClient.uploadBlob(context, digest, virtualActionInput.getBytes()); + case ContentSource.Path(Path path) -> { + try { + if (remotePathChecker.isRemote(context, path)) { + // If we get here, the remote input was determined to exist in the remote or disk + // cache at + // some point before action execution, but reported to be missing when querying the + // remote + // for missing action inputs; possibly because it was evicted in the interim. + reporter.post(new LostInputsEvent(digest)); + throw new CacheNotFoundException(digest, path.getPathString()); + } + } catch (IOException e) { + yield immediateFailedFuture(e); + } + yield remoteCacheClient.uploadFile(context, digest, path); } - } catch (IOException e) { - return immediateFailedFuture(e); - } - return remoteCacheClient.uploadFile(context, digest, path); + }; } Message message = additionalInputs.get(digest); diff --git a/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java index 4ed37e5a6e58d5..0e23bd1267d5b0 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java @@ -20,6 +20,7 @@ import build.bazel.remote.execution.v2.ServerCapabilities; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ListenableFuture; +import com.google.devtools.build.lib.actions.cache.VirtualActionInput; import com.google.devtools.build.lib.vfs.Path; import com.google.protobuf.ByteString; import java.io.IOException; diff --git a/src/main/java/com/google/devtools/build/lib/remote/merkletree/DirectoryTree.java b/src/main/java/com/google/devtools/build/lib/remote/merkletree/DirectoryTree.java index cd3862e35d842a..60ecce8ac50b3f 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/merkletree/DirectoryTree.java +++ b/src/main/java/com/google/devtools/build/lib/remote/merkletree/DirectoryTree.java @@ -16,10 +16,10 @@ import build.bazel.remote.execution.v2.Digest; import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; +import com.google.devtools.build.lib.actions.cache.VirtualActionInput; import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.build.lib.vfs.PathFragment; import com.google.errorprone.annotations.CanIgnoreReturnValue; -import com.google.protobuf.ByteString; import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -73,7 +73,7 @@ public boolean equals(Object o) { static class FileNode extends Node { private final Path path; - private final ByteString data; + private final VirtualActionInput virtualActionInput; private final Digest digest; private final boolean isExecutable; private final boolean toolInput; @@ -96,15 +96,15 @@ static FileNode createExecutable( } static FileNode createExecutable( - String pathSegment, ByteString data, Digest digest, boolean toolInput) { - return new FileNode(pathSegment, data, digest, /* isExecutable= */ true, toolInput); + String pathSegment, VirtualActionInput virtualActionInput, Digest digest, boolean toolInput) { + return new FileNode(pathSegment, virtualActionInput, digest, /* isExecutable= */ true, toolInput); } private FileNode( String pathSegment, Path path, Digest digest, boolean isExecutable, boolean toolInput) { super(pathSegment); this.path = Preconditions.checkNotNull(path, "path"); - this.data = null; + this.virtualActionInput = null; this.digest = Preconditions.checkNotNull(digest, "digest"); this.isExecutable = isExecutable; this.toolInput = toolInput; @@ -112,13 +112,13 @@ private FileNode( private FileNode( String pathSegment, - ByteString data, + VirtualActionInput input, Digest digest, boolean isExecutable, boolean toolInput) { super(pathSegment); this.path = null; - this.data = Preconditions.checkNotNull(data, "data"); + this.virtualActionInput = Preconditions.checkNotNull(input, "data"); this.digest = Preconditions.checkNotNull(digest, "digest"); this.isExecutable = isExecutable; this.toolInput = toolInput; @@ -132,8 +132,8 @@ Path getPath() { return path; } - ByteString getBytes() { - return data; + VirtualActionInput getVirtualActionInput() { + return virtualActionInput; } public boolean isExecutable() { @@ -146,7 +146,7 @@ boolean isToolInput() { @Override public int hashCode() { - return Objects.hash(super.hashCode(), path, data, digest, toolInput, isExecutable); + return Objects.hash(super.hashCode(), path, virtualActionInput, digest, toolInput, isExecutable); } @Override @@ -154,7 +154,7 @@ public boolean equals(Object o) { if (o instanceof FileNode other) { return super.equals(other) && Objects.equals(path, other.path) - && Objects.equals(data, other.data) + && Objects.equals(virtualActionInput, other.virtualActionInput) && Objects.equals(digest, other.digest) && toolInput == other.toolInput && isExecutable == other.isExecutable; diff --git a/src/main/java/com/google/devtools/build/lib/remote/merkletree/DirectoryTreeBuilder.java b/src/main/java/com/google/devtools/build/lib/remote/merkletree/DirectoryTreeBuilder.java index 7b63c14e9a09c7..60fab395147e3f 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/merkletree/DirectoryTreeBuilder.java +++ b/src/main/java/com/google/devtools/build/lib/remote/merkletree/DirectoryTreeBuilder.java @@ -157,10 +157,7 @@ private static int buildFromActionInputs( boolean childAdded = currDir.addChild( FileNode.createExecutable( - path.getBaseName(), - virtualActionInput.getBytes(), - d, - toolInputs.contains(path))); + path.getBaseName(), virtualActionInput, d, toolInputs.contains(path))); return childAdded ? 1 : 0; } diff --git a/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTree.java b/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTree.java index b3e965986c3ad9..4b1d4b5b88296b 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTree.java +++ b/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTree.java @@ -38,7 +38,6 @@ import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.build.lib.vfs.PathFragment; -import com.google.protobuf.ByteString; import java.io.IOException; import java.util.Collection; import java.util.HashMap; @@ -55,31 +54,12 @@ public class MerkleTree { private static final String BAZEL_TOOL_INPUT_MARKER = "bazel_tool_input"; - /** A path or contents */ - public static class PathOrBytes { + public sealed interface ContentSource { + record Path(com.google.devtools.build.lib.vfs.Path path) implements ContentSource {} - private final Path path; - private final ByteString bytes; - - public PathOrBytes(Path path) { - this.path = Preconditions.checkNotNull(path, "path"); - this.bytes = null; - } - - public PathOrBytes(ByteString bytes) { - this.bytes = Preconditions.checkNotNull(bytes, "bytes"); - this.path = null; - } - - @Nullable - public Path getPath() { - return path; - } - - @Nullable - public ByteString getBytes() { - return bytes; - } + record VirtualActionInput( + com.google.devtools.build.lib.actions.cache.VirtualActionInput virtualActionInput) + implements ContentSource {} } private interface MerkleTreeDirectoryVisitor { @@ -95,7 +75,7 @@ private interface MerkleTreeDirectoryVisitor { } private Map digestDirectoryMap; - private Map digestFileMap; + private Map digestFileMap; @Nullable private final Directory rootProto; private final Digest rootDigest; private final SortedSet files; @@ -177,13 +157,19 @@ private Map getDigestDirectoryMap() { return this.digestDirectoryMap; } - private Map getDigestFileMap() { + private Map getDigestFileMap() { if (this.digestFileMap == null) { - Map newDigestMap = Maps.newHashMap(); + Map newDigestMap = Maps.newHashMap(); visitTree( (dir) -> { for (DirectoryTree.FileNode file : dir.getFiles()) { - newDigestMap.put(file.getDigest(), toPathOrBytes(file)); + ContentSource contentSource; + if (file.getPath() != null) { + contentSource = new ContentSource.Path(file.getPath()); + } else { + contentSource = new ContentSource.VirtualActionInput(file.getVirtualActionInput()); + } + newDigestMap.put(file.getDigest(), contentSource); } }); this.digestFileMap = newDigestMap; @@ -197,7 +183,7 @@ public Directory getDirectoryByDigest(Digest digest) { } @Nullable - public PathOrBytes getFileByDigest(Digest digest) { + public ContentSource getFileByDigest(Digest digest) { return getDigestFileMap().get(digest); } @@ -420,10 +406,4 @@ private static SymlinkNode buildProto(DirectoryTree.SymlinkNode symlink) { .setTarget(internalToUnicode(symlink.getTarget())) .build(); } - - private static PathOrBytes toPathOrBytes(DirectoryTree.FileNode file) { - return file.getPath() != null - ? new PathOrBytes(file.getPath()) - : new PathOrBytes(file.getBytes()); - } } diff --git a/src/main/java/com/google/devtools/build/lib/util/StreamWriter.java b/src/main/java/com/google/devtools/build/lib/util/StreamWriter.java index a3c5763a830b69..be465e0771f7cf 100644 --- a/src/main/java/com/google/devtools/build/lib/util/StreamWriter.java +++ b/src/main/java/com/google/devtools/build/lib/util/StreamWriter.java @@ -23,6 +23,8 @@ public interface StreamWriter { /** * Writes the fake file to an OutputStream. MUST be deterministic, in that multiple calls to * write the same StreamWriter must write identical bytes. + * + * @throws IOException only if out throws an IOException */ void writeTo(OutputStream out) throws IOException; } diff --git a/src/test/java/com/google/devtools/build/lib/remote/merkletree/ActionInputDirectoryTreeTest.java b/src/test/java/com/google/devtools/build/lib/remote/merkletree/ActionInputDirectoryTreeTest.java index eb665aca0b67b8..f6a88f43525500 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/merkletree/ActionInputDirectoryTreeTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/merkletree/ActionInputDirectoryTreeTest.java @@ -86,7 +86,7 @@ public void virtualActionInputShouldWork() throws Exception { FileNode expectedFooNode = FileNode.createExecutable("foo.cc", foo.getPath(), digestUtil.computeAsUtf8("foo")); FileNode expectedBarNode = - FileNode.createExecutable("bar.cc", bar.getBytes(), digestUtil.computeAsUtf8("bar"), false); + FileNode.createExecutable("bar.cc", bar, digestUtil.computeAsUtf8("bar"), false); assertThat(fileNodesAtDepth(tree, 0)).isEmpty(); assertThat(fileNodesAtDepth(tree, 1)).containsExactly(expectedFooNode, expectedBarNode); } diff --git a/src/test/java/com/google/devtools/build/lib/remote/merkletree/BUILD b/src/test/java/com/google/devtools/build/lib/remote/merkletree/BUILD index 5724817f1d0d02..efabf16c0780b9 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/merkletree/BUILD +++ b/src/test/java/com/google/devtools/build/lib/remote/merkletree/BUILD @@ -28,6 +28,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/clock", "//src/main/java/com/google/devtools/build/lib/remote/merkletree", "//src/main/java/com/google/devtools/build/lib/remote/util", + "//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils", "//src/main/java/com/google/devtools/build/lib/vfs", "//src/main/java/com/google/devtools/build/lib/vfs:pathfragment", "//src/main/java/com/google/devtools/build/lib/vfs/inmemoryfs", diff --git a/src/test/java/com/google/devtools/build/lib/remote/merkletree/MerkleTreeTest.java b/src/test/java/com/google/devtools/build/lib/remote/merkletree/MerkleTreeTest.java index 6c91c85bde8517..0938735045def5 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/merkletree/MerkleTreeTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/merkletree/MerkleTreeTest.java @@ -30,6 +30,7 @@ import com.google.devtools.build.lib.actions.StaticInputMetadataProvider; import com.google.devtools.build.lib.actions.util.ActionsTestUtil; import com.google.devtools.build.lib.clock.JavaClock; +import com.google.devtools.build.lib.remote.merkletree.MerkleTree.ContentSource; import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.vfs.DigestHashFunction; import com.google.devtools.build.lib.vfs.FileSystem; @@ -138,10 +139,14 @@ public void buildMerkleTree() throws IOException { digestUtil.computeAsUtf8("buzz"), digestUtil.computeAsUtf8("fizzbuzz") }; - assertThat(tree.getFileByDigest(inputDigests[0]).getPath()).isEqualTo(foo.getPath()); - assertThat(tree.getFileByDigest(inputDigests[1]).getPath()).isEqualTo(bar.getPath()); - assertThat(tree.getFileByDigest(inputDigests[2]).getPath()).isEqualTo(buzz.getPath()); - assertThat(tree.getFileByDigest(inputDigests[3]).getPath()).isEqualTo(fizzbuzz.getPath()); + assertThat(tree.getFileByDigest(inputDigests[0])) + .isEqualTo(new ContentSource.Path(foo.getPath())); + assertThat(tree.getFileByDigest(inputDigests[1])) + .isEqualTo(new ContentSource.Path(bar.getPath())); + assertThat(tree.getFileByDigest(inputDigests[2])) + .isEqualTo(new ContentSource.Path(buzz.getPath())); + assertThat(tree.getFileByDigest(inputDigests[3])) + .isEqualTo(new ContentSource.Path(fizzbuzz.getPath())); Digest[] allDigests = Iterables.toArray(tree.getAllDigests(), Digest.class); assertThat(allDigests.length).isEqualTo(dirDigests.length + inputDigests.length); From eb403b2a94a37238a9a11096bc39ef13614fa756 Mon Sep 17 00:00:00 2001 From: Fabian Meumertzheim Date: Fri, 10 Jan 2025 13:18:13 +0100 Subject: [PATCH 04/13] Store MerkleTree nodes unwrapped --- .../lib/actions/cache/VirtualActionInput.java | 3 ++- .../lib/remote/RemoteExecutionCache.java | 7 ++--- .../lib/remote/merkletree/MerkleTree.java | 27 +++++++++++-------- .../lib/remote/merkletree/MerkleTreeTest.java | 8 +++--- 4 files changed, 26 insertions(+), 19 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/actions/cache/VirtualActionInput.java b/src/main/java/com/google/devtools/build/lib/actions/cache/VirtualActionInput.java index 63a582b797fe90..63688bef235463 100644 --- a/src/main/java/com/google/devtools/build/lib/actions/cache/VirtualActionInput.java +++ b/src/main/java/com/google/devtools/build/lib/actions/cache/VirtualActionInput.java @@ -118,7 +118,8 @@ protected byte[] writeTo(Path target) throws IOException { * Gets a {@link ByteString} representation of the fake file. Used to avoid copying if the fake * file is internally represented as a {@link ByteString}. * - *

Prefer {@link #writeTo} to this method to avoid materializing the entire file in memory. + *

Prefer {@link #writeTo} to this method to avoid materializing the entire file in memory. The + * return value should not be retained. */ public ByteString getBytes() { ByteString.Output out = ByteString.newOutput(); diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java index 8e949bfe3cad07..9dfb151afb1aa3 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java @@ -187,10 +187,11 @@ private ListenableFuture uploadBlob( ContentSource file = merkleTree.getFileByDigest(digest); if (file != null) { return switch (file) { - case ContentSource.VirtualActionInput(VirtualActionInput virtualActionInput) -> - // TODO: Avoid materializing the entire file in memory. + case ContentSource.VirtualActionInputSource(VirtualActionInput virtualActionInput) -> + // TODO: Avoid materializing the entire file in memory. This requires changing the + // upload to be driven by an OutputStream rather than consuming an InputStream. remoteCacheClient.uploadBlob(context, digest, virtualActionInput.getBytes()); - case ContentSource.Path(Path path) -> { + case ContentSource.PathSource(Path path) -> { try { if (remotePathChecker.isRemote(context, path)) { // If we get here, the remote input was determined to exist in the remote or disk diff --git a/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTree.java b/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTree.java index 4b1d4b5b88296b..98fa57c440107b 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTree.java +++ b/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTree.java @@ -32,6 +32,7 @@ import com.google.devtools.build.lib.actions.ActionInput; import com.google.devtools.build.lib.actions.ArtifactPathResolver; import com.google.devtools.build.lib.actions.InputMetadataProvider; +import com.google.devtools.build.lib.actions.cache.VirtualActionInput; import com.google.devtools.build.lib.profiler.Profiler; import com.google.devtools.build.lib.profiler.SilentCloseable; import com.google.devtools.build.lib.remote.Scrubber.SpawnScrubber; @@ -55,10 +56,9 @@ public class MerkleTree { private static final String BAZEL_TOOL_INPUT_MARKER = "bazel_tool_input"; public sealed interface ContentSource { - record Path(com.google.devtools.build.lib.vfs.Path path) implements ContentSource {} + record PathSource(Path path) implements ContentSource {} - record VirtualActionInput( - com.google.devtools.build.lib.actions.cache.VirtualActionInput virtualActionInput) + record VirtualActionInputSource(VirtualActionInput virtualActionInput) implements ContentSource {} } @@ -75,7 +75,8 @@ private interface MerkleTreeDirectoryVisitor { } private Map digestDirectoryMap; - private Map digestFileMap; + // Object is an unwrapped ContentSource to reduce retained memory when caching MerkleTrees. + private Map digestFileMap; @Nullable private final Directory rootProto; private final Digest rootDigest; private final SortedSet files; @@ -157,19 +158,17 @@ private Map getDigestDirectoryMap() { return this.digestDirectoryMap; } - private Map getDigestFileMap() { + private Map getDigestFileMap() { if (this.digestFileMap == null) { - Map newDigestMap = Maps.newHashMap(); + Map newDigestMap = Maps.newHashMap(); visitTree( (dir) -> { for (DirectoryTree.FileNode file : dir.getFiles()) { - ContentSource contentSource; if (file.getPath() != null) { - contentSource = new ContentSource.Path(file.getPath()); + newDigestMap.put(file.getDigest(), file.getPath()); } else { - contentSource = new ContentSource.VirtualActionInput(file.getVirtualActionInput()); + newDigestMap.put(file.getDigest(), file.getVirtualActionInput()); } - newDigestMap.put(file.getDigest(), contentSource); } }); this.digestFileMap = newDigestMap; @@ -184,7 +183,13 @@ public Directory getDirectoryByDigest(Digest digest) { @Nullable public ContentSource getFileByDigest(Digest digest) { - return getDigestFileMap().get(digest); + Object pathOrVirtualActionInput = getDigestFileMap().get(digest); + if (pathOrVirtualActionInput instanceof Path path) { + return new ContentSource.PathSource(path); + } else { + return new ContentSource.VirtualActionInputSource( + (VirtualActionInput) pathOrVirtualActionInput); + } } /** diff --git a/src/test/java/com/google/devtools/build/lib/remote/merkletree/MerkleTreeTest.java b/src/test/java/com/google/devtools/build/lib/remote/merkletree/MerkleTreeTest.java index 0938735045def5..84de07eed3de4b 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/merkletree/MerkleTreeTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/merkletree/MerkleTreeTest.java @@ -140,13 +140,13 @@ public void buildMerkleTree() throws IOException { digestUtil.computeAsUtf8("fizzbuzz") }; assertThat(tree.getFileByDigest(inputDigests[0])) - .isEqualTo(new ContentSource.Path(foo.getPath())); + .isEqualTo(new ContentSource.PathSource(foo.getPath())); assertThat(tree.getFileByDigest(inputDigests[1])) - .isEqualTo(new ContentSource.Path(bar.getPath())); + .isEqualTo(new ContentSource.PathSource(bar.getPath())); assertThat(tree.getFileByDigest(inputDigests[2])) - .isEqualTo(new ContentSource.Path(buzz.getPath())); + .isEqualTo(new ContentSource.PathSource(buzz.getPath())); assertThat(tree.getFileByDigest(inputDigests[3])) - .isEqualTo(new ContentSource.Path(fizzbuzz.getPath())); + .isEqualTo(new ContentSource.PathSource(fizzbuzz.getPath())); Digest[] allDigests = Iterables.toArray(tree.getAllDigests(), Digest.class); assertThat(allDigests.length).isEqualTo(dirDigests.length + inputDigests.length); From d97931e99edfce5187c2dd07c2feb78b9c0da7ed Mon Sep 17 00:00:00 2001 From: Fabian Meumertzheim Date: Fri, 10 Jan 2025 14:05:01 +0100 Subject: [PATCH 05/13] Switch to Supplier --- .../devtools/build/lib/remote/Chunker.java | 20 +++++--------- .../build/lib/remote/GrpcCacheClient.java | 18 +++---------- .../lib/remote/RemoteExecutionCache.java | 3 ++- .../lib/remote/common/RemoteCacheClient.java | 26 ++++++++++++++++--- .../lib/remote/http/HttpCacheClient.java | 19 +++----------- .../google/devtools/build/lib/remote/BUILD | 3 +++ .../lib/remote/ByteStreamUploaderTest.java | 3 ++- .../build/lib/remote/ChunkerTest.java | 2 +- .../build/lib/remote/CombinedCacheTest.java | 10 ++++--- .../remote/RemoteExecutionServiceTest.java | 4 ++- .../devtools/build/lib/remote/disk/BUILD | 1 + .../build/lib/remote/downloader/BUILD | 1 + .../devtools/build/lib/remote/http/BUILD | 1 + .../devtools/build/lib/remote/logging/BUILD | 1 + .../lib/remote/util/InMemoryCacheClient.java | 21 +++------------ .../google/devtools/build/remote/worker/BUILD | 1 + .../build/remote/worker/ByteStreamServer.java | 4 ++- 17 files changed, 63 insertions(+), 75 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java index edc23b8bce1342..5bbbac882a7cb7 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java +++ b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java @@ -23,7 +23,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.io.ByteStreams; import com.google.devtools.build.lib.remote.zstd.ZstdCompressingInputStream; -import com.google.devtools.build.lib.vfs.Path; import com.google.errorprone.annotations.CanIgnoreReturnValue; import com.google.protobuf.ByteString; import java.io.ByteArrayInputStream; @@ -294,27 +293,20 @@ public static class Builder { protected ChunkDataSupplier inputStream; @CanIgnoreReturnValue - public Builder setInput(byte[] data) { - checkState(inputStream == null); - size = data.length; - setInputSupplier(() -> new ByteArrayInputStream(data)); - return this; - } - - @CanIgnoreReturnValue - public Builder setInput(long size, InputStream in) { + public Builder setInput(long size, ChunkDataSupplier in) { checkState(inputStream == null); checkNotNull(in); this.size = size; - inputStream = () -> in; + inputStream = in; return this; } @CanIgnoreReturnValue - public Builder setInput(long size, Path file) { + @VisibleForTesting + public Builder setInput(byte[] data) { checkState(inputStream == null); - this.size = size; - inputStream = file::getInputStream; + size = data.length; + setInputSupplier(() -> new ByteArrayInputStream(data)); return this; } diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java index b8f8c111ed453e..362fa23e7c60e0 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java @@ -59,7 +59,6 @@ import com.google.devtools.build.lib.remote.util.TracingMetadataUtils; import com.google.devtools.build.lib.remote.util.Utils; import com.google.devtools.build.lib.remote.zstd.ZstdDecompressingOutputStream; -import com.google.devtools.build.lib.vfs.Path; import com.google.protobuf.ByteString; import io.grpc.Channel; import io.grpc.Status; @@ -68,6 +67,7 @@ import io.grpc.stub.ClientCallStreamObserver; import io.grpc.stub.ClientResponseObserver; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.util.ArrayList; import java.util.List; @@ -483,26 +483,14 @@ private void releaseOut() { return future; } - @Override - public ListenableFuture uploadFile( - RemoteActionExecutionContext context, Digest digest, Path path) { - return uploadChunker( - context, - digest, - Chunker.builder() - .setInput(digest.getSizeBytes(), path) - .setCompressed(shouldCompress(digest)) - .build()); - } - @Override public ListenableFuture uploadBlob( - RemoteActionExecutionContext context, Digest digest, ByteString data) { + RemoteActionExecutionContext context, Digest digest, Supplier data) { return uploadChunker( context, digest, Chunker.builder() - .setInput(data.toByteArray()) + .setInput(digest.getSizeBytes(), data::get) .setCompressed(shouldCompress(digest)) .build()); } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java index 9dfb151afb1aa3..e246febbedf3b2 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java @@ -190,7 +190,8 @@ private ListenableFuture uploadBlob( case ContentSource.VirtualActionInputSource(VirtualActionInput virtualActionInput) -> // TODO: Avoid materializing the entire file in memory. This requires changing the // upload to be driven by an OutputStream rather than consuming an InputStream. - remoteCacheClient.uploadBlob(context, digest, virtualActionInput.getBytes()); + remoteCacheClient.uploadBlob( + context, digest, () -> virtualActionInput.getBytes().newInput()); case ContentSource.PathSource(Path path) -> { try { if (remotePathChecker.isRemote(context, path)) { diff --git a/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java index 0e23bd1267d5b0..5b5d4ce0893a7c 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java @@ -20,12 +20,13 @@ import build.bazel.remote.execution.v2.ServerCapabilities; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ListenableFuture; -import com.google.devtools.build.lib.actions.cache.VirtualActionInput; import com.google.devtools.build.lib.vfs.Path; import com.google.protobuf.ByteString; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.util.Set; +import java.util.function.Supplier; /** * An interface for a remote caching protocol. @@ -112,6 +113,18 @@ ListenableFuture uploadActionResult( ListenableFuture downloadBlob( RemoteActionExecutionContext context, Digest digest, OutputStream out); + /** + * Uploads a blob to the CAS. + * + * @param context the context for the action. + * @param digest The digest of the blob. + * @param data A supplier for the data to upload. Will be called at most once and as close as as + * possible in time to the actual upload. + * @return A future representing pending completion of the upload. + */ + ListenableFuture uploadBlob( + RemoteActionExecutionContext context, Digest digest, Supplier data); + /** * Uploads a {@code file} to the CAS. * @@ -120,7 +133,10 @@ ListenableFuture downloadBlob( * @param file The file to upload. * @return A future representing pending completion of the upload. */ - ListenableFuture uploadFile(RemoteActionExecutionContext context, Digest digest, Path file); + default ListenableFuture uploadFile( + RemoteActionExecutionContext context, Digest digest, Path file) { + return uploadBlob(context, digest, () -> new LazyFileInputStream(file)); + } /** * Uploads a BLOB to the CAS. @@ -130,8 +146,10 @@ ListenableFuture downloadBlob( * @param data The BLOB to upload. * @return A future representing pending completion of the upload. */ - ListenableFuture uploadBlob( - RemoteActionExecutionContext context, Digest digest, ByteString data); + default ListenableFuture uploadBlob( + RemoteActionExecutionContext context, Digest digest, ByteString data) { + return uploadBlob(context, digest, data::newInput); + } /** Close resources associated with the remote cache. */ void close(); diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java index e6a661ddbe357a..cb7b872aede4d3 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java @@ -37,7 +37,6 @@ import com.google.devtools.build.lib.remote.util.DigestOutputStream; import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.remote.util.Utils; -import com.google.devtools.build.lib.vfs.Path; import com.google.protobuf.ByteString; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; @@ -93,6 +92,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; +import java.util.function.Supplier; import java.util.regex.Pattern; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; @@ -718,25 +718,12 @@ private void uploadAfterCredentialRefresh(UploadCommand upload, SettableFuture uploadFile( - RemoteActionExecutionContext context, Digest digest, Path file) { - return retrier.executeAsync( - () -> - uploadAsync( - digest.getHash(), - digest.getSizeBytes(), - new LazyFileInputStream(file), - /* casUpload= */ true)); - } - @Override public ListenableFuture uploadBlob( - RemoteActionExecutionContext context, Digest digest, ByteString data) { + RemoteActionExecutionContext context, Digest digest, Supplier in) { return retrier.executeAsync( () -> - uploadAsync( - digest.getHash(), digest.getSizeBytes(), data.newInput(), /* casUpload= */ true)); + uploadAsync(digest.getHash(), digest.getSizeBytes(), in.get(), /* casUpload= */ true)); } @Override diff --git a/src/test/java/com/google/devtools/build/lib/remote/BUILD b/src/test/java/com/google/devtools/build/lib/remote/BUILD index 9e80fb69e0a5a2..cb3ae8cf37ef37 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/BUILD +++ b/src/test/java/com/google/devtools/build/lib/remote/BUILD @@ -121,6 +121,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/remote/merkletree", "//src/main/java/com/google/devtools/build/lib/remote/options", "//src/main/java/com/google/devtools/build/lib/remote/util", + "//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils", "//src/main/java/com/google/devtools/build/lib/runtime/commands", "//src/main/java/com/google/devtools/build/lib/skyframe:tree_artifact_value", "//src/main/java/com/google/devtools/build/lib/testing/vfs:spied_filesystem", @@ -242,6 +243,7 @@ java_test( "//src/main/java/com/google/devtools/build/lib/dynamic", "//src/main/java/com/google/devtools/build/lib/remote", "//src/main/java/com/google/devtools/build/lib/remote/util", + "//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils", "//src/main/java/com/google/devtools/build/lib/standalone", "//src/main/java/com/google/devtools/build/lib/util:os", "//src/main/java/com/google/devtools/build/lib/vfs", @@ -268,6 +270,7 @@ java_test( "//src/main/java/com/google/devtools/build/lib/remote", "//src/main/java/com/google/devtools/build/lib/remote:store", "//src/main/java/com/google/devtools/build/lib/remote/util", + "//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils", "//src/main/java/com/google/devtools/build/lib/standalone", "//src/main/java/com/google/devtools/build/lib/vfs", "//src/main/java/com/google/devtools/build/lib/vfs:pathfragment", diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java index 9b0d421c1f7442..9b039d3d90a489 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java @@ -693,7 +693,8 @@ public void earlyWriteResponseShouldCompleteUpload() throws Exception { // provide only enough data to write a single chunk InputStream in = new ByteArrayInputStream(blob, 0, CHUNK_SIZE); - Chunker chunker = Chunker.builder().setInput(blob.length, in).setChunkSize(CHUNK_SIZE).build(); + Chunker chunker = + Chunker.builder().setInput(blob.length, () -> in).setChunkSize(CHUNK_SIZE).build(); Digest digest = DIGEST_UTIL.compute(blob); serviceRegistry.addService( diff --git a/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java b/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java index 7c693480df1482..1964904acc0c9d 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java @@ -94,7 +94,7 @@ public void close() throws IOException { super.close(); } }; - Chunker chunker = Chunker.builder().setInput(0, inp).build(); + Chunker chunker = Chunker.builder().setInput(0, () -> inp).build(); assertThat(chunker.hasNext()).isTrue(); diff --git a/src/test/java/com/google/devtools/build/lib/remote/CombinedCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/CombinedCacheTest.java index 251d04a8ec5176..24bf0944c28bf7 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/CombinedCacheTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/CombinedCacheTest.java @@ -73,6 +73,7 @@ import com.google.devtools.common.options.Options; import com.google.protobuf.ByteString; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.Deque; @@ -88,6 +89,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -397,7 +399,7 @@ public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUpl return future; }) .when(cacheProtocol) - .uploadBlob(any(), any(), any()); + .uploadBlob(any(), any(), (Supplier) any()); doAnswer( invocationOnMock -> { SettableFuture future = SettableFuture.create(); @@ -472,7 +474,7 @@ public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUpl return future; }) .when(cacheProtocol) - .uploadBlob(any(), any(), any()); + .uploadBlob(any(), any(), (Supplier) any()); doAnswer( invocationOnMock -> { SettableFuture future = SettableFuture.create(); @@ -553,7 +555,7 @@ public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUpl return future; }) .when(cacheProtocol) - .uploadBlob(any(), any(), any()); + .uploadBlob(any(), any(), (Supplier) any()); doAnswer( invocationOnMock -> { Path file = invocationOnMock.getArgument(2, Path.class); @@ -652,7 +654,7 @@ public void ensureInputsPresent_uploadFailed_propagateErrors() throws Exception RemoteCacheClient cacheProtocol = spy(new InMemoryCacheClient()); doAnswer(invocationOnMock -> Futures.immediateFailedFuture(new IOException("upload failed"))) .when(cacheProtocol) - .uploadBlob(any(), any(), any()); + .uploadBlob(any(), any(), (Supplier) any()); doAnswer(invocationOnMock -> Futures.immediateFailedFuture(new IOException("upload failed"))) .when(cacheProtocol) .uploadFile(any(), any(), any()); diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java index 100e069b6ab4d1..135092e33a7d51 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java @@ -132,6 +132,7 @@ import com.google.testing.junit.testparameterinjector.TestParameter; import com.google.testing.junit.testparameterinjector.TestParameterInjector; import java.io.IOException; +import java.io.InputStream; import java.util.Collection; import java.util.Map; import java.util.Random; @@ -140,6 +141,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import javax.annotation.Nullable; import org.junit.Before; import org.junit.Rule; @@ -2162,7 +2164,7 @@ public void uploadInputsIfNotPresent_interrupted_requestCancelled() throws Excep return future; }) .when(cache.remoteCacheClient) - .uploadBlob(any(), any(), any()); + .uploadBlob(any(), any(), (Supplier) any()); ActionInput input = ActionInputHelper.fromPath("inputs/foo"); fakeFileCache.createScratchInput(input, "input-foo"); RemoteExecutionService service = newRemoteExecutionService(); diff --git a/src/test/java/com/google/devtools/build/lib/remote/disk/BUILD b/src/test/java/com/google/devtools/build/lib/remote/disk/BUILD index 7f09d8d86a2202..fa5cf2b4008345 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/disk/BUILD +++ b/src/test/java/com/google/devtools/build/lib/remote/disk/BUILD @@ -23,6 +23,7 @@ java_test( "//src/main/java/com/google/devtools/build/lib/remote/common:cache_not_found_exception", "//src/main/java/com/google/devtools/build/lib/remote/disk", "//src/main/java/com/google/devtools/build/lib/remote/util", + "//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils", "//src/main/java/com/google/devtools/build/lib/vfs", "//src/main/java/com/google/devtools/build/lib/vfs/bazel", "//src/main/java/com/google/devtools/build/lib/vfs/inmemoryfs", diff --git a/src/test/java/com/google/devtools/build/lib/remote/downloader/BUILD b/src/test/java/com/google/devtools/build/lib/remote/downloader/BUILD index 2d450d41cf3423..3dee6d7f751131 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/downloader/BUILD +++ b/src/test/java/com/google/devtools/build/lib/remote/downloader/BUILD @@ -31,6 +31,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/remote/downloader", "//src/main/java/com/google/devtools/build/lib/remote/options", "//src/main/java/com/google/devtools/build/lib/remote/util", + "//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils", "//src/main/java/com/google/devtools/build/lib/vfs", "//src/main/java/com/google/devtools/common/options", "//src/test/java/com/google/devtools/build/lib/remote/util", diff --git a/src/test/java/com/google/devtools/build/lib/remote/http/BUILD b/src/test/java/com/google/devtools/build/lib/remote/http/BUILD index 16914a57fa8091..91b6c87d52e66e 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/http/BUILD +++ b/src/test/java/com/google/devtools/build/lib/remote/http/BUILD @@ -28,6 +28,7 @@ java_test( "//src/main/java/com/google/devtools/build/lib/remote/common", "//src/main/java/com/google/devtools/build/lib/remote/http", "//src/main/java/com/google/devtools/build/lib/remote/util", + "//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils", "//src/main/java/com/google/devtools/build/lib/vfs", "//src/main/java/com/google/devtools/common/options", "//src/test/java/com/google/devtools/build/lib:test_runner", diff --git a/src/test/java/com/google/devtools/build/lib/remote/logging/BUILD b/src/test/java/com/google/devtools/build/lib/remote/logging/BUILD index 3720b4bb0a5777..86596fd00085d2 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/logging/BUILD +++ b/src/test/java/com/google/devtools/build/lib/remote/logging/BUILD @@ -20,6 +20,7 @@ java_test( deps = [ "//src/main/java/com/google/devtools/build/lib/remote/logging", "//src/main/java/com/google/devtools/build/lib/remote/util", + "//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils", "//src/main/java/com/google/devtools/build/lib/util/io", "//src/main/protobuf:remote_execution_log_java_proto", "//src/test/java/com/google/devtools/build/lib:test_runner", diff --git a/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java b/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java index f9de62f38d6bb8..182e3eb1780d87 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java +++ b/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java @@ -20,7 +20,6 @@ import build.bazel.remote.execution.v2.ServerCapabilities; import build.bazel.remote.execution.v2.SymlinkAbsolutePathStrategy; import com.google.common.collect.ImmutableSet; -import com.google.common.io.ByteStreams; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; @@ -28,7 +27,6 @@ import com.google.devtools.build.lib.remote.common.CacheNotFoundException; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; import com.google.devtools.build.lib.remote.common.RemoteCacheClient; -import com.google.devtools.build.lib.vfs.Path; import com.google.protobuf.ByteString; import java.io.IOException; import java.io.InputStream; @@ -40,6 +38,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import java.util.stream.Collectors; /** A {@link RemoteCacheClient} that stores its contents in memory. */ @@ -142,23 +141,11 @@ public ListenableFuture uploadActionResult( ac.put(actionKey, actionResult); return Futures.immediateFuture(null); } - - @Override - public ListenableFuture uploadFile( - RemoteActionExecutionContext context, Digest digest, Path file) { - try (InputStream in = file.getInputStream()) { - cas.put(digest, ByteStreams.toByteArray(in)); - } catch (IOException e) { - return Futures.immediateFailedFuture(e); - } - return Futures.immediateFuture(null); - } - @Override public ListenableFuture uploadBlob( - RemoteActionExecutionContext context, Digest digest, ByteString data) { - try (InputStream in = data.newInput()) { - cas.put(digest, data.toByteArray()); + RemoteActionExecutionContext context, Digest digest, Supplier data) { + try { + cas.put(digest, data.get().readAllBytes()); } catch (IOException e) { return Futures.immediateFailedFuture(e); } diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD index e40a00034cd743..606ff0b7b89c38 100644 --- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD +++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/BUILD @@ -40,6 +40,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/remote/disk", "//src/main/java/com/google/devtools/build/lib/remote/options", "//src/main/java/com/google/devtools/build/lib/remote/util", + "//src/main/java/com/google/devtools/build/lib/remote/util:digest_utils", "//src/main/java/com/google/devtools/build/lib/sandbox:linux_sandbox_command_line_builder", "//src/main/java/com/google/devtools/build/lib/shell", "//src/main/java/com/google/devtools/build/lib/util:os", diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java index 0fecbb33ec7091..15080fa9280d3f 100644 --- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java +++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java @@ -34,6 +34,7 @@ import io.grpc.Status; import io.grpc.protobuf.StatusProto; import io.grpc.stub.StreamObserver; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.OutputStream; import java.util.UUID; @@ -83,8 +84,9 @@ public void read(ReadRequest request, StreamObserver responseObser try { // This still relies on the blob size to be small enough to fit in memory. // TODO(olaola): refactor to fix this if the need arises. + byte[] bytes = getFromFuture(cache.downloadBlob(context, digest)); Chunker c = - Chunker.builder().setInput(getFromFuture(cache.downloadBlob(context, digest))).build(); + Chunker.builder().setInput(bytes.length, () -> new ByteArrayInputStream(bytes)).build(); while (c.hasNext()) { responseObserver.onNext( ReadResponse.newBuilder().setData(c.next().getData()).build()); From 66da6d112a0504ac464bca1e60af8d052328ffac Mon Sep 17 00:00:00 2001 From: Fabian Meumertzheim Date: Fri, 10 Jan 2025 20:31:38 +0100 Subject: [PATCH 06/13] Cache data supplier --- .../devtools/build/lib/remote/Chunker.java | 20 +++++------ .../build/lib/remote/GrpcCacheClient.java | 5 ++- .../lib/remote/RemoteExecutionCache.java | 34 +++++++++++++++++-- .../devtools/build/lib/remote/common/BUILD | 1 + .../lib/remote/common/RemoteCacheClient.java | 15 +++++--- .../lib/remote/http/HttpCacheClient.java | 4 +-- .../build/lib/remote/ChunkerTest.java | 4 +-- 7 files changed, 58 insertions(+), 25 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java index 5bbbac882a7cb7..165de59fd0b178 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java +++ b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java @@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.io.ByteStreams; +import com.google.devtools.build.lib.remote.common.RemoteCacheClient; import com.google.devtools.build.lib.remote.zstd.ZstdCompressingInputStream; import com.google.errorprone.annotations.CanIgnoreReturnValue; import com.google.protobuf.ByteString; @@ -96,12 +97,7 @@ public int hashCode() { } } - /** A supplier that provide data as {@link InputStream}. */ - public interface ChunkDataSupplier { - InputStream get() throws IOException; - } - - private final ChunkDataSupplier dataSupplier; + private final RemoteCacheClient.CloseableBlobSupplier dataSupplier; private final long uncompressedSize; private final int chunkSize; private final Chunk emptyChunk; @@ -117,7 +113,10 @@ public interface ChunkDataSupplier { private boolean initialized; Chunker( - ChunkDataSupplier dataSupplier, long uncompressedSize, int chunkSize, boolean compressed) { + RemoteCacheClient.CloseableBlobSupplier dataSupplier, + long uncompressedSize, + int chunkSize, + boolean compressed) { this.dataSupplier = checkNotNull(dataSupplier); this.uncompressedSize = uncompressedSize; this.chunkSize = chunkSize; @@ -142,6 +141,7 @@ public void reset() throws IOException { close(); offset = 0; initialized = false; + dataSupplier.close(); } /** @@ -290,10 +290,10 @@ public static class Builder { private int chunkSize = getDefaultChunkSize(); protected long size; private boolean compressed; - protected ChunkDataSupplier inputStream; + protected RemoteCacheClient.CloseableBlobSupplier inputStream; @CanIgnoreReturnValue - public Builder setInput(long size, ChunkDataSupplier in) { + public Builder setInput(long size, RemoteCacheClient.CloseableBlobSupplier in) { checkState(inputStream == null); checkNotNull(in); this.size = size; @@ -312,7 +312,7 @@ public Builder setInput(byte[] data) { @CanIgnoreReturnValue @VisibleForTesting - protected final Builder setInputSupplier(ChunkDataSupplier inputStream) { + protected final Builder setInputSupplier(RemoteCacheClient.CloseableBlobSupplier inputStream) { this.inputStream = inputStream; return this; } diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java index 362fa23e7c60e0..5fadba9420b041 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java @@ -67,7 +67,6 @@ import io.grpc.stub.ClientCallStreamObserver; import io.grpc.stub.ClientResponseObserver; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.util.ArrayList; import java.util.List; @@ -485,12 +484,12 @@ private void releaseOut() { @Override public ListenableFuture uploadBlob( - RemoteActionExecutionContext context, Digest digest, Supplier data) { + RemoteActionExecutionContext context, Digest digest, CloseableBlobSupplier data) { return uploadChunker( context, digest, Chunker.builder() - .setInput(digest.getSizeBytes(), data::get) + .setInput(digest.getSizeBytes(), data) .setCompressed(shouldCompress(digest)) .build()); } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java index e246febbedf3b2..caef43be456c16 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java @@ -48,6 +48,7 @@ import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.remote.util.RxUtils.TransferResult; import com.google.devtools.build.lib.vfs.Path; +import com.google.protobuf.ByteString; import com.google.protobuf.Message; import io.reactivex.rxjava3.annotations.NonNull; import io.reactivex.rxjava3.core.Completable; @@ -60,6 +61,7 @@ import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.subjects.AsyncSubject; import java.io.IOException; +import java.io.InputStream; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; @@ -173,6 +175,34 @@ public void ensureInputsPresent( } } + private static final class VirtualActionInputDataSupplier + implements RemoteCacheClient.CloseableBlobSupplier { + private VirtualActionInput virtualActionInput; + private volatile ByteString data; + + VirtualActionInputDataSupplier(VirtualActionInput virtualActionInput) { + this.virtualActionInput = virtualActionInput; + } + + @Override + public InputStream get() { + if (data == null) { + synchronized (this) { + if (data == null) { + data = virtualActionInput.getBytes(); + } + } + } + return data.newInput(); + } + + @Override + public void close() { + virtualActionInput = null; + data = null; + } + } + private ListenableFuture uploadBlob( RemoteActionExecutionContext context, Digest digest, @@ -188,10 +218,8 @@ private ListenableFuture uploadBlob( if (file != null) { return switch (file) { case ContentSource.VirtualActionInputSource(VirtualActionInput virtualActionInput) -> - // TODO: Avoid materializing the entire file in memory. This requires changing the - // upload to be driven by an OutputStream rather than consuming an InputStream. remoteCacheClient.uploadBlob( - context, digest, () -> virtualActionInput.getBytes().newInput()); + context, digest, new VirtualActionInputDataSupplier(virtualActionInput)); case ContentSource.PathSource(Path path) -> { try { if (remotePathChecker.isRemote(context, path)) { diff --git a/src/main/java/com/google/devtools/build/lib/remote/common/BUILD b/src/main/java/com/google/devtools/build/lib/remote/common/BUILD index f7f60e8e60428f..c04252a26ca373 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/common/BUILD +++ b/src/main/java/com/google/devtools/build/lib/remote/common/BUILD @@ -56,6 +56,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/concurrent", "//src/main/java/com/google/devtools/build/lib/exec:spawn_input_expander", "//src/main/java/com/google/devtools/build/lib/exec:spawn_runner", + "//src/main/java/com/google/devtools/build/lib/profiler", "//src/main/java/com/google/devtools/build/lib/vfs", "//src/main/java/com/google/devtools/build/lib/vfs:pathfragment", "//third_party:guava", diff --git a/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java index 5b5d4ce0893a7c..416c01c0f6dcbe 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java @@ -20,13 +20,13 @@ import build.bazel.remote.execution.v2.ServerCapabilities; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ListenableFuture; +import com.google.devtools.build.lib.profiler.SilentCloseable; import com.google.devtools.build.lib.vfs.Path; import com.google.protobuf.ByteString; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.Set; -import java.util.function.Supplier; /** * An interface for a remote caching protocol. @@ -113,17 +113,24 @@ ListenableFuture uploadActionResult( ListenableFuture downloadBlob( RemoteActionExecutionContext context, Digest digest, OutputStream out); + @FunctionalInterface + interface CloseableBlobSupplier extends SilentCloseable { + InputStream get(); + + @Override + default void close() {} + } + /** * Uploads a blob to the CAS. * * @param context the context for the action. * @param digest The digest of the blob. - * @param data A supplier for the data to upload. Will be called at most once and as close as as - * possible in time to the actual upload. + * @param data A supplier for the data to upload. May be called multiple times. * @return A future representing pending completion of the upload. */ ListenableFuture uploadBlob( - RemoteActionExecutionContext context, Digest digest, Supplier data); + RemoteActionExecutionContext context, Digest digest, CloseableBlobSupplier data); /** * Uploads a {@code file} to the CAS. diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java index cb7b872aede4d3..4c256303f203f8 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java @@ -31,7 +31,6 @@ import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions; import com.google.devtools.build.lib.remote.RemoteRetrier; import com.google.devtools.build.lib.remote.common.CacheNotFoundException; -import com.google.devtools.build.lib.remote.common.LazyFileInputStream; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; import com.google.devtools.build.lib.remote.common.RemoteCacheClient; import com.google.devtools.build.lib.remote.util.DigestOutputStream; @@ -92,7 +91,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; -import java.util.function.Supplier; import java.util.regex.Pattern; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; @@ -720,7 +718,7 @@ private void uploadAfterCredentialRefresh(UploadCommand upload, SettableFuture uploadBlob( - RemoteActionExecutionContext context, Digest digest, Supplier in) { + RemoteActionExecutionContext context, Digest digest, CloseableBlobSupplier in) { return retrier.executeAsync( () -> uploadAsync(digest.getHash(), digest.getSizeBytes(), in.get(), /* casUpload= */ true)); diff --git a/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java b/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java index 1964904acc0c9d..758717dfba6dd4 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java @@ -18,7 +18,7 @@ import com.github.luben.zstd.Zstd; import com.google.devtools.build.lib.remote.Chunker.Chunk; -import com.google.devtools.build.lib.remote.Chunker.ChunkDataSupplier; +import com.google.devtools.build.lib.remote.common.RemoteCacheClient; import com.google.protobuf.ByteString; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -136,7 +136,7 @@ public void resourcesShouldBeReleased() throws IOException { byte[] data = new byte[] {1, 2}; final AtomicReference in = new AtomicReference<>(); - ChunkDataSupplier supplier = + RemoteCacheClient.CloseableBlobSupplier supplier = () -> { in.set(Mockito.spy(new ByteArrayInputStream(data))); return in.get(); From 7b181fbcbe262d72b162418afd21ab7f64f6ea4a Mon Sep 17 00:00:00 2001 From: Fabian Meumertzheim Date: Sat, 11 Jan 2025 21:43:11 +0100 Subject: [PATCH 07/13] Fix --- .../build/lib/remote/merkletree/MerkleTree.java | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTree.java b/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTree.java index 98fa57c440107b..707c6bb8e5acf6 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTree.java +++ b/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTree.java @@ -36,6 +36,8 @@ import com.google.devtools.build.lib.profiler.Profiler; import com.google.devtools.build.lib.profiler.SilentCloseable; import com.google.devtools.build.lib.remote.Scrubber.SpawnScrubber; +import com.google.devtools.build.lib.remote.merkletree.MerkleTree.ContentSource.PathSource; +import com.google.devtools.build.lib.remote.merkletree.MerkleTree.ContentSource.VirtualActionInputSource; import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.build.lib.vfs.PathFragment; @@ -183,13 +185,14 @@ public Directory getDirectoryByDigest(Digest digest) { @Nullable public ContentSource getFileByDigest(Digest digest) { - Object pathOrVirtualActionInput = getDigestFileMap().get(digest); - if (pathOrVirtualActionInput instanceof Path path) { - return new ContentSource.PathSource(path); - } else { - return new ContentSource.VirtualActionInputSource( - (VirtualActionInput) pathOrVirtualActionInput); - } + return switch (getDigestFileMap().get(digest)) { + case Path path -> new PathSource(path); + case VirtualActionInput virtualActionInput -> + new VirtualActionInputSource(virtualActionInput); + case null -> null; + default -> + throw new IllegalStateException("Unexpected value: " + getDigestFileMap().get(digest)); + }; } /** From a2388e3b7402856a6b0fa2d4e7f5cd16308ab022 Mon Sep 17 00:00:00 2001 From: Fabian Meumertzheim Date: Sat, 11 Jan 2025 21:46:04 +0100 Subject: [PATCH 08/13] Cleanup --- .../devtools/build/lib/remote/Chunker.java | 19 ++++++------------- .../lib/remote/ByteStreamUploaderTest.java | 6 ++++-- 2 files changed, 10 insertions(+), 15 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java index 165de59fd0b178..d9585f6b7696ec 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java +++ b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java @@ -22,7 +22,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.io.ByteStreams; -import com.google.devtools.build.lib.remote.common.RemoteCacheClient; +import com.google.devtools.build.lib.remote.common.RemoteCacheClient.CloseableBlobSupplier; import com.google.devtools.build.lib.remote.zstd.ZstdCompressingInputStream; import com.google.errorprone.annotations.CanIgnoreReturnValue; import com.google.protobuf.ByteString; @@ -97,7 +97,7 @@ public int hashCode() { } } - private final RemoteCacheClient.CloseableBlobSupplier dataSupplier; + private final CloseableBlobSupplier dataSupplier; private final long uncompressedSize; private final int chunkSize; private final Chunk emptyChunk; @@ -113,7 +113,7 @@ public int hashCode() { private boolean initialized; Chunker( - RemoteCacheClient.CloseableBlobSupplier dataSupplier, + CloseableBlobSupplier dataSupplier, long uncompressedSize, int chunkSize, boolean compressed) { @@ -290,10 +290,10 @@ public static class Builder { private int chunkSize = getDefaultChunkSize(); protected long size; private boolean compressed; - protected RemoteCacheClient.CloseableBlobSupplier inputStream; + protected CloseableBlobSupplier inputStream; @CanIgnoreReturnValue - public Builder setInput(long size, RemoteCacheClient.CloseableBlobSupplier in) { + public Builder setInput(long size, CloseableBlobSupplier in) { checkState(inputStream == null); checkNotNull(in); this.size = size; @@ -306,14 +306,7 @@ public Builder setInput(long size, RemoteCacheClient.CloseableBlobSupplier in) { public Builder setInput(byte[] data) { checkState(inputStream == null); size = data.length; - setInputSupplier(() -> new ByteArrayInputStream(data)); - return this; - } - - @CanIgnoreReturnValue - @VisibleForTesting - protected final Builder setInputSupplier(RemoteCacheClient.CloseableBlobSupplier inputStream) { - this.inputStream = inputStream; + this.inputStream = () -> new ByteArrayInputStream(data); return this; } diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java index 9b039d3d90a489..d0f95accb257ba 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java @@ -42,6 +42,7 @@ import com.google.devtools.build.lib.analysis.BlazeVersionInfo; import com.google.devtools.build.lib.authandtls.CallCredentialsProvider; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; +import com.google.devtools.build.lib.remote.common.RemoteCacheClient.CloseableBlobSupplier; import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.remote.util.TestUtils; import com.google.devtools.build.lib.remote.util.TracingMetadataUtils; @@ -1764,7 +1765,7 @@ public void queryWriteStatus( /* Custom Chunker used to track number of open files */ private static class TestChunker extends Chunker { - TestChunker(ChunkDataSupplier dataSupplier, long size, int chunkSize, boolean compressed) { + TestChunker(CloseableBlobSupplier dataSupplier, long size, int chunkSize, boolean compressed) { super(dataSupplier, size, chunkSize, compressed); } @@ -1783,7 +1784,8 @@ private static class TestChunkerBuilder extends Chunker.Builder { public Chunker.Builder setInput(byte[] existingData) { checkState(this.inputStream == null); this.size = existingData.length; - return setInputSupplier( + return setInput( + existingData.length, () -> new TestByteArrayInputStream(existingData, customFileTracker)); } } From f90be10bfa885e7b3b9aaa666954e1b06d7b9455 Mon Sep 17 00:00:00 2001 From: Fabian Meumertzheim Date: Sat, 11 Jan 2025 23:12:41 +0100 Subject: [PATCH 09/13] Add better error message --- .../devtools/build/lib/remote/RemoteExecutionCache.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java index caef43be456c16..d4dbe7a14b7332 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java @@ -27,6 +27,7 @@ import build.bazel.remote.execution.v2.Digest; import build.bazel.remote.execution.v2.Directory; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; @@ -41,6 +42,7 @@ import com.google.devtools.build.lib.remote.common.LostInputsEvent; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; import com.google.devtools.build.lib.remote.common.RemoteCacheClient; +import com.google.devtools.build.lib.remote.common.RemoteCacheClient.CloseableBlobSupplier; import com.google.devtools.build.lib.remote.disk.DiskCacheClient; import com.google.devtools.build.lib.remote.merkletree.MerkleTree; import com.google.devtools.build.lib.remote.merkletree.MerkleTree.ContentSource; @@ -175,13 +177,12 @@ public void ensureInputsPresent( } } - private static final class VirtualActionInputDataSupplier - implements RemoteCacheClient.CloseableBlobSupplier { + private static final class VirtualActionInputDataSupplier implements CloseableBlobSupplier { private VirtualActionInput virtualActionInput; private volatile ByteString data; VirtualActionInputDataSupplier(VirtualActionInput virtualActionInput) { - this.virtualActionInput = virtualActionInput; + this.virtualActionInput = Preconditions.checkNotNull(virtualActionInput); } @Override @@ -189,7 +190,7 @@ public InputStream get() { if (data == null) { synchronized (this) { if (data == null) { - data = virtualActionInput.getBytes(); + data = Preconditions.checkNotNull(virtualActionInput, "used after close()").getBytes(); } } } From d9db761b3c273f1ea949f078c0cf7c76b7892efe Mon Sep 17 00:00:00 2001 From: Fabian Meumertzheim Date: Sat, 11 Jan 2025 23:33:23 +0100 Subject: [PATCH 10/13] Close correctly --- .../com/google/devtools/build/lib/remote/Chunker.java | 6 +++++- .../devtools/build/lib/remote/GrpcCacheClient.java | 1 + .../build/lib/remote/http/HttpCacheClient.java | 10 +++++++--- .../devtools/build/lib/remote/CombinedCacheTest.java | 9 +++++---- .../build/lib/remote/RemoteExecutionServiceTest.java | 5 ++--- .../build/lib/remote/util/InMemoryCacheClient.java | 6 ++---- 6 files changed, 22 insertions(+), 15 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java index d9585f6b7696ec..01479bbffa6425 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java +++ b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java @@ -27,6 +27,7 @@ import com.google.errorprone.annotations.CanIgnoreReturnValue; import com.google.protobuf.ByteString; import java.io.ByteArrayInputStream; +import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.io.PushbackInputStream; @@ -141,7 +142,6 @@ public void reset() throws IOException { close(); offset = 0; initialized = false; - dataSupplier.close(); } /** @@ -184,6 +184,10 @@ private void close() throws IOException { chunkCache = null; } + public void closeQuietly() { + dataSupplier.close(); + } + /** Attempts reading at most a full chunk and stores it in the chunkCache buffer */ private int read() throws IOException { int count = 0; diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java index 5fadba9420b041..308b2f3ec5b5dc 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java @@ -501,6 +501,7 @@ ListenableFuture uploadChunker( () -> { try { chunker.reset(); + chunker.closeQuietly(); } catch (IOException e) { logger.atWarning().withCause(e).log( "failed to reset chunker uploading %s/%d", digest.getHash(), digest.getSizeBytes()); diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java index 4c256303f203f8..644a38fc91ea82 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java @@ -639,7 +639,7 @@ private ListenableFuture uploadAsync( public void close() { // Ensure that the InputStream can't be closed somewhere in the Netty // pipeline, so that we can support retries. The InputStream is closed in - // the finally block below. + // the listener block below. } }; UploadCommand upload = new UploadCommand(uri, casUpload, key, wrappedIn, length); @@ -720,8 +720,12 @@ private void uploadAfterCredentialRefresh(UploadCommand upload, SettableFuture uploadBlob( RemoteActionExecutionContext context, Digest digest, CloseableBlobSupplier in) { return retrier.executeAsync( - () -> - uploadAsync(digest.getHash(), digest.getSizeBytes(), in.get(), /* casUpload= */ true)); + () -> { + var result = + uploadAsync(digest.getHash(), digest.getSizeBytes(), in.get(), /* casUpload= */ true); + result.addListener(in::close, MoreExecutors.directExecutor()); + return result; + }); } @Override diff --git a/src/test/java/com/google/devtools/build/lib/remote/CombinedCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/CombinedCacheTest.java index 24bf0944c28bf7..4157bb8bee9023 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/CombinedCacheTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/CombinedCacheTest.java @@ -55,6 +55,7 @@ import com.google.devtools.build.lib.remote.common.LostInputsEvent; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; import com.google.devtools.build.lib.remote.common.RemoteCacheClient; +import com.google.devtools.build.lib.remote.common.RemoteCacheClient.CloseableBlobSupplier; import com.google.devtools.build.lib.remote.merkletree.MerkleTree; import com.google.devtools.build.lib.remote.options.RemoteOptions; import com.google.devtools.build.lib.remote.util.DigestUtil; @@ -399,7 +400,7 @@ public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUpl return future; }) .when(cacheProtocol) - .uploadBlob(any(), any(), (Supplier) any()); + .uploadBlob(any(), any(), (CloseableBlobSupplier) any()); doAnswer( invocationOnMock -> { SettableFuture future = SettableFuture.create(); @@ -474,7 +475,7 @@ public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUpl return future; }) .when(cacheProtocol) - .uploadBlob(any(), any(), (Supplier) any()); + .uploadBlob(any(), any(), (CloseableBlobSupplier) any()); doAnswer( invocationOnMock -> { SettableFuture future = SettableFuture.create(); @@ -555,7 +556,7 @@ public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUpl return future; }) .when(cacheProtocol) - .uploadBlob(any(), any(), (Supplier) any()); + .uploadBlob(any(), any(), (CloseableBlobSupplier) any()); doAnswer( invocationOnMock -> { Path file = invocationOnMock.getArgument(2, Path.class); @@ -654,7 +655,7 @@ public void ensureInputsPresent_uploadFailed_propagateErrors() throws Exception RemoteCacheClient cacheProtocol = spy(new InMemoryCacheClient()); doAnswer(invocationOnMock -> Futures.immediateFailedFuture(new IOException("upload failed"))) .when(cacheProtocol) - .uploadBlob(any(), any(), (Supplier) any()); + .uploadBlob(any(), any(), (CloseableBlobSupplier) any()); doAnswer(invocationOnMock -> Futures.immediateFailedFuture(new IOException("upload failed"))) .when(cacheProtocol) .uploadFile(any(), any(), any()); diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java index 135092e33a7d51..6cc79b95098738 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java @@ -101,6 +101,7 @@ import com.google.devtools.build.lib.remote.RemoteScrubbing.Config; import com.google.devtools.build.lib.remote.common.BulkTransferException; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; +import com.google.devtools.build.lib.remote.common.RemoteCacheClient.CloseableBlobSupplier; import com.google.devtools.build.lib.remote.common.RemoteExecutionClient; import com.google.devtools.build.lib.remote.common.RemotePathResolver; import com.google.devtools.build.lib.remote.common.RemotePathResolver.DefaultRemotePathResolver; @@ -132,7 +133,6 @@ import com.google.testing.junit.testparameterinjector.TestParameter; import com.google.testing.junit.testparameterinjector.TestParameterInjector; import java.io.IOException; -import java.io.InputStream; import java.util.Collection; import java.util.Map; import java.util.Random; @@ -141,7 +141,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Supplier; import javax.annotation.Nullable; import org.junit.Before; import org.junit.Rule; @@ -2164,7 +2163,7 @@ public void uploadInputsIfNotPresent_interrupted_requestCancelled() throws Excep return future; }) .when(cache.remoteCacheClient) - .uploadBlob(any(), any(), (Supplier) any()); + .uploadBlob(any(), any(), (CloseableBlobSupplier) any()); ActionInput input = ActionInputHelper.fromPath("inputs/foo"); fakeFileCache.createScratchInput(input, "input-foo"); RemoteExecutionService service = newRemoteExecutionService(); diff --git a/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java b/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java index 182e3eb1780d87..01a3870d69cfed 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java +++ b/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java @@ -27,9 +27,7 @@ import com.google.devtools.build.lib.remote.common.CacheNotFoundException; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; import com.google.devtools.build.lib.remote.common.RemoteCacheClient; -import com.google.protobuf.ByteString; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.util.AbstractMap.SimpleEntry; import java.util.Map; @@ -38,7 +36,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Supplier; import java.util.stream.Collectors; /** A {@link RemoteCacheClient} that stores its contents in memory. */ @@ -141,9 +138,10 @@ public ListenableFuture uploadActionResult( ac.put(actionKey, actionResult); return Futures.immediateFuture(null); } + @Override public ListenableFuture uploadBlob( - RemoteActionExecutionContext context, Digest digest, Supplier data) { + RemoteActionExecutionContext context, Digest digest, CloseableBlobSupplier data) { try { cas.put(digest, data.get().readAllBytes()); } catch (IOException e) { From e7079acad31d61e703a003c7eaa5069172e29441 Mon Sep 17 00:00:00 2001 From: Fabian Meumertzheim Date: Mon, 13 Jan 2025 11:29:07 +0100 Subject: [PATCH 11/13] Move methods into tests --- .../build/lib/remote/ByteStreamUploader.java | 53 ---------- .../lib/remote/ByteStreamUploaderTest.java | 98 ++++++++++++++----- 2 files changed, 74 insertions(+), 77 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java index aea5383cb2a6e3..c7602543032709 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java @@ -16,8 +16,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.util.concurrent.Futures.immediateVoidFuture; import static com.google.devtools.build.lib.remote.util.DigestUtil.isOldStyleDigestFunction; -import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture; -import static com.google.devtools.build.lib.remote.util.Utils.waitForBulkTransfer; import static java.lang.String.format; import static java.util.concurrent.TimeUnit.SECONDS; @@ -50,9 +48,6 @@ import io.grpc.stub.ClientResponseObserver; import io.netty.util.ReferenceCounted; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; import java.util.UUID; import java.util.concurrent.Semaphore; import javax.annotation.Nullable; @@ -104,54 +99,6 @@ final class ByteStreamUploader { this.digestFunction = digestFunction; } - @VisibleForTesting - ReferenceCountedChannel getChannel() { - return channel; - } - - @VisibleForTesting - RemoteRetrier getRetrier() { - return retrier; - } - - /** - * Uploads a BLOB, as provided by the {@link Chunker}, to the remote {@code ByteStream} service. - * The call blocks until the upload is complete, or throws an {@link Exception} in case of error. - * - *

Uploads are retried according to the specified {@link RemoteRetrier}. Retrying is - * transparent to the user of this API. - * - * @param digest the digest of the data to upload. - * @param chunker the data to upload. - * @throws IOException when reading of the {@link Chunker}s input source fails - */ - public void uploadBlob(RemoteActionExecutionContext context, Digest digest, Chunker chunker) - throws IOException, InterruptedException { - getFromFuture(uploadBlobAsync(context, digest, chunker)); - } - - /** - * Uploads a list of BLOBs concurrently to the remote {@code ByteStream} service. The call blocks - * until the upload of all BLOBs is complete, or throws an {@link - * com.google.devtools.build.lib.remote.common.BulkTransferException} if there are errors. - * - *

Uploads are retried according to the specified {@link RemoteRetrier}. Retrying is - * transparent to the user of this API. - * - * @param chunkers the data to upload. - * @throws IOException when reading of the {@link Chunker}s input source or uploading fails - */ - public void uploadBlobs(RemoteActionExecutionContext context, Map chunkers) - throws IOException, InterruptedException { - List> uploads = new ArrayList<>(); - - for (Map.Entry chunkerEntry : chunkers.entrySet()) { - uploads.add(uploadBlobAsync(context, chunkerEntry.getKey(), chunkerEntry.getValue())); - } - - waitForBulkTransfer(uploads, /* cancelRemainingOnInterrupt= */ true); - } - /** * Uploads a BLOB asynchronously to the remote {@code ByteStream} service. The call returns * immediately and one can listen to the returned future for the success/failure of the upload. diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java index d0f95accb257ba..0986a00ce2ec19 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java @@ -15,6 +15,8 @@ import static com.google.common.base.Preconditions.checkState; import static com.google.common.truth.Truth.assertThat; +import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture; +import static com.google.devtools.build.lib.remote.util.Utils.waitForBulkTransfer; import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.Assert.assertThrows; import static org.junit.Assert.fail; @@ -180,7 +182,7 @@ public void singleBlobUploadShouldWork() throws Exception { serviceRegistry.addService(TestUtils.newNoErrorByteStreamService(blob)); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); // This test should not have triggered any retries. Mockito.verifyNoInteractions(mockBackoff); @@ -243,7 +245,7 @@ public void onCompleted() {} } }); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); // This test should not have triggered any retries. Mockito.verifyNoInteractions(mockBackoff); @@ -358,7 +360,7 @@ public void queryWriteStatus( } }); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); // This test triggers one retry. Mockito.verify(mockBackoff, Mockito.times(1)) @@ -474,7 +476,7 @@ public void queryWriteStatus( } }); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); byte[] decompressed = Zstd.decompress(output.toByteArray(), blob.length - skipSize); assertThat(Arrays.equals(decompressed, 0, decompressed.length, blob, skipSize, blob.length)) .isTrue(); @@ -541,7 +543,7 @@ public void queryWriteStatus( } }); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); } @Test @@ -598,7 +600,7 @@ public void queryWriteStatus( } }); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); // This test should not have triggered any retries. assertThat(numWriteCalls.get()).isEqualTo(1); @@ -669,7 +671,7 @@ public void queryWriteStatus( } }); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); // This test should have triggered a single retry, because it made // no progress. @@ -708,7 +710,7 @@ public StreamObserver write(StreamObserver streamOb } }); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); // This test should not have triggered any retries. Mockito.verifyNoInteractions(mockBackoff); @@ -759,7 +761,7 @@ public void onCompleted() { }); try { - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); fail("Should have thrown an exception."); } catch (IOException e) { // expected @@ -798,7 +800,7 @@ public StreamObserver write(StreamObserver streamOb } }); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); } @Test @@ -831,7 +833,7 @@ public void multipleBlobsUploadShouldWork() throws Exception { serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash)); - uploader.uploadBlobs(context, chunkers); + uploadBlobs(uploader, context, chunkers); } @Test @@ -859,7 +861,8 @@ public void tooManyFilesIOException_adviseMaximumOpenFilesFlag() throws Exceptio + " --bep_maximum_open_remote_upload_files flag to a number lower than your system" + " default (run 'ulimit -a' for *nix-based operating systems). Original error message:" + " Too many open files"; - assertThat(assertThrows(IOException.class, () -> uploader.uploadBlob(context, digest, chunker))) + assertThat( + assertThrows(IOException.class, () -> uploadBlob(uploader, context, digest, chunker))) .hasMessageThat() .isEqualTo(newMessage); } @@ -901,7 +904,7 @@ public void availablePermitsOpenFileSemaphore_fewerPermitsThanUploads_endWithAll serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash)); - uploader.uploadBlobs(context, chunkers); + uploadBlobs(uploader, context, chunkers); assertThat(uploader.getOpenedFilePermits().availablePermits()).isEqualTo(maximumOpenFiles); } @@ -937,7 +940,7 @@ public void noMaximumOpenFilesFlags_nullSemaphore() throws Exception { serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash)); - uploader.uploadBlobs(context, chunkers); + uploadBlobs(uploader, context, chunkers); assertThat(uploader.getOpenedFilePermits()).isNull(); } @@ -1134,7 +1137,7 @@ public ServerCall.Listener interceptCall( } })); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); } @Test @@ -1165,7 +1168,7 @@ public StreamObserver write(StreamObserver response }); try { - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); fail("Should have thrown an exception."); } catch (IOException e) { assertThat(RemoteRetrierUtils.causedByStatus(e, Code.INTERNAL)).isTrue(); @@ -1207,7 +1210,7 @@ public StreamObserver write(StreamObserver response Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build(); Digest digest = DIGEST_UTIL.compute(blob); try { - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); fail("Should have thrown an exception."); } catch (IOException e) { assertThat(e).hasCauseThat().isInstanceOf(RejectedExecutionException.class); @@ -1255,7 +1258,7 @@ public void onCompleted() { Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build(); Digest digest = DIGEST_UTIL.compute(blob); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); } @Test @@ -1299,7 +1302,7 @@ public void onCompleted() { Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build(); Digest digest = DIGEST_UTIL.compute(blob); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); } @Test @@ -1334,7 +1337,7 @@ public StreamObserver write(StreamObserver response Digest digest = DIGEST_UTIL.compute(blob); try { - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); fail("Should have thrown an exception."); } catch (IOException e) { assertThat(numCalls.get()).isEqualTo(1); @@ -1389,7 +1392,7 @@ public StreamObserver write(StreamObserver streamOb } }); - assertThrows(IOException.class, () -> uploader.uploadBlob(context, digest, chunker)); + assertThrows(IOException.class, () -> uploadBlob(uploader, context, digest, chunker)); assertThat(refreshTimes.get()).isEqualTo(1); assertThat(numUploads.get()).isEqualTo(2); @@ -1474,7 +1477,7 @@ public void onCompleted() { } }); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); assertThat(refreshTimes.get()).isEqualTo(1); assertThat(numUploads.get()).isEqualTo(2); @@ -1540,7 +1543,7 @@ public void queryWriteStatus( Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build(); Digest digest = DIGEST_UTIL.compute(blob); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); assertThat(numUploads.get()).isEqualTo(1); } @@ -1630,7 +1633,7 @@ public void onCompleted() { Chunker.builder().setInput(blob).setCompressed(true).setChunkSize(CHUNK_SIZE).build(); Digest digest = DIGEST_UTIL.compute(blob); - uploader.uploadBlob(context, digest, chunker); + uploadBlob(uploader, context, digest, chunker); // This test should not have triggered any retries. Mockito.verifyNoInteractions(mockBackoff); @@ -1638,6 +1641,53 @@ public void onCompleted() { assertThat(numUploads.get()).isEqualTo(1); } + /** + * Uploads a BLOB, as provided by the {@link Chunker}, to the remote {@code ByteStream} service. + * The call blocks until the upload is complete, or throws an {@link Exception} in case of error. + * + *

Uploads are retried according to the specified {@link RemoteRetrier}. Retrying is + * transparent to the user of this API. + * + * @param digest the digest of the data to upload. + * @param chunker the data to upload. + * @throws IOException when reading of the {@link Chunker}s input source fails + */ + private static void uploadBlob( + ByteStreamUploader byteStreamUploader, + RemoteActionExecutionContext context, + Digest digest, + Chunker chunker) + throws IOException, InterruptedException { + getFromFuture(byteStreamUploader.uploadBlobAsync(context, digest, chunker)); + } + + /** + * Uploads a list of BLOBs concurrently to the remote {@code ByteStream} service. The call blocks + * until the upload of all BLOBs is complete, or throws an {@link + * com.google.devtools.build.lib.remote.common.BulkTransferException} if there are errors. + * + *

Uploads are retried according to the specified {@link RemoteRetrier}. Retrying is + * transparent to the user of this API. + * + * @param chunkers the data to upload. + * @throws IOException when reading of the {@link Chunker}s input source or uploading fails + */ + private static void uploadBlobs( + ByteStreamUploader byteStreamUploader, + RemoteActionExecutionContext context, + Map chunkers) + throws IOException, InterruptedException { + List> uploads = new ArrayList<>(); + + for (Map.Entry chunkerEntry : chunkers.entrySet()) { + uploads.add( + byteStreamUploader.uploadBlobAsync( + context, chunkerEntry.getKey(), chunkerEntry.getValue())); + } + + waitForBulkTransfer(uploads, /* cancelRemainingOnInterrupt= */ true); + } + private static class NoopStreamObserver implements StreamObserver { @Override public void onNext(WriteRequest writeRequest) {} From ed0a1ee5469adbc927dea6c47a5baef2581c71bf Mon Sep 17 00:00:00 2001 From: Fabian Meumertzheim Date: Mon, 13 Jan 2025 11:29:24 +0100 Subject: [PATCH 12/13] Make Chunker closeable --- .../build/lib/remote/ByteStreamUploader.java | 2 +- .../devtools/build/lib/remote/Chunker.java | 37 +++++++++---------- .../build/lib/remote/GrpcCacheClient.java | 9 ++--- .../lib/remote/RemoteExecutionCache.java | 10 +++-- .../devtools/build/lib/remote/common/BUILD | 1 - .../lib/remote/common/RemoteCacheClient.java | 23 ++++++++---- .../lib/remote/http/HttpCacheClient.java | 6 +-- .../lib/remote/merkletree/MerkleTree.java | 1 + .../lib/remote/ByteStreamUploaderTest.java | 4 +- .../build/lib/remote/ChunkerTest.java | 2 +- .../build/lib/remote/CombinedCacheTest.java | 12 +++--- .../remote/RemoteExecutionServiceTest.java | 4 +- .../lib/remote/util/InMemoryCacheClient.java | 6 +-- .../build/remote/worker/ByteStreamServer.java | 10 ++--- 14 files changed, 65 insertions(+), 62 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java index c7602543032709..d2b9a92aeea49b 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java @@ -110,7 +110,7 @@ final class ByteStreamUploader { * performed. This is transparent to the user of this API. * * @param digest the {@link Digest} of the data to upload. - * @param chunker the data to upload. + * @param chunker the data to upload. Callers are responsible for closing the {@link Chunker}. */ public ListenableFuture uploadBlobAsync( RemoteActionExecutionContext context, Digest digest, Chunker chunker) { diff --git a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java index 01479bbffa6425..e1da153fe243e4 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java +++ b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java @@ -22,12 +22,11 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.io.ByteStreams; -import com.google.devtools.build.lib.remote.common.RemoteCacheClient.CloseableBlobSupplier; +import com.google.devtools.build.lib.remote.common.RemoteCacheClient.Blob; import com.google.devtools.build.lib.remote.zstd.ZstdCompressingInputStream; import com.google.errorprone.annotations.CanIgnoreReturnValue; import com.google.protobuf.ByteString; import java.io.ByteArrayInputStream; -import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.io.PushbackInputStream; @@ -44,7 +43,7 @@ * *

This class should not be extended - it's only non-final for testing. */ -public class Chunker { +public class Chunker implements AutoCloseable { private static int defaultChunkSize = 1024 * 16; @@ -98,7 +97,7 @@ public int hashCode() { } } - private final CloseableBlobSupplier dataSupplier; + private final Blob blob; private final long uncompressedSize; private final int chunkSize; private final Chunk emptyChunk; @@ -113,12 +112,8 @@ public int hashCode() { // lazily on the first call to next(), as opposed to opening it in the constructor or on reset(). private boolean initialized; - Chunker( - CloseableBlobSupplier dataSupplier, - long uncompressedSize, - int chunkSize, - boolean compressed) { - this.dataSupplier = checkNotNull(dataSupplier); + Chunker(Blob blob, long uncompressedSize, int chunkSize, boolean compressed) { + this.blob = checkNotNull(blob); this.uncompressedSize = uncompressedSize; this.chunkSize = chunkSize; this.emptyChunk = new Chunk(ByteString.EMPTY, 0); @@ -139,7 +134,7 @@ public long getUncompressedSize() { *

Closes any open resources (file handles, ...). */ public void reset() throws IOException { - close(); + closeInput(); offset = 0; initialized = false; } @@ -164,7 +159,7 @@ public void seek(long toOffset) throws IOException { initialize(toOffset); } if (uncompressedSize > 0 && data.finished()) { - close(); + closeInput(); } } @@ -176,7 +171,7 @@ public boolean hasNext() { } /** Closes the input stream and reset chunk cache */ - private void close() throws IOException { + private void closeInput() throws IOException { if (data != null) { data.close(); data = null; @@ -184,8 +179,10 @@ private void close() throws IOException { chunkCache = null; } - public void closeQuietly() { - dataSupplier.close(); + @Override + public void close() throws IOException { + reset(); + blob.close(); } /** Attempts reading at most a full chunk and stores it in the chunkCache buffer */ @@ -217,7 +214,7 @@ public Chunk next() throws IOException { maybeInitialize(); if (uncompressedSize == 0) { - close(); + closeInput(); return emptyChunk; } @@ -249,7 +246,7 @@ public Chunk next() throws IOException { // or the guard in getActualSize won't work. offset += bytesRead; if (data.finished()) { - close(); + closeInput(); } return new Chunk(blob, offsetBefore); @@ -268,7 +265,7 @@ private void initialize(long srcPos) throws IOException { checkState(offset == 0); checkState(chunkCache == null); try { - var src = dataSupplier.get(); + var src = blob.get(); ByteStreams.skipFully(src, srcPos); data = compressed @@ -294,10 +291,10 @@ public static class Builder { private int chunkSize = getDefaultChunkSize(); protected long size; private boolean compressed; - protected CloseableBlobSupplier inputStream; + protected Blob inputStream; @CanIgnoreReturnValue - public Builder setInput(long size, CloseableBlobSupplier in) { + public Builder setInput(long size, Blob in) { checkState(inputStream == null); checkNotNull(in); this.size = size; diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java index 308b2f3ec5b5dc..51eea5526b1824 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java @@ -484,12 +484,12 @@ private void releaseOut() { @Override public ListenableFuture uploadBlob( - RemoteActionExecutionContext context, Digest digest, CloseableBlobSupplier data) { + RemoteActionExecutionContext context, Digest digest, Blob blob) { return uploadChunker( context, digest, Chunker.builder() - .setInput(digest.getSizeBytes(), data) + .setInput(digest.getSizeBytes(), blob) .setCompressed(shouldCompress(digest)) .build()); } @@ -500,11 +500,10 @@ ListenableFuture uploadChunker( f.addListener( () -> { try { - chunker.reset(); - chunker.closeQuietly(); + chunker.close(); } catch (IOException e) { logger.atWarning().withCause(e).log( - "failed to reset chunker uploading %s/%d", digest.getHash(), digest.getSizeBytes()); + "failed to close chunker uploading %s/%d", digest.getHash(), digest.getSizeBytes()); } }, MoreExecutors.directExecutor()); diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java index d4dbe7a14b7332..e0bf3a2f5c5b3c 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java @@ -42,7 +42,7 @@ import com.google.devtools.build.lib.remote.common.LostInputsEvent; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; import com.google.devtools.build.lib.remote.common.RemoteCacheClient; -import com.google.devtools.build.lib.remote.common.RemoteCacheClient.CloseableBlobSupplier; +import com.google.devtools.build.lib.remote.common.RemoteCacheClient.Blob; import com.google.devtools.build.lib.remote.disk.DiskCacheClient; import com.google.devtools.build.lib.remote.merkletree.MerkleTree; import com.google.devtools.build.lib.remote.merkletree.MerkleTree.ContentSource; @@ -177,11 +177,13 @@ public void ensureInputsPresent( } } - private static final class VirtualActionInputDataSupplier implements CloseableBlobSupplier { + private static final class VirtualActionInputBlob implements Blob { private VirtualActionInput virtualActionInput; + // Can be large compared to the retained size of the VirtualActionInput and thus shouldn't be + // kept in memory for an extended period of time. private volatile ByteString data; - VirtualActionInputDataSupplier(VirtualActionInput virtualActionInput) { + VirtualActionInputBlob(VirtualActionInput virtualActionInput) { this.virtualActionInput = Preconditions.checkNotNull(virtualActionInput); } @@ -220,7 +222,7 @@ private ListenableFuture uploadBlob( return switch (file) { case ContentSource.VirtualActionInputSource(VirtualActionInput virtualActionInput) -> remoteCacheClient.uploadBlob( - context, digest, new VirtualActionInputDataSupplier(virtualActionInput)); + context, digest, new VirtualActionInputBlob(virtualActionInput)); case ContentSource.PathSource(Path path) -> { try { if (remotePathChecker.isRemote(context, path)) { diff --git a/src/main/java/com/google/devtools/build/lib/remote/common/BUILD b/src/main/java/com/google/devtools/build/lib/remote/common/BUILD index c04252a26ca373..f7f60e8e60428f 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/common/BUILD +++ b/src/main/java/com/google/devtools/build/lib/remote/common/BUILD @@ -56,7 +56,6 @@ java_library( "//src/main/java/com/google/devtools/build/lib/concurrent", "//src/main/java/com/google/devtools/build/lib/exec:spawn_input_expander", "//src/main/java/com/google/devtools/build/lib/exec:spawn_runner", - "//src/main/java/com/google/devtools/build/lib/profiler", "//src/main/java/com/google/devtools/build/lib/vfs", "//src/main/java/com/google/devtools/build/lib/vfs:pathfragment", "//third_party:guava", diff --git a/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java index 416c01c0f6dcbe..bacb90bf0e0159 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/common/RemoteCacheClient.java @@ -20,9 +20,9 @@ import build.bazel.remote.execution.v2.ServerCapabilities; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ListenableFuture; -import com.google.devtools.build.lib.profiler.SilentCloseable; import com.google.devtools.build.lib.vfs.Path; import com.google.protobuf.ByteString; +import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -113,9 +113,16 @@ ListenableFuture uploadActionResult( ListenableFuture downloadBlob( RemoteActionExecutionContext context, Digest digest, OutputStream out); + /** + * A supplier for the data comprising a BLOB. + * + *

As blobs can be large and may need to be kept in memory, consumers should call {@link #get} + * as late as possible and close the blob as soon as they are done with it. + */ @FunctionalInterface - interface CloseableBlobSupplier extends SilentCloseable { - InputStream get(); + interface Blob extends Closeable { + /** Get an input stream for the blob's data. Can be called multiple times. */ + InputStream get() throws IOException; @Override default void close() {} @@ -126,14 +133,14 @@ default void close() {} * * @param context the context for the action. * @param digest The digest of the blob. - * @param data A supplier for the data to upload. May be called multiple times. + * @param blob A supplier for the blob to upload. May be called multiple times, but is closed by + * the implementation after the upload is complete. * @return A future representing pending completion of the upload. */ - ListenableFuture uploadBlob( - RemoteActionExecutionContext context, Digest digest, CloseableBlobSupplier data); + ListenableFuture uploadBlob(RemoteActionExecutionContext context, Digest digest, Blob blob); /** - * Uploads a {@code file} to the CAS. + * Uploads a {@code file} BLOB to the CAS. * * @param context the context for the action. * @param digest The digest of the file. @@ -146,7 +153,7 @@ default ListenableFuture uploadFile( } /** - * Uploads a BLOB to the CAS. + * Uploads an in-memory BLOB to the CAS. * * @param context the context for the action. * @param digest The digest of the blob. diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java index 644a38fc91ea82..37ca1bfc639024 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java @@ -718,12 +718,12 @@ private void uploadAfterCredentialRefresh(UploadCommand upload, SettableFuture uploadBlob( - RemoteActionExecutionContext context, Digest digest, CloseableBlobSupplier in) { + RemoteActionExecutionContext context, Digest digest, Blob blob) { return retrier.executeAsync( () -> { var result = - uploadAsync(digest.getHash(), digest.getSizeBytes(), in.get(), /* casUpload= */ true); - result.addListener(in::close, MoreExecutors.directExecutor()); + uploadAsync(digest.getHash(), digest.getSizeBytes(), blob.get(), /* casUpload= */ true); + result.addListener(blob::close, MoreExecutors.directExecutor()); return result; }); } diff --git a/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTree.java b/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTree.java index 707c6bb8e5acf6..0457fe04573910 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTree.java +++ b/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTree.java @@ -57,6 +57,7 @@ public class MerkleTree { private static final String BAZEL_TOOL_INPUT_MARKER = "bazel_tool_input"; + /** A source of a file's content */ public sealed interface ContentSource { record PathSource(Path path) implements ContentSource {} diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java index 0986a00ce2ec19..fc0c2d06c17b4a 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java @@ -44,7 +44,7 @@ import com.google.devtools.build.lib.analysis.BlazeVersionInfo; import com.google.devtools.build.lib.authandtls.CallCredentialsProvider; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; -import com.google.devtools.build.lib.remote.common.RemoteCacheClient.CloseableBlobSupplier; +import com.google.devtools.build.lib.remote.common.RemoteCacheClient.Blob; import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.remote.util.TestUtils; import com.google.devtools.build.lib.remote.util.TracingMetadataUtils; @@ -1815,7 +1815,7 @@ public void queryWriteStatus( /* Custom Chunker used to track number of open files */ private static class TestChunker extends Chunker { - TestChunker(CloseableBlobSupplier dataSupplier, long size, int chunkSize, boolean compressed) { + TestChunker(Blob dataSupplier, long size, int chunkSize, boolean compressed) { super(dataSupplier, size, chunkSize, compressed); } diff --git a/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java b/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java index 758717dfba6dd4..42c159cb562df3 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java @@ -136,7 +136,7 @@ public void resourcesShouldBeReleased() throws IOException { byte[] data = new byte[] {1, 2}; final AtomicReference in = new AtomicReference<>(); - RemoteCacheClient.CloseableBlobSupplier supplier = + RemoteCacheClient.Blob supplier = () -> { in.set(Mockito.spy(new ByteArrayInputStream(data))); return in.get(); diff --git a/src/test/java/com/google/devtools/build/lib/remote/CombinedCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/CombinedCacheTest.java index 4157bb8bee9023..3d9fa052e1ddce 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/CombinedCacheTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/CombinedCacheTest.java @@ -55,7 +55,7 @@ import com.google.devtools.build.lib.remote.common.LostInputsEvent; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; import com.google.devtools.build.lib.remote.common.RemoteCacheClient; -import com.google.devtools.build.lib.remote.common.RemoteCacheClient.CloseableBlobSupplier; +import com.google.devtools.build.lib.remote.common.RemoteCacheClient.Blob; import com.google.devtools.build.lib.remote.merkletree.MerkleTree; import com.google.devtools.build.lib.remote.options.RemoteOptions; import com.google.devtools.build.lib.remote.util.DigestUtil; @@ -74,7 +74,6 @@ import com.google.devtools.common.options.Options; import com.google.protobuf.ByteString; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.Deque; @@ -90,7 +89,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Supplier; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -400,7 +398,7 @@ public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUpl return future; }) .when(cacheProtocol) - .uploadBlob(any(), any(), (CloseableBlobSupplier) any()); + .uploadBlob(any(), any(), (Blob) any()); doAnswer( invocationOnMock -> { SettableFuture future = SettableFuture.create(); @@ -475,7 +473,7 @@ public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUpl return future; }) .when(cacheProtocol) - .uploadBlob(any(), any(), (CloseableBlobSupplier) any()); + .uploadBlob(any(), any(), (Blob) any()); doAnswer( invocationOnMock -> { SettableFuture future = SettableFuture.create(); @@ -556,7 +554,7 @@ public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUpl return future; }) .when(cacheProtocol) - .uploadBlob(any(), any(), (CloseableBlobSupplier) any()); + .uploadBlob(any(), any(), (Blob) any()); doAnswer( invocationOnMock -> { Path file = invocationOnMock.getArgument(2, Path.class); @@ -655,7 +653,7 @@ public void ensureInputsPresent_uploadFailed_propagateErrors() throws Exception RemoteCacheClient cacheProtocol = spy(new InMemoryCacheClient()); doAnswer(invocationOnMock -> Futures.immediateFailedFuture(new IOException("upload failed"))) .when(cacheProtocol) - .uploadBlob(any(), any(), (CloseableBlobSupplier) any()); + .uploadBlob(any(), any(), (Blob) any()); doAnswer(invocationOnMock -> Futures.immediateFailedFuture(new IOException("upload failed"))) .when(cacheProtocol) .uploadFile(any(), any(), any()); diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java index 6cc79b95098738..1836221831f2bb 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java @@ -101,7 +101,7 @@ import com.google.devtools.build.lib.remote.RemoteScrubbing.Config; import com.google.devtools.build.lib.remote.common.BulkTransferException; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; -import com.google.devtools.build.lib.remote.common.RemoteCacheClient.CloseableBlobSupplier; +import com.google.devtools.build.lib.remote.common.RemoteCacheClient.Blob; import com.google.devtools.build.lib.remote.common.RemoteExecutionClient; import com.google.devtools.build.lib.remote.common.RemotePathResolver; import com.google.devtools.build.lib.remote.common.RemotePathResolver.DefaultRemotePathResolver; @@ -2163,7 +2163,7 @@ public void uploadInputsIfNotPresent_interrupted_requestCancelled() throws Excep return future; }) .when(cache.remoteCacheClient) - .uploadBlob(any(), any(), (CloseableBlobSupplier) any()); + .uploadBlob(any(), any(), (Blob) any()); ActionInput input = ActionInputHelper.fromPath("inputs/foo"); fakeFileCache.createScratchInput(input, "input-foo"); RemoteExecutionService service = newRemoteExecutionService(); diff --git a/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java b/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java index 01a3870d69cfed..466fa953e64f5f 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java +++ b/src/test/java/com/google/devtools/build/lib/remote/util/InMemoryCacheClient.java @@ -141,9 +141,9 @@ public ListenableFuture uploadActionResult( @Override public ListenableFuture uploadBlob( - RemoteActionExecutionContext context, Digest digest, CloseableBlobSupplier data) { - try { - cas.put(digest, data.get().readAllBytes()); + RemoteActionExecutionContext context, Digest digest, Blob blob) { + try (blob) { + cas.put(digest, blob.get().readAllBytes()); } catch (IOException e) { return Futures.immediateFailedFuture(e); } diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java index 15080fa9280d3f..dc66ee31ec9d25 100644 --- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java +++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ByteStreamServer.java @@ -85,11 +85,11 @@ public void read(ReadRequest request, StreamObserver responseObser // This still relies on the blob size to be small enough to fit in memory. // TODO(olaola): refactor to fix this if the need arises. byte[] bytes = getFromFuture(cache.downloadBlob(context, digest)); - Chunker c = - Chunker.builder().setInput(bytes.length, () -> new ByteArrayInputStream(bytes)).build(); - while (c.hasNext()) { - responseObserver.onNext( - ReadResponse.newBuilder().setData(c.next().getData()).build()); + try (Chunker c = + Chunker.builder().setInput(bytes.length, () -> new ByteArrayInputStream(bytes)).build()) { + while (c.hasNext()) { + responseObserver.onNext(ReadResponse.newBuilder().setData(c.next().getData()).build()); + } } responseObserver.onCompleted(); } catch (CacheNotFoundException e) { From 6914c1f8981b43f1b9468c50046d0e0d6bb0d8d9 Mon Sep 17 00:00:00 2001 From: Fabian Meumertzheim Date: Wed, 15 Jan 2025 07:40:49 +0100 Subject: [PATCH 13/13] Fix javadocs --- .../devtools/build/lib/remote/merkletree/MerkleTree.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTree.java b/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTree.java index 0457fe04573910..db6f3b90c276e4 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTree.java +++ b/src/main/java/com/google/devtools/build/lib/remote/merkletree/MerkleTree.java @@ -57,10 +57,12 @@ public class MerkleTree { private static final String BAZEL_TOOL_INPUT_MARKER = "bazel_tool_input"; - /** A source of a file's content */ + /** A source of a file's content. */ public sealed interface ContentSource { + /** Content provided by an actual file. */ record PathSource(Path path) implements ContentSource {} + /** Content provided by a virtual action input. */ record VirtualActionInputSource(VirtualActionInput virtualActionInput) implements ContentSource {} }