From 5db8642224e996d0ebe3492cd55dfef6276d5a15 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9ophane=20Hufschmitt?= Date: Mon, 21 Mar 2022 16:02:51 +0100 Subject: [PATCH 01/15] Factor out a struct representing a connection to a machine --- src/hydra-queue-runner/state.hh | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh index 8f303d28a..059b03a14 100644 --- a/src/hydra-queue-runner/state.hh +++ b/src/hydra-queue-runner/state.hh @@ -290,6 +290,16 @@ struct Machine { return sshName == "localhost"; } + + // A connection to a machine + struct Connection { + nix::FdSink to; + nix::FdSource from; + unsigned int remoteVersion; + + // Backpointer to the machine + ptr machine; + }; }; From 2f494b783425d6703f23bc4f2cdbc70592a31990 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9ophane=20Hufschmitt?= Date: Mon, 21 Mar 2022 10:42:44 +0100 Subject: [PATCH 02/15] Factor out the creation of the log file --- src/hydra-queue-runner/build-remote.cc | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc index 464a35c86..2e258484d 100644 --- a/src/hydra-queue-runner/build-remote.cc +++ b/src/hydra-queue-runner/build-remote.cc @@ -175,6 +175,18 @@ StorePaths reverseTopoSortPaths(const std::map & paths return sorted; } +std::pair openLogFile(const std::string & logDir, const StorePath & drvPath) +{ + string base(drvPath.to_string()); + auto logFile = logDir + "/" + string(base, 0, 2) + "/" + string(base, 2); + + createDirs(dirOf(logFile)); + + AutoCloseFD logFD = open(logFile.c_str(), O_CREAT | O_TRUNC | O_WRONLY, 0666); + if (!logFD) throw SysError("creating log file ‘%s’", logFile); + + return {std::move(logFile), std::move(logFD)}; +} void State::buildRemote(ref destStore, Machine::ptr machine, Step::ptr step, @@ -185,14 +197,9 @@ void State::buildRemote(ref destStore, { assert(BuildResult::TimedOut == 8); - string base(step->drvPath.to_string()); - result.logFile = logDir + "/" + string(base, 0, 2) + "/" + string(base, 2); - AutoDelete autoDelete(result.logFile, false); - - createDirs(dirOf(result.logFile)); - - AutoCloseFD logFD = open(result.logFile.c_str(), O_CREAT | O_TRUNC | O_WRONLY, 0666); - if (!logFD) throw SysError("creating log file ‘%s’", result.logFile); + auto [logFile, logFD] = openLogFile(logDir, step->drvPath); + AutoDelete logFileDel(logFile, false); + result.logFile = logFile; nix::Path tmpDir = createTempDir(); AutoDelete tmpDirDel(tmpDir, true); @@ -316,7 +323,7 @@ void State::buildRemote(ref destStore, result.overhead += std::chrono::duration_cast(now2 - now1).count(); } - autoDelete.cancel(); + logFileDel.cancel(); /* Truncate the log to get rid of messages about substitutions etc. on the remote system. */ From 9f1b911625cbc92023b6aec936a505a46af3172f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9ophane=20Hufschmitt?= Date: Mon, 21 Mar 2022 11:35:38 +0100 Subject: [PATCH 03/15] Factor more stuff out --- src/hydra-queue-runner/build-remote.cc | 224 ++++++++++++++----------- 1 file changed, 122 insertions(+), 102 deletions(-) diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc index 2e258484d..360a8ef7a 100644 --- a/src/hydra-queue-runner/build-remote.cc +++ b/src/hydra-queue-runner/build-remote.cc @@ -188,6 +188,87 @@ std::pair openLogFile(const std::string & logDir, const Store return {std::move(logFile), std::move(logFD)}; } +void handshake(Machine::Connection & conn, unsigned int repeats) +{ + conn.to << SERVE_MAGIC_1 << 0x204; + conn.to.flush(); + + unsigned int magic = readInt(conn.from); + if (magic != SERVE_MAGIC_2) + throw Error("protocol mismatch with ‘nix-store --serve’ on ‘%1%’", conn.machine->sshName); + conn.remoteVersion = readInt(conn.from); + if (GET_PROTOCOL_MAJOR(conn.remoteVersion) != 0x200) + throw Error("unsupported ‘nix-store --serve’ protocol version on ‘%1%’", conn.machine->sshName); + if (GET_PROTOCOL_MINOR(conn.remoteVersion) < 3 && repeats > 0) + throw Error("machine ‘%1%’ does not support repeating a build; please upgrade it to Nix 1.12", conn.machine->sshName); +} + +StorePathSet sendInputs( + State & state, + Step & step, + Store & localStore, + Store & destStore, + Machine::Connection & conn, + unsigned int & overhead, + counter & nrStepsWaiting, + counter & nrStepsCopyingTo +) +{ + + StorePathSet inputs; + BasicDerivation basicDrv(*step.drv); + + for (auto & p : step.drv->inputSrcs) + inputs.insert(p); + + for (auto & input : step.drv->inputDrvs) { + auto drv2 = localStore.readDerivation(input.first); + for (auto & name : input.second) { + if (auto i = get(drv2.outputs, name)) { + auto outPath = i->path(localStore, drv2.name, name); + inputs.insert(*outPath); + basicDrv.inputSrcs.insert(*outPath); + } + } + } + + /* Ensure that the inputs exist in the destination store. This is + a no-op for regular stores, but for the binary cache store, + this will copy the inputs to the binary cache from the local + store. */ + if (localStore.getUri() != destStore.getUri()) { + StorePathSet closure; + localStore.computeFSClosure(step.drv->inputSrcs, closure); + copyPaths(localStore, destStore, closure, NoRepair, NoCheckSigs, NoSubstitute); + } + + { + auto mc1 = std::make_shared>(nrStepsWaiting); + mc1.reset(); + MaintainCount mc2(nrStepsCopyingTo); + + printMsg(lvlDebug, "sending closure of ‘%s’ to ‘%s’", + localStore.printStorePath(step.drvPath), conn.machine->sshName); + + auto now1 = std::chrono::steady_clock::now(); + + /* Copy the input closure. */ + if (conn.machine->isLocalhost()) { + StorePathSet closure; + destStore.computeFSClosure(inputs, closure); + copyPaths(destStore, localStore, closure, NoRepair, NoCheckSigs, NoSubstitute); + } else { + copyClosureTo(conn.machine->state->sendLock, destStore, conn.from, conn.to, inputs, true); + } + + auto now2 = std::chrono::steady_clock::now(); + + overhead += std::chrono::duration_cast(now2 - now1).count(); + } + + return inputs; +} + void State::buildRemote(ref destStore, Machine::ptr machine, Step::ptr step, unsigned int maxSilentTime, unsigned int buildTimeout, unsigned int repeats, @@ -230,33 +311,21 @@ void State::buildRemote(ref destStore, process. Meh. */ }); - FdSource from(child.from.get()); - FdSink to(child.to.get()); + Machine::Connection conn; + conn.from = child.from.get(); + conn.to = child.to.get(); + conn.machine = machine; Finally updateStats([&]() { - bytesReceived += from.read; - bytesSent += to.written; + bytesReceived += conn.from.read; + bytesSent += conn.to.written; }); - /* Handshake. */ - unsigned int remoteVersion; - try { - to << SERVE_MAGIC_1 << 0x204; - to.flush(); - - unsigned int magic = readInt(from); - if (magic != SERVE_MAGIC_2) - throw Error("protocol mismatch with ‘nix-store --serve’ on ‘%1%’", machine->sshName); - remoteVersion = readInt(from); - if (GET_PROTOCOL_MAJOR(remoteVersion) != 0x200) - throw Error("unsupported ‘nix-store --serve’ protocol version on ‘%1%’", machine->sshName); - if (GET_PROTOCOL_MINOR(remoteVersion) < 3 && repeats > 0) - throw Error("machine ‘%1%’ does not support repeating a build; please upgrade it to Nix 1.12", machine->sshName); - + handshake(conn, repeats); } catch (EndOfFile & e) { child.pid.wait(); - string s = chomp(readFile(result.logFile)); + std::string s = chomp(readFile(result.logFile)); throw Error("cannot connect to ‘%1%’: %2%", machine->sshName, s); } @@ -272,61 +341,12 @@ void State::buildRemote(ref destStore, outputs of the input derivations. */ updateStep(ssSendingInputs); - StorePathSet inputs; - BasicDerivation basicDrv(*step->drv); - - for (auto & p : step->drv->inputSrcs) - inputs.insert(p); - - for (auto & input : step->drv->inputDrvs) { - auto drv2 = localStore->readDerivation(input.first); - for (auto & name : input.second) { - if (auto i = get(drv2.outputs, name)) { - auto outPath = i->path(*localStore, drv2.name, name); - inputs.insert(*outPath); - basicDrv.inputSrcs.insert(*outPath); - } - } - } - - /* Ensure that the inputs exist in the destination store. This is - a no-op for regular stores, but for the binary cache store, - this will copy the inputs to the binary cache from the local - store. */ - if (localStore != std::shared_ptr(destStore)) { - StorePathSet closure; - localStore->computeFSClosure(step->drv->inputSrcs, closure); - copyPaths(*localStore, *destStore, closure, NoRepair, NoCheckSigs, NoSubstitute); - } - - { - auto mc1 = std::make_shared>(nrStepsWaiting); - mc1.reset(); - MaintainCount mc2(nrStepsCopyingTo); - - printMsg(lvlDebug, "sending closure of ‘%s’ to ‘%s’", - localStore->printStorePath(step->drvPath), machine->sshName); - - auto now1 = std::chrono::steady_clock::now(); - - /* Copy the input closure. */ - if (machine->isLocalhost()) { - StorePathSet closure; - destStore->computeFSClosure(inputs, closure); - copyPaths(*destStore, *localStore, closure, NoRepair, NoCheckSigs, NoSubstitute); - } else { - copyClosureTo(machine->state->sendLock, *destStore, from, to, inputs, true); - } - - auto now2 = std::chrono::steady_clock::now(); - - result.overhead += std::chrono::duration_cast(now2 - now1).count(); - } + StorePathSet inputs = sendInputs(*this, *step, *localStore, *destStore, conn, result.overhead, nrStepsWaiting, nrStepsCopyingTo); logFileDel.cancel(); /* Truncate the log to get rid of messages about substitutions - etc. on the remote system. */ + etc. on the remote system. */ if (lseek(logFD.get(), SEEK_SET, 0) != 0) throw SysError("seeking to the start of log file ‘%s’", result.logFile); @@ -342,31 +362,31 @@ void State::buildRemote(ref destStore, updateStep(ssBuilding); - to << cmdBuildDerivation << localStore->printStorePath(step->drvPath); - writeDerivation(to, *localStore, basicDrv); - to << maxSilentTime << buildTimeout; - if (GET_PROTOCOL_MINOR(remoteVersion) >= 2) - to << maxLogSize; - if (GET_PROTOCOL_MINOR(remoteVersion) >= 3) { - to << repeats // == build-repeat + conn.to << cmdBuildDerivation << localStore->printStorePath(step->drvPath); + writeDerivation(conn.to, *localStore, BasicDerivation(*step->drv)); + conn.to << maxSilentTime << buildTimeout; + if (GET_PROTOCOL_MINOR(conn.remoteVersion) >= 2) + conn.to << maxLogSize; + if (GET_PROTOCOL_MINOR(conn.remoteVersion) >= 3) { + conn.to << repeats // == build-repeat << step->isDeterministic; // == enforce-determinism } - to.flush(); + conn.to.flush(); result.startTime = time(0); int res; { MaintainCount mc(nrStepsBuilding); - res = readInt(from); + res = readInt(conn.from); } result.stopTime = time(0); - result.errorMsg = readString(from); - if (GET_PROTOCOL_MINOR(remoteVersion) >= 3) { - result.timesBuilt = readInt(from); - result.isNonDeterministic = readInt(from); - auto start = readInt(from); - auto stop = readInt(from); + result.errorMsg = readString(conn.from); + if (GET_PROTOCOL_MINOR(conn.remoteVersion) >= 3) { + result.timesBuilt = readInt(conn.from); + result.isNonDeterministic = readInt(conn.from); + auto start = readInt(conn.from); + auto stop = readInt(conn.from); if (start && start) { /* Note: this represents the duration of a single round, rather than all rounds. */ @@ -374,8 +394,8 @@ void State::buildRemote(ref destStore, result.stopTime = stop; } } - if (GET_PROTOCOL_MINOR(remoteVersion) >= 6) { - worker_proto::read(*localStore, from, Phantom {}); + if (GET_PROTOCOL_MINOR(conn.remoteVersion) >= 6) { + worker_proto::read(*localStore, conn.from, Phantom {}); } switch ((BuildResult::Status) res) { case BuildResult::Built: @@ -451,19 +471,19 @@ void State::buildRemote(ref destStore, /* Get info about each output path. */ std::map infos; size_t totalNarSize = 0; - to << cmdQueryPathInfos; - worker_proto::write(*localStore, to, outputs); - to.flush(); + conn.to << cmdQueryPathInfos; + worker_proto::write(*localStore, conn.to, outputs); + conn.to.flush(); while (true) { - auto storePathS = readString(from); + auto storePathS = readString(conn.from); if (storePathS == "") break; - auto deriver = readString(from); // deriver - auto references = worker_proto::read(*localStore, from, Phantom {}); - readLongLong(from); // download size - auto narSize = readLongLong(from); - auto narHash = Hash::parseAny(readString(from), htSHA256); - auto ca = parseContentAddressOpt(readString(from)); - readStrings(from); // sigs + auto deriver = readString(conn.from); // deriver + auto references = worker_proto::read(*localStore, conn.from, Phantom {}); + readLongLong(conn.from); // download size + auto narSize = readLongLong(conn.from); + auto narHash = Hash::parseAny(readString(conn.from), htSHA256); + auto ca = parseContentAddressOpt(readString(conn.from)); + readStrings(conn.from); // sigs ValidPathInfo info(localStore->parseStorePath(storePathS), narHash); assert(outputs.count(info.path)); info.references = references; @@ -502,10 +522,10 @@ void State::buildRemote(ref destStore, lambda function only gets executed if someone tries to read from source2, we will send the command from here rather than outside the lambda. */ - to << cmdDumpStorePath << localStore->printStorePath(path); - to.flush(); + conn.to << cmdDumpStorePath << localStore->printStorePath(path); + conn.to.flush(); - TeeSource tee(from, sink); + TeeSource tee(conn.from, sink); extractNarData(tee, localStore->printStorePath(path), narMembers); }); From 365776f5d77112842d4a094a596cb725123b535d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9ophane=20Hufschmitt?= Date: Mon, 21 Mar 2022 12:14:37 +0100 Subject: [PATCH 04/15] Factor out the building part --- src/hydra-queue-runner/build-remote.cc | 209 ++++++++++++++++--------- src/hydra-queue-runner/state.hh | 2 + 2 files changed, 133 insertions(+), 78 deletions(-) diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc index 360a8ef7a..7934d4014 100644 --- a/src/hydra-queue-runner/build-remote.cc +++ b/src/hydra-queue-runner/build-remote.cc @@ -269,6 +269,121 @@ StorePathSet sendInputs( return inputs; } +struct BuildOptions { + unsigned int maxSilentTime, buildTimeout, repeats; + size_t maxLogSize; + bool enforceDeterminism; +}; + +void RemoteResult::updateWithBuildResult(const nix::BuildResult & buildResult) +{ + RemoteResult thisArrow; + + startTime = buildResult.startTime; + stopTime = buildResult.stopTime; + timesBuilt = buildResult.timesBuilt; + errorMsg = buildResult.errorMsg; + isNonDeterministic = buildResult.isNonDeterministic; + + switch ((BuildResult::Status) buildResult.status) { + case BuildResult::Built: + stepStatus = bsSuccess; + break; + case BuildResult::Substituted: + case BuildResult::AlreadyValid: + stepStatus = bsSuccess; + isCached = true; + break; + case BuildResult::PermanentFailure: + stepStatus = bsFailed; + canCache = true; + errorMsg = ""; + break; + case BuildResult::InputRejected: + case BuildResult::OutputRejected: + stepStatus = bsFailed; + canCache = true; + break; + case BuildResult::TransientFailure: + stepStatus = bsFailed; + canRetry = true; + errorMsg = ""; + break; + case BuildResult::TimedOut: + stepStatus = bsTimedOut; + errorMsg = ""; + break; + case BuildResult::MiscFailure: + stepStatus = bsAborted; + canRetry = true; + break; + case BuildResult::LogLimitExceeded: + stepStatus = bsLogLimitExceeded; + break; + case BuildResult::NotDeterministic: + stepStatus = bsNotDeterministic; + canRetry = false; + canCache = true; + break; + default: + stepStatus = bsAborted; + break; + } + +} + +BuildResult performBuild( + Machine::Connection & conn, + Store & localStore, + StorePath drvPath, + const BasicDerivation & drv, + const BuildOptions & options, + counter & nrStepsBuilding +) +{ + + BuildResult result; + + conn.to << cmdBuildDerivation << localStore.printStorePath(drvPath); + writeDerivation(conn.to, localStore, drv); + conn.to << options.maxSilentTime << options.buildTimeout; + if (GET_PROTOCOL_MINOR(conn.remoteVersion) >= 2) + conn.to << options.maxLogSize; + if (GET_PROTOCOL_MINOR(conn.remoteVersion) >= 3) { + conn.to << options.repeats // == build-repeat + << options.enforceDeterminism; + } + conn.to.flush(); + + result.startTime = time(0); + + { + MaintainCount mc(nrStepsBuilding); + result.status = (BuildResult::Status)readInt(conn.from); + } + result.stopTime = time(0); + + + result.errorMsg = readString(conn.from); + if (GET_PROTOCOL_MINOR(conn.remoteVersion) >= 3) { + result.timesBuilt = readInt(conn.from); + result.isNonDeterministic = readInt(conn.from); + auto start = readInt(conn.from); + auto stop = readInt(conn.from); + if (start && start) { + /* Note: this represents the duration of a single + round, rather than all rounds. */ + result.startTime = start; + result.stopTime = stop; + } + } + if (GET_PROTOCOL_MINOR(conn.remoteVersion) >= 6) { + result.builtOutputs = worker_proto::read(localStore, conn.from, Phantom {}); + } + + return result; +} + void State::buildRemote(ref destStore, Machine::ptr machine, Step::ptr step, unsigned int maxSilentTime, unsigned int buildTimeout, unsigned int repeats, @@ -362,85 +477,23 @@ void State::buildRemote(ref destStore, updateStep(ssBuilding); - conn.to << cmdBuildDerivation << localStore->printStorePath(step->drvPath); - writeDerivation(conn.to, *localStore, BasicDerivation(*step->drv)); - conn.to << maxSilentTime << buildTimeout; - if (GET_PROTOCOL_MINOR(conn.remoteVersion) >= 2) - conn.to << maxLogSize; - if (GET_PROTOCOL_MINOR(conn.remoteVersion) >= 3) { - conn.to << repeats // == build-repeat - << step->isDeterministic; // == enforce-determinism - } - conn.to.flush(); + BuildResult buildResult = performBuild( + conn, + *localStore, + step->drvPath, + BasicDerivation(*step->drv), + { + .maxSilentTime = maxSilentTime, + .buildTimeout = buildTimeout, + .repeats = repeats, + .maxLogSize = maxLogSize, + .enforceDeterminism = step->isDeterministic, + }, + nrStepsBuilding + ); + + result.updateWithBuildResult(buildResult); - result.startTime = time(0); - int res; - { - MaintainCount mc(nrStepsBuilding); - res = readInt(conn.from); - } - result.stopTime = time(0); - - result.errorMsg = readString(conn.from); - if (GET_PROTOCOL_MINOR(conn.remoteVersion) >= 3) { - result.timesBuilt = readInt(conn.from); - result.isNonDeterministic = readInt(conn.from); - auto start = readInt(conn.from); - auto stop = readInt(conn.from); - if (start && start) { - /* Note: this represents the duration of a single - round, rather than all rounds. */ - result.startTime = start; - result.stopTime = stop; - } - } - if (GET_PROTOCOL_MINOR(conn.remoteVersion) >= 6) { - worker_proto::read(*localStore, conn.from, Phantom {}); - } - switch ((BuildResult::Status) res) { - case BuildResult::Built: - result.stepStatus = bsSuccess; - break; - case BuildResult::Substituted: - case BuildResult::AlreadyValid: - result.stepStatus = bsSuccess; - result.isCached = true; - break; - case BuildResult::PermanentFailure: - result.stepStatus = bsFailed; - result.canCache = true; - result.errorMsg = ""; - break; - case BuildResult::InputRejected: - case BuildResult::OutputRejected: - result.stepStatus = bsFailed; - result.canCache = true; - break; - case BuildResult::TransientFailure: - result.stepStatus = bsFailed; - result.canRetry = true; - result.errorMsg = ""; - break; - case BuildResult::TimedOut: - result.stepStatus = bsTimedOut; - result.errorMsg = ""; - break; - case BuildResult::MiscFailure: - result.stepStatus = bsAborted; - result.canRetry = true; - break; - case BuildResult::LogLimitExceeded: - result.stepStatus = bsLogLimitExceeded; - break; - case BuildResult::NotDeterministic: - result.stepStatus = bsNotDeterministic; - result.canRetry = false; - result.canCache = true; - break; - default: - result.stepStatus = bsAborted; - break; - } if (result.stepStatus != bsSuccess) return; result.errorMsg = ""; diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh index 059b03a14..6292a2dbb 100644 --- a/src/hydra-queue-runner/state.hh +++ b/src/hydra-queue-runner/state.hh @@ -72,6 +72,8 @@ struct RemoteResult { return stepStatus == bsCachedFailure ? bsFailed : stepStatus; } + + void updateWithBuildResult(const nix::BuildResult &); }; From a778a89f0424b5bffa66818cf0b46ffa945403c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9ophane=20Hufschmitt?= Date: Mon, 21 Mar 2022 15:16:32 +0100 Subject: [PATCH 05/15] Factor out the `queryPathInfos` part of the build --- src/hydra-queue-runner/build-remote.cc | 65 +++++++++++++++----------- 1 file changed, 39 insertions(+), 26 deletions(-) diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc index 7934d4014..0451ccb74 100644 --- a/src/hydra-queue-runner/build-remote.cc +++ b/src/hydra-queue-runner/build-remote.cc @@ -384,6 +384,44 @@ BuildResult performBuild( return result; } +std::map queryPathInfos( + Machine::Connection & conn, + Store & localStore, + StorePathSet & outputs, + size_t & totalNarSize +) +{ + + /* Get info about each output path. */ + std::map infos; + conn.to << cmdQueryPathInfos; + worker_proto::write(localStore, conn.to, outputs); + conn.to.flush(); + while (true) { + auto storePathS = readString(conn.from); + if (storePathS == "") break; + auto deriver = readString(conn.from); // deriver + auto references = worker_proto::read(localStore, conn.from, Phantom {}); + readLongLong(conn.from); // download size + auto narSize = readLongLong(conn.from); + auto narHash = Hash::parseAny(readString(conn.from), htSHA256); + auto ca = parseContentAddressOpt(readString(conn.from)); + readStrings(conn.from); // sigs + ValidPathInfo info(localStore.parseStorePath(storePathS), narHash); + assert(outputs.count(info.path)); + info.references = references; + info.narSize = narSize; + totalNarSize += info.narSize; + info.narHash = narHash; + info.ca = ca; + if (deriver != "") + info.deriver = localStore.parseStorePath(deriver); + infos.insert_or_assign(info.path, info); + } + + return infos; +} + void State::buildRemote(ref destStore, Machine::ptr machine, Step::ptr step, unsigned int maxSilentTime, unsigned int buildTimeout, unsigned int repeats, @@ -521,33 +559,8 @@ void State::buildRemote(ref destStore, outputs.insert(*i.second.second); } - /* Get info about each output path. */ - std::map infos; size_t totalNarSize = 0; - conn.to << cmdQueryPathInfos; - worker_proto::write(*localStore, conn.to, outputs); - conn.to.flush(); - while (true) { - auto storePathS = readString(conn.from); - if (storePathS == "") break; - auto deriver = readString(conn.from); // deriver - auto references = worker_proto::read(*localStore, conn.from, Phantom {}); - readLongLong(conn.from); // download size - auto narSize = readLongLong(conn.from); - auto narHash = Hash::parseAny(readString(conn.from), htSHA256); - auto ca = parseContentAddressOpt(readString(conn.from)); - readStrings(conn.from); // sigs - ValidPathInfo info(localStore->parseStorePath(storePathS), narHash); - assert(outputs.count(info.path)); - info.references = references; - info.narSize = narSize; - totalNarSize += info.narSize; - info.narHash = narHash; - info.ca = ca; - if (deriver != "") - info.deriver = localStore->parseStorePath(deriver); - infos.insert_or_assign(info.path, info); - } + auto infos = queryPathInfos(conn, *localStore, outputs, totalNarSize); if (totalNarSize > maxOutputSize) { result.stepStatus = bsNarSizeLimitExceeded; From fd0ae78eba058ab456590da698b45082fcafb519 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9ophane=20Hufschmitt?= Date: Mon, 21 Mar 2022 15:26:31 +0100 Subject: [PATCH 06/15] Factor out the copying from the build store --- src/hydra-queue-runner/build-remote.cc | 76 +++++++++++++++++--------- 1 file changed, 49 insertions(+), 27 deletions(-) diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc index 0451ccb74..79e5a2319 100644 --- a/src/hydra-queue-runner/build-remote.cc +++ b/src/hydra-queue-runner/build-remote.cc @@ -422,6 +422,54 @@ std::map queryPathInfos( return infos; } +void copyPathFromRemote( + Machine::Connection & conn, + NarMemberDatas & narMembers, + Store & localStore, + Store & destStore, + const ValidPathInfo & info +) +{ + /* Receive the NAR from the remote and add it to the + destination store. Meanwhile, extract all the info from the + NAR that getBuildOutput() needs. */ + auto source2 = sinkToSource([&](Sink & sink) + { + /* Note: we should only send the command to dump the store + path to the remote if the NAR is actually going to get read + by the destination store, which won't happen if this path + is already valid on the destination store. Since this + lambda function only gets executed if someone tries to read + from source2, we will send the command from here rather + than outside the lambda. */ + conn.to << cmdDumpStorePath << localStore.printStorePath(info.path); + conn.to.flush(); + + TeeSource tee(conn.from, sink); + extractNarData(tee, localStore.printStorePath(info.path), narMembers); + }); + + destStore.addToStore(info, *source2, NoRepair, NoCheckSigs); +} + +void copyPathsFromRemote( + Machine::Connection & conn, + NarMemberDatas & narMembers, + Store & localStore, + Store & destStore, + const std::map & infos +) +{ + auto pathsSorted = reverseTopoSortPaths(infos); + + for (auto & path : pathsSorted) { + auto & info = infos.find(path)->second; + copyPathFromRemote(conn, narMembers, localStore, destStore, info); + } + +} + + void State::buildRemote(ref destStore, Machine::ptr machine, Step::ptr step, unsigned int maxSilentTime, unsigned int buildTimeout, unsigned int repeats, @@ -571,33 +619,7 @@ void State::buildRemote(ref destStore, printMsg(lvlDebug, "copying outputs of ‘%s’ from ‘%s’ (%d bytes)", localStore->printStorePath(step->drvPath), machine->sshName, totalNarSize); - auto pathsSorted = reverseTopoSortPaths(infos); - - for (auto & path : pathsSorted) { - auto & info = infos.find(path)->second; - - /* Receive the NAR from the remote and add it to the - destination store. Meanwhile, extract all the info from the - NAR that getBuildOutput() needs. */ - auto source2 = sinkToSource([&](Sink & sink) - { - /* Note: we should only send the command to dump the store - path to the remote if the NAR is actually going to get read - by the destination store, which won't happen if this path - is already valid on the destination store. Since this - lambda function only gets executed if someone tries to read - from source2, we will send the command from here rather - than outside the lambda. */ - conn.to << cmdDumpStorePath << localStore->printStorePath(path); - conn.to.flush(); - - TeeSource tee(conn.from, sink); - extractNarData(tee, localStore->printStorePath(path), narMembers); - }); - - destStore->addToStore(info, *source2, NoRepair, NoCheckSigs); - } - + copyPathsFromRemote(conn, narMembers, *localStore, *destStore, infos); auto now2 = std::chrono::steady_clock::now(); result.overhead += std::chrono::duration_cast(now2 - now1).count(); From b430d41afd6a4ca0e38343ae2eee4aa6307cf980 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9ophane=20Hufschmitt?= Date: Mon, 21 Mar 2022 16:33:25 +0100 Subject: [PATCH 07/15] Use the `BuildOptions` more eagerly --- src/hydra-queue-runner/build-remote.cc | 20 ++++---------------- src/hydra-queue-runner/builder.cc | 16 +++++++++------- src/hydra-queue-runner/state.hh | 9 +++++++-- 3 files changed, 20 insertions(+), 25 deletions(-) diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc index 79e5a2319..1e06e5014 100644 --- a/src/hydra-queue-runner/build-remote.cc +++ b/src/hydra-queue-runner/build-remote.cc @@ -269,12 +269,6 @@ StorePathSet sendInputs( return inputs; } -struct BuildOptions { - unsigned int maxSilentTime, buildTimeout, repeats; - size_t maxLogSize; - bool enforceDeterminism; -}; - void RemoteResult::updateWithBuildResult(const nix::BuildResult & buildResult) { RemoteResult thisArrow; @@ -337,7 +331,7 @@ BuildResult performBuild( Store & localStore, StorePath drvPath, const BasicDerivation & drv, - const BuildOptions & options, + const State::BuildOptions & options, counter & nrStepsBuilding ) { @@ -472,7 +466,7 @@ void copyPathsFromRemote( void State::buildRemote(ref destStore, Machine::ptr machine, Step::ptr step, - unsigned int maxSilentTime, unsigned int buildTimeout, unsigned int repeats, + const BuildOptions & buildOptions, RemoteResult & result, std::shared_ptr activeStep, std::function updateStep, NarMemberDatas & narMembers) @@ -523,7 +517,7 @@ void State::buildRemote(ref destStore, }); try { - handshake(conn, repeats); + handshake(conn, buildOptions.repeats); } catch (EndOfFile & e) { child.pid.wait(); std::string s = chomp(readFile(result.logFile)); @@ -568,13 +562,7 @@ void State::buildRemote(ref destStore, *localStore, step->drvPath, BasicDerivation(*step->drv), - { - .maxSilentTime = maxSilentTime, - .buildTimeout = buildTimeout, - .repeats = repeats, - .maxLogSize = maxLogSize, - .enforceDeterminism = step->isDeterministic, - }, + buildOptions, nrStepsBuilding ); diff --git a/src/hydra-queue-runner/builder.cc b/src/hydra-queue-runner/builder.cc index 89aa7d15d..b25b4e631 100644 --- a/src/hydra-queue-runner/builder.cc +++ b/src/hydra-queue-runner/builder.cc @@ -98,8 +98,10 @@ State::StepResult State::doBuildStep(nix::ref destStore, it). */ BuildID buildId; std::optional buildDrvPath; - unsigned int maxSilentTime, buildTimeout; - unsigned int repeats = step->isDeterministic ? 1 : 0; + BuildOptions buildOptions; + buildOptions.repeats = step->isDeterministic ? 1 : 0; + buildOptions.maxLogSize = maxLogSize; + buildOptions.enforceDeterminism = step->isDeterministic; auto conn(dbPool.get()); @@ -134,18 +136,18 @@ State::StepResult State::doBuildStep(nix::ref destStore, { auto i = jobsetRepeats.find(std::make_pair(build2->projectName, build2->jobsetName)); if (i != jobsetRepeats.end()) - repeats = std::max(repeats, i->second); + buildOptions.repeats = std::max(buildOptions.repeats, i->second); } } if (!build) build = *dependents.begin(); buildId = build->id; buildDrvPath = build->drvPath; - maxSilentTime = build->maxSilentTime; - buildTimeout = build->buildTimeout; + buildOptions.maxSilentTime = build->maxSilentTime; + buildOptions.buildTimeout = build->buildTimeout; printInfo("performing step ‘%s’ %d times on ‘%s’ (needed by build %d and %d others)", - localStore->printStorePath(step->drvPath), repeats + 1, machine->sshName, buildId, (dependents.size() - 1)); + localStore->printStorePath(step->drvPath), buildOptions.repeats + 1, machine->sshName, buildId, (dependents.size() - 1)); } if (!buildOneDone) @@ -206,7 +208,7 @@ State::StepResult State::doBuildStep(nix::ref destStore, try { /* FIXME: referring builds may have conflicting timeouts. */ - buildRemote(destStore, machine, step, maxSilentTime, buildTimeout, repeats, result, activeStep, updateStep, narMembers); + buildRemote(destStore, machine, step, buildOptions, result, activeStep, updateStep, narMembers); } catch (Error & e) { if (activeStep->state_.lock()->cancelled) { printInfo("marking step %d of build %d as cancelled", stepNr, buildId); diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh index 6292a2dbb..f4d8ccce3 100644 --- a/src/hydra-queue-runner/state.hh +++ b/src/hydra-queue-runner/state.hh @@ -447,6 +447,12 @@ private: public: State(); + struct BuildOptions { + unsigned int maxSilentTime, buildTimeout, repeats; + size_t maxLogSize; + bool enforceDeterminism; + }; + private: nix::MaintainCount startDbUpdate(); @@ -531,8 +537,7 @@ private: void buildRemote(nix::ref destStore, Machine::ptr machine, Step::ptr step, - unsigned int maxSilentTime, unsigned int buildTimeout, - unsigned int repeats, + const BuildOptions & buildOptions, RemoteResult & result, std::shared_ptr activeStep, std::function updateStep, NarMemberDatas & narMembers); From 92b627ac1b3e0f53f6b35fc5940406c12fa977da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9ophane=20Hufschmitt?= <7226587+thufschmitt@users.noreply.github.com> Date: Thu, 24 Mar 2022 09:39:24 +0100 Subject: [PATCH 08/15] Remove an accidental re-indenting of a comment Co-authored-by: Eelco Dolstra --- src/hydra-queue-runner/build-remote.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc index 1e06e5014..901bbc897 100644 --- a/src/hydra-queue-runner/build-remote.cc +++ b/src/hydra-queue-runner/build-remote.cc @@ -233,9 +233,9 @@ StorePathSet sendInputs( } /* Ensure that the inputs exist in the destination store. This is - a no-op for regular stores, but for the binary cache store, - this will copy the inputs to the binary cache from the local - store. */ + a no-op for regular stores, but for the binary cache store, + this will copy the inputs to the binary cache from the local + store. */ if (localStore.getUri() != destStore.getUri()) { StorePathSet closure; localStore.computeFSClosure(step.drv->inputSrcs, closure); From 6e571e26ff068386bc949eaab6646e90613b23c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9ophane=20Hufschmitt?= Date: Thu, 24 Mar 2022 14:27:45 +0100 Subject: [PATCH 09/15] Build the resolved derivation and not the original one --- src/hydra-queue-runner/build-remote.cc | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc index 901bbc897..62461a651 100644 --- a/src/hydra-queue-runner/build-remote.cc +++ b/src/hydra-queue-runner/build-remote.cc @@ -203,7 +203,7 @@ void handshake(Machine::Connection & conn, unsigned int repeats) throw Error("machine ‘%1%’ does not support repeating a build; please upgrade it to Nix 1.12", conn.machine->sshName); } -StorePathSet sendInputs( +BasicDerivation sendInputs( State & state, Step & step, Store & localStore, @@ -214,19 +214,13 @@ StorePathSet sendInputs( counter & nrStepsCopyingTo ) { - - StorePathSet inputs; BasicDerivation basicDrv(*step.drv); - for (auto & p : step.drv->inputSrcs) - inputs.insert(p); - for (auto & input : step.drv->inputDrvs) { auto drv2 = localStore.readDerivation(input.first); for (auto & name : input.second) { if (auto i = get(drv2.outputs, name)) { auto outPath = i->path(localStore, drv2.name, name); - inputs.insert(*outPath); basicDrv.inputSrcs.insert(*outPath); } } @@ -255,10 +249,10 @@ StorePathSet sendInputs( /* Copy the input closure. */ if (conn.machine->isLocalhost()) { StorePathSet closure; - destStore.computeFSClosure(inputs, closure); + destStore.computeFSClosure(basicDrv.inputSrcs, closure); copyPaths(destStore, localStore, closure, NoRepair, NoCheckSigs, NoSubstitute); } else { - copyClosureTo(conn.machine->state->sendLock, destStore, conn.from, conn.to, inputs, true); + copyClosureTo(conn.machine->state->sendLock, destStore, conn.from, conn.to, basicDrv.inputSrcs, true); } auto now2 = std::chrono::steady_clock::now(); @@ -266,7 +260,7 @@ StorePathSet sendInputs( overhead += std::chrono::duration_cast(now2 - now1).count(); } - return inputs; + return basicDrv; } void RemoteResult::updateWithBuildResult(const nix::BuildResult & buildResult) @@ -535,8 +529,7 @@ void State::buildRemote(ref destStore, copy the immediate sources of the derivation and the required outputs of the input derivations. */ updateStep(ssSendingInputs); - - StorePathSet inputs = sendInputs(*this, *step, *localStore, *destStore, conn, result.overhead, nrStepsWaiting, nrStepsCopyingTo); + BasicDerivation resolvedDrv = sendInputs(*this, *step, *localStore, *destStore, conn, result.overhead, nrStepsWaiting, nrStepsCopyingTo); logFileDel.cancel(); @@ -561,7 +554,7 @@ void State::buildRemote(ref destStore, conn, *localStore, step->drvPath, - BasicDerivation(*step->drv), + resolvedDrv, buildOptions, nrStepsBuilding ); From 143c31734fc19d72a3ccfb7c064d829af9bfca72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9ophane=20Hufschmitt?= Date: Tue, 25 Oct 2022 10:04:29 +0200 Subject: [PATCH 10/15] Move all the build remote utils to their namespace Just don't pollute the global one --- src/hydra-queue-runner/build-remote.cc | 134 +++++++++++++------------ 1 file changed, 70 insertions(+), 64 deletions(-) diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc index 62461a651..a429467bf 100644 --- a/src/hydra-queue-runner/build-remote.cc +++ b/src/hydra-queue-runner/build-remote.cc @@ -27,6 +27,8 @@ static void append(Strings & dst, const Strings & src) dst.insert(dst.end(), src.begin(), src.end()); } +namespace nix::build_remote { + static Strings extraStoreArgs(std::string & machine) { Strings result; @@ -263,63 +265,6 @@ BasicDerivation sendInputs( return basicDrv; } -void RemoteResult::updateWithBuildResult(const nix::BuildResult & buildResult) -{ - RemoteResult thisArrow; - - startTime = buildResult.startTime; - stopTime = buildResult.stopTime; - timesBuilt = buildResult.timesBuilt; - errorMsg = buildResult.errorMsg; - isNonDeterministic = buildResult.isNonDeterministic; - - switch ((BuildResult::Status) buildResult.status) { - case BuildResult::Built: - stepStatus = bsSuccess; - break; - case BuildResult::Substituted: - case BuildResult::AlreadyValid: - stepStatus = bsSuccess; - isCached = true; - break; - case BuildResult::PermanentFailure: - stepStatus = bsFailed; - canCache = true; - errorMsg = ""; - break; - case BuildResult::InputRejected: - case BuildResult::OutputRejected: - stepStatus = bsFailed; - canCache = true; - break; - case BuildResult::TransientFailure: - stepStatus = bsFailed; - canRetry = true; - errorMsg = ""; - break; - case BuildResult::TimedOut: - stepStatus = bsTimedOut; - errorMsg = ""; - break; - case BuildResult::MiscFailure: - stepStatus = bsAborted; - canRetry = true; - break; - case BuildResult::LogLimitExceeded: - stepStatus = bsLogLimitExceeded; - break; - case BuildResult::NotDeterministic: - stepStatus = bsNotDeterministic; - canRetry = false; - canCache = true; - break; - default: - stepStatus = bsAborted; - break; - } - -} - BuildResult performBuild( Machine::Connection & conn, Store & localStore, @@ -457,6 +402,67 @@ void copyPathsFromRemote( } +} + +/* using namespace nix::build_remote; */ + +void RemoteResult::updateWithBuildResult(const nix::BuildResult & buildResult) +{ + RemoteResult thisArrow; + + startTime = buildResult.startTime; + stopTime = buildResult.stopTime; + timesBuilt = buildResult.timesBuilt; + errorMsg = buildResult.errorMsg; + isNonDeterministic = buildResult.isNonDeterministic; + + switch ((BuildResult::Status) buildResult.status) { + case BuildResult::Built: + stepStatus = bsSuccess; + break; + case BuildResult::Substituted: + case BuildResult::AlreadyValid: + stepStatus = bsSuccess; + isCached = true; + break; + case BuildResult::PermanentFailure: + stepStatus = bsFailed; + canCache = true; + errorMsg = ""; + break; + case BuildResult::InputRejected: + case BuildResult::OutputRejected: + stepStatus = bsFailed; + canCache = true; + break; + case BuildResult::TransientFailure: + stepStatus = bsFailed; + canRetry = true; + errorMsg = ""; + break; + case BuildResult::TimedOut: + stepStatus = bsTimedOut; + errorMsg = ""; + break; + case BuildResult::MiscFailure: + stepStatus = bsAborted; + canRetry = true; + break; + case BuildResult::LogLimitExceeded: + stepStatus = bsLogLimitExceeded; + break; + case BuildResult::NotDeterministic: + stepStatus = bsNotDeterministic; + canRetry = false; + canCache = true; + break; + default: + stepStatus = bsAborted; + break; + } + +} + void State::buildRemote(ref destStore, Machine::ptr machine, Step::ptr step, @@ -467,7 +473,7 @@ void State::buildRemote(ref destStore, { assert(BuildResult::TimedOut == 8); - auto [logFile, logFD] = openLogFile(logDir, step->drvPath); + auto [logFile, logFD] = build_remote::openLogFile(logDir, step->drvPath); AutoDelete logFileDel(logFile, false); result.logFile = logFile; @@ -480,7 +486,7 @@ void State::buildRemote(ref destStore, // FIXME: rewrite to use Store. Child child; - openConnection(machine, tmpDir, logFD.get(), child); + build_remote::openConnection(machine, tmpDir, logFD.get(), child); { auto activeStepState(activeStep->state_.lock()); @@ -511,7 +517,7 @@ void State::buildRemote(ref destStore, }); try { - handshake(conn, buildOptions.repeats); + build_remote::handshake(conn, buildOptions.repeats); } catch (EndOfFile & e) { child.pid.wait(); std::string s = chomp(readFile(result.logFile)); @@ -529,7 +535,7 @@ void State::buildRemote(ref destStore, copy the immediate sources of the derivation and the required outputs of the input derivations. */ updateStep(ssSendingInputs); - BasicDerivation resolvedDrv = sendInputs(*this, *step, *localStore, *destStore, conn, result.overhead, nrStepsWaiting, nrStepsCopyingTo); + BasicDerivation resolvedDrv = build_remote::sendInputs(*this, *step, *localStore, *destStore, conn, result.overhead, nrStepsWaiting, nrStepsCopyingTo); logFileDel.cancel(); @@ -550,7 +556,7 @@ void State::buildRemote(ref destStore, updateStep(ssBuilding); - BuildResult buildResult = performBuild( + BuildResult buildResult = build_remote::performBuild( conn, *localStore, step->drvPath, @@ -589,7 +595,7 @@ void State::buildRemote(ref destStore, } size_t totalNarSize = 0; - auto infos = queryPathInfos(conn, *localStore, outputs, totalNarSize); + auto infos = build_remote::queryPathInfos(conn, *localStore, outputs, totalNarSize); if (totalNarSize > maxOutputSize) { result.stepStatus = bsNarSizeLimitExceeded; @@ -600,7 +606,7 @@ void State::buildRemote(ref destStore, printMsg(lvlDebug, "copying outputs of ‘%s’ from ‘%s’ (%d bytes)", localStore->printStorePath(step->drvPath), machine->sshName, totalNarSize); - copyPathsFromRemote(conn, narMembers, *localStore, *destStore, infos); + build_remote::copyPathsFromRemote(conn, narMembers, *localStore, *destStore, infos); auto now2 = std::chrono::steady_clock::now(); result.overhead += std::chrono::duration_cast(now2 - now1).count(); From 2bda7ca642528e1a7c7188d72460fe90c89c4ebf Mon Sep 17 00:00:00 2001 From: John Ericson Date: Thu, 30 Nov 2023 11:31:58 -0500 Subject: [PATCH 11/15] Further use `Machine::Connection` to deduplicate --- src/hydra-queue-runner/build-remote.cc | 28 +++++++++++++------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc index a82a95e60..8f8d65326 100644 --- a/src/hydra-queue-runner/build-remote.cc +++ b/src/hydra-queue-runner/build-remote.cc @@ -107,27 +107,27 @@ static void openConnection(Machine::ptr machine, Path tmpDir, int stderrFD, Chil } -static void copyClosureTo(std::timed_mutex & sendMutex, Store & destStore, - FdSource & from, FdSink & to, const StorePathSet & paths, +static void copyClosureTo( + Machine::Connection & conn, + Store & destStore, + const StorePathSet & paths, bool useSubstitutes = false) { StorePathSet closure; destStore.computeFSClosure(paths, closure); - WorkerProto::WriteConn wconn { .to = to }; - WorkerProto::ReadConn rconn { .from = from }; /* Send the "query valid paths" command with the "lock" option enabled. This prevents a race where the remote host garbage-collect paths that are already there. Optionally, ask the remote host to substitute missing paths. */ // FIXME: substitute output pollutes our build log - to << ServeProto::Command::QueryValidPaths << 1 << useSubstitutes; - WorkerProto::write(destStore, wconn, closure); - to.flush(); + conn.to << ServeProto::Command::QueryValidPaths << 1 << useSubstitutes; + WorkerProto::write(destStore, conn, closure); + conn.to.flush(); /* Get back the set of paths that are already valid on the remote host. */ - auto present = WorkerProto::Serialise::read(destStore, rconn); + auto present = WorkerProto::Serialise::read(destStore, conn); if (present.size() == closure.size()) return; @@ -139,14 +139,14 @@ static void copyClosureTo(std::timed_mutex & sendMutex, Store & destStore, printMsg(lvlDebug, "sending %d missing paths", missing.size()); - std::unique_lock sendLock(sendMutex, + std::unique_lock sendLock(conn.machine->state->sendLock, std::chrono::seconds(600)); - to << ServeProto::Command::ImportPaths; - destStore.exportPaths(missing, to); - to.flush(); + conn.to << ServeProto::Command::ImportPaths; + destStore.exportPaths(missing, conn.to); + conn.to.flush(); - if (readInt(from) != 1) + if (readInt(conn.from) != 1) throw Error("remote machine failed to import closure"); } @@ -257,7 +257,7 @@ BasicDerivation sendInputs( destStore.computeFSClosure(basicDrv.inputSrcs, closure); copyPaths(destStore, localStore, closure, NoRepair, NoCheckSigs, NoSubstitute); } else { - copyClosureTo(conn.machine->state->sendLock, destStore, conn.from, conn.to, basicDrv.inputSrcs, true); + copyClosureTo(conn, destStore, basicDrv.inputSrcs, true); } auto now2 = std::chrono::steady_clock::now(); From 0917145622c521fff5c9af1b26053c2217bbd02b Mon Sep 17 00:00:00 2001 From: John Ericson Date: Thu, 30 Nov 2023 12:19:05 -0500 Subject: [PATCH 12/15] Make new functions not in header `static` --- src/hydra-queue-runner/build-remote.cc | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc index 8f8d65326..a58920167 100644 --- a/src/hydra-queue-runner/build-remote.cc +++ b/src/hydra-queue-runner/build-remote.cc @@ -152,7 +152,7 @@ static void copyClosureTo( // FIXME: use Store::topoSortPaths(). -StorePaths reverseTopoSortPaths(const std::map & paths) +static StorePaths reverseTopoSortPaths(const std::map & paths) { StorePaths sorted; StorePathSet visited; @@ -180,7 +180,7 @@ StorePaths reverseTopoSortPaths(const std::map & paths return sorted; } -std::pair openLogFile(const std::string & logDir, const StorePath & drvPath) +static std::pair openLogFile(const std::string & logDir, const StorePath & drvPath) { std::string base(drvPath.to_string()); auto logFile = logDir + "/" + std::string(base, 0, 2) + "/" + std::string(base, 2); @@ -193,7 +193,7 @@ std::pair openLogFile(const std::string & logDir, const Store return {std::move(logFile), std::move(logFD)}; } -void handshake(Machine::Connection & conn, unsigned int repeats) +static void handshake(Machine::Connection & conn, unsigned int repeats) { conn.to << SERVE_MAGIC_1 << 0x206; conn.to.flush(); @@ -208,7 +208,7 @@ void handshake(Machine::Connection & conn, unsigned int repeats) throw Error("machine ‘%1%’ does not support repeating a build; please upgrade it to Nix 1.12", conn.machine->sshName); } -BasicDerivation sendInputs( +static BasicDerivation sendInputs( State & state, Step & step, Store & localStore, @@ -268,7 +268,7 @@ BasicDerivation sendInputs( return basicDrv; } -BuildResult performBuild( +static BuildResult performBuild( Machine::Connection & conn, Store & localStore, StorePath drvPath, @@ -321,7 +321,7 @@ BuildResult performBuild( return result; } -std::map queryPathInfos( +static std::map queryPathInfos( Machine::Connection & conn, Store & localStore, StorePathSet & outputs, @@ -359,7 +359,7 @@ std::map queryPathInfos( return infos; } -void copyPathFromRemote( +static void copyPathFromRemote( Machine::Connection & conn, NarMemberDatas & narMembers, Store & localStore, @@ -389,7 +389,7 @@ void copyPathFromRemote( destStore.addToStore(info, *source2, NoRepair, NoCheckSigs); } -void copyPathsFromRemote( +static void copyPathsFromRemote( Machine::Connection & conn, NarMemberDatas & narMembers, Store & localStore, From e172461e550676a9a295cf917f9b6fab623af3e6 Mon Sep 17 00:00:00 2001 From: John Ericson Date: Thu, 30 Nov 2023 12:19:20 -0500 Subject: [PATCH 13/15] Use `const` in for loop As requested by @teh --- src/hydra-queue-runner/build-remote.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc index a58920167..ca508b8b8 100644 --- a/src/hydra-queue-runner/build-remote.cc +++ b/src/hydra-queue-runner/build-remote.cc @@ -221,7 +221,7 @@ static BasicDerivation sendInputs( { BasicDerivation basicDrv(*step.drv); - for (auto & input : step.drv->inputDrvs) { + for (const auto & input : step.drv->inputDrvs) { auto drv2 = localStore.readDerivation(input.first); for (auto & name : input.second) { if (auto i = get(drv2.outputs, name)) { From 622c25e3c4da9a6f4fd44bfe425137597b81705a Mon Sep 17 00:00:00 2001 From: John Ericson Date: Mon, 4 Dec 2023 08:56:06 -0500 Subject: [PATCH 14/15] Sedding prior to merge --- src/hydra-queue-runner/build-remote.cc | 14 +++++++------- src/hydra-queue-runner/state.hh | 6 +++--- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc index ca508b8b8..f679db0c9 100644 --- a/src/hydra-queue-runner/build-remote.cc +++ b/src/hydra-queue-runner/build-remote.cc @@ -10,8 +10,8 @@ #include "serve-protocol.hh" #include "state.hh" #include "util.hh" -#include "worker-protocol.hh" -#include "worker-protocol-impl.hh" +#include "serve-protocol.hh" +#include "serve-protocol-impl.hh" #include "finally.hh" #include "url.hh" @@ -122,12 +122,12 @@ static void copyClosureTo( the remote host to substitute missing paths. */ // FIXME: substitute output pollutes our build log conn.to << ServeProto::Command::QueryValidPaths << 1 << useSubstitutes; - WorkerProto::write(destStore, conn, closure); + ServeProto::write(destStore, conn, closure); conn.to.flush(); /* Get back the set of paths that are already valid on the remote host. */ - auto present = WorkerProto::Serialise::read(destStore, conn); + auto present = ServeProto::Serialise::read(destStore, conn); if (present.size() == closure.size()) return; @@ -315,7 +315,7 @@ static BuildResult performBuild( } } if (GET_PROTOCOL_MINOR(conn.remoteVersion) >= 6) { - WorkerProto::Serialise::read(localStore, conn); + ServeProto::Serialise::read(localStore, conn); } return result; @@ -332,13 +332,13 @@ static std::map queryPathInfos( /* Get info about each output path. */ std::map infos; conn.to << ServeProto::Command::QueryPathInfos; - WorkerProto::write(localStore, conn, outputs); + ServeProto::write(localStore, conn, outputs); conn.to.flush(); while (true) { auto storePathS = readString(conn.from); if (storePathS == "") break; auto deriver = readString(conn.from); // deriver - auto references = WorkerProto::Serialise::read(localStore, conn); + auto references = ServeProto::Serialise::read(localStore, conn); readLongLong(conn.from); // download size auto narSize = readLongLong(conn.from); auto narHash = Hash::parseAny(readString(conn.from), htSHA256); diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh index ce795700b..dfeaefe73 100644 --- a/src/hydra-queue-runner/state.hh +++ b/src/hydra-queue-runner/state.hh @@ -21,7 +21,7 @@ #include "store-api.hh" #include "sync.hh" #include "nar-extractor.hh" -#include "worker-protocol.hh" +#include "serve-protocol.hh" typedef unsigned int BuildID; @@ -310,12 +310,12 @@ struct Machine // Backpointer to the machine ptr machine; - operator nix::WorkerProto::ReadConn () + operator nix::ServeProto::ReadConn () { return { .from = from }; } - operator nix::WorkerProto::WriteConn () + operator nix::ServeProto::WriteConn () { return { .to = to }; } From 104baef503df3afcd09615d9b014923696e9ecf9 Mon Sep 17 00:00:00 2001 From: John Ericson Date: Mon, 4 Dec 2023 09:42:04 -0500 Subject: [PATCH 15/15] Document the connection initialization process --- src/hydra-queue-runner/build-remote.cc | 16 ++++++++++++---- src/hydra-queue-runner/state.hh | 2 +- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/src/hydra-queue-runner/build-remote.cc b/src/hydra-queue-runner/build-remote.cc index 032c2733f..baf35d86c 100644 --- a/src/hydra-queue-runner/build-remote.cc +++ b/src/hydra-queue-runner/build-remote.cc @@ -195,6 +195,12 @@ static std::pair openLogFile(const std::string & logDir, cons return {std::move(logFile), std::move(logFD)}; } +/** + * @param conn is not fully initialized; it is this functions job to set + * the `remoteVersion` field after the handshake is completed. + * Therefore, no `ServeProto::Serialize` functions can be used until + * that field is set. + */ static void handshake(Machine::Connection & conn, unsigned int repeats) { conn.to << SERVE_MAGIC_1 << 0x206; @@ -204,6 +210,7 @@ static void handshake(Machine::Connection & conn, unsigned int repeats) if (magic != SERVE_MAGIC_2) throw Error("protocol mismatch with ‘nix-store --serve’ on ‘%1%’", conn.machine->sshName); conn.remoteVersion = readInt(conn.from); + // Now `conn` is initialized. if (GET_PROTOCOL_MAJOR(conn.remoteVersion) != 0x200) throw Error("unsupported ‘nix-store --serve’ protocol version on ‘%1%’", conn.machine->sshName); if (GET_PROTOCOL_MINOR(conn.remoteVersion) < 3 && repeats > 0) @@ -512,10 +519,11 @@ void State::buildRemote(ref destStore, process. Meh. */ }); - Machine::Connection conn; - conn.from = child.from.get(); - conn.to = child.to.get(); - conn.machine = machine; + Machine::Connection conn { + .from = child.from.get(), + .to = child.to.get(), + .machine = machine, + }; Finally updateStats([&]() { bytesReceived += conn.from.read; diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh index 039239173..6359063af 100644 --- a/src/hydra-queue-runner/state.hh +++ b/src/hydra-queue-runner/state.hh @@ -303,8 +303,8 @@ struct Machine // A connection to a machine struct Connection { - nix::FdSink to; nix::FdSource from; + nix::FdSink to; nix::ServeProto::Version remoteVersion; // Backpointer to the machine