From 44ae4054adff55d2c652e5467fa75fa2de0088b4 Mon Sep 17 00:00:00 2001 From: Peter Huene Date: Mon, 17 Feb 2025 16:34:33 -0500 Subject: [PATCH 1/6] fix: use a oneshot channel instead of a callback. This changes the `Backend` trait to take a oneshot `Sender` rather than a callback as the caller only cares about when the first task execution has started. This simplifies some integration with `wdl-engine`. --- Cargo.lock | 65 +++++++++---------- crankshaft-config/src/backend.rs | 2 +- crankshaft-docker/src/container.rs | 4 ++ crankshaft-docker/src/images.rs | 5 +- crankshaft-docker/src/lib.rs | 2 +- crankshaft-engine/src/service/runner.rs | 4 +- .../src/service/runner/backend.rs | 9 ++- .../src/service/runner/backend/docker.rs | 17 +++-- .../src/service/runner/backend/generic.rs | 11 +++- .../src/service/runner/backend/tes.rs | 21 +++--- crankshaft-engine/src/task/input.rs | 14 ++++ crankshaft-engine/src/task/resources.rs | 2 +- 12 files changed, 88 insertions(+), 68 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5e8f977..912b976 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -277,9 +277,9 @@ checksum = "f61dac84819c6588b558454b194026eb1f09c293b9036ae9b159e74e73ab6cf9" [[package]] name = "cc" -version = "1.2.13" +version = "1.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7777341816418c02e033934a09f20dc0ccaf65a5201ef8a450ae0105a573fda" +checksum = "0c3d1b2e905a3a7b00a6141adb0e4c0bb941d11caf55349d863942a1cc44e3c9" dependencies = [ "shlex", ] @@ -307,9 +307,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.28" +version = "4.5.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e77c3243bd94243c03672cb5154667347c457ca271254724f9f393aee1c05ff" +checksum = "92b7b18d71fad5313a1e320fa9897994228ce274b60faa4d694fe0ea89cd9e6d" dependencies = [ "clap_builder", "clap_derive", @@ -327,9 +327,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.27" +version = "4.5.30" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b26884eb4b57140e4d2d93652abfa49498b938b3c9179f9fc487b0acc3edad7" +checksum = "a35db2071778a7344791a4fb4f95308b5673d219dee3ae348b86642574ecc90c" dependencies = [ "anstream", "anstyle", @@ -677,9 +677,9 @@ dependencies = [ [[package]] name = "equivalent" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" +checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" [[package]] name = "errno" @@ -1596,9 +1596,9 @@ checksum = "945462a4b81e43c4e3ba96bd7b49d834c6f61198356aa858733bc4acf3cbe62e" [[package]] name = "openssl" -version = "0.10.70" +version = "0.10.71" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61cfb4e166a8bb8c9b55c500bc2308550148ece889be90f609377e58140f42c6" +checksum = "5e14130c6a98cd258fdcb0fb6d744152343ff729cbfcb28c656a9d12b999fbcd" dependencies = [ "bitflags 2.8.0", "cfg-if", @@ -1628,9 +1628,9 @@ checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" [[package]] name = "openssl-sys" -version = "0.9.105" +version = "0.9.106" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b22d5b84be05a8d6947c7cb71f7c849aa0f112acd4bf51c2a7c1c988ac0a9dc" +checksum = "8bb61ea9811cc39e3c2069f40b8b8e2e70d8569b361f879786cc7ed48b777cdd" dependencies = [ "cc", "libc", @@ -1856,8 +1856,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94" dependencies = [ "rand_chacha 0.9.0", - "rand_core 0.9.0", - "zerocopy 0.8.17", + "rand_core 0.9.1", + "zerocopy 0.8.18", ] [[package]] @@ -1877,7 +1877,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" dependencies = [ "ppv-lite86", - "rand_core 0.9.0", + 
"rand_core 0.9.1", ] [[package]] @@ -1891,12 +1891,12 @@ dependencies = [ [[package]] name = "rand_core" -version = "0.9.0" +version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b08f3c9802962f7e1b25113931d94f43ed9725bebc59db9d0c3e9a23b67e15ff" +checksum = "a88e0da7a2c97baa202165137c158d0a2e824ac465d13d81046727b34cb247d3" dependencies = [ "getrandom 0.3.1", - "zerocopy 0.8.17", + "zerocopy 0.8.18", ] [[package]] @@ -2063,15 +2063,14 @@ dependencies = [ [[package]] name = "ring" -version = "0.17.8" +version = "0.17.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d" +checksum = "e75ec5e92c4d8aede845126adc388046234541629e76029599ed35a003c7ed24" dependencies = [ "cc", "cfg-if", "getrandom 0.2.15", "libc", - "spin", "untrusted", "windows-sys 0.52.0", ] @@ -2356,9 +2355,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.13.2" +version = "1.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" +checksum = "7fcf8323ef1faaee30a44a340193b1ac6814fd9b7b4e88e9d4519a3e4abe1cfd" [[package]] name = "socket2" @@ -2370,12 +2369,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "spin" -version = "0.9.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" - [[package]] name = "ssh2" version = "0.9.5" @@ -2471,9 +2464,9 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.16.0" +version = "3.17.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38c246215d7d24f48ae091a2902398798e05d978b24315d6efbc00ede9a8bb91" +checksum = "22e5a0acb1f3f55f65cc4a866c361b2fb2a0ff6366785ae6fbb5f85df07ba230" dependencies = [ "cfg-if", "fastrand", @@ -3335,11 +3328,11 @@ dependencies = [ [[package]] name = 
"zerocopy" -version = "0.8.17" +version = "0.8.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa91407dacce3a68c56de03abe2760159582b846c6a4acd2f456618087f12713" +checksum = "79386d31a42a4996e3336b0919ddb90f81112af416270cff95b5f5af22b839c2" dependencies = [ - "zerocopy-derive 0.8.17", + "zerocopy-derive 0.8.18", ] [[package]] @@ -3355,9 +3348,9 @@ dependencies = [ [[package]] name = "zerocopy-derive" -version = "0.8.17" +version = "0.8.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06718a168365cad3d5ff0bb133aad346959a2074bd4a85c121255a11304a8626" +checksum = "76331675d372f91bf8d17e13afbd5fe639200b73d01f0fc748bb059f9cca2db7" dependencies = [ "proc-macro2", "quote", diff --git a/crankshaft-config/src/backend.rs b/crankshaft-config/src/backend.rs index 8561713..f380d73 100644 --- a/crankshaft-config/src/backend.rs +++ b/crankshaft-config/src/backend.rs @@ -77,7 +77,7 @@ mod tests { .build(); assert_eq!(config.name(), "generic"); - assert!(matches!(config.kind().as_generic(), Some(_))); + assert!(config.kind().as_generic().is_some()); assert_eq!(config.max_tasks(), 10); let defaults = config.defaults.unwrap(); diff --git a/crankshaft-docker/src/container.rs b/crankshaft-docker/src/container.rs index 95da9e6..ad14810 100644 --- a/crankshaft-docker/src/container.rs +++ b/crankshaft-docker/src/container.rs @@ -112,6 +112,8 @@ impl Container { .map_err(Error::Docker)? .output; + debug!("starting container `{name}`", name = self.name); + // Start the container. self.client .start_container(&self.name, None::>) @@ -148,6 +150,8 @@ impl Container { .map_err(Error::Docker)?; // Wait for the container to be completed. 
+ + debug!("waiting for container `{name}` to exit", name = self.name); let mut wait_stream = self .client .wait_container(&self.name, None::>); diff --git a/crankshaft-docker/src/images.rs b/crankshaft-docker/src/images.rs index 44543e7..1db1905 100644 --- a/crankshaft-docker/src/images.rs +++ b/crankshaft-docker/src/images.rs @@ -64,12 +64,11 @@ pub(crate) async fn list_images(docker: &Docker) -> Result> { /// * Pulling the image from the remote repository. pub(crate) async fn ensure_image(docker: &Docker, image: impl AsRef) -> Result<()> { let image = image.as_ref(); - debug!("ensuring image: `{image}`"); + + debug!("ensuring image `{image}` exists locally"); let mut filters = HashMap::new(); filters.insert("reference", vec![image]); - - debug!("checking if image exists locally: `{image}`"); let results = docker .inner() .list_images(Some(ListImagesOptions { diff --git a/crankshaft-docker/src/lib.rs b/crankshaft-docker/src/lib.rs index 401a64b..f76f7b0 100644 --- a/crankshaft-docker/src/lib.rs +++ b/crankshaft-docker/src/lib.rs @@ -18,7 +18,7 @@ use crate::images::*; #[derive(Error, Debug)] pub enum Error { /// An error from [`bollard`]. - #[error("docker error: {0}")] + #[error(transparent)] Docker(#[from] bollard::errors::Error), /// A required value was missing for a builder field. 
#[error("missing required builder field `{0}`")] diff --git a/crankshaft-engine/src/service/runner.rs b/crankshaft-engine/src/service/runner.rs index 0e3f10e..ca65b39 100644 --- a/crankshaft-engine/src/service/runner.rs +++ b/crankshaft-engine/src/service/runner.rs @@ -8,6 +8,7 @@ use crankshaft_config::backend::Defaults; use crankshaft_config::backend::Kind; use nonempty::NonEmpty; use tokio::sync::Semaphore; +use tokio::sync::oneshot; use tokio::sync::oneshot::Receiver; use tokio_util::sync::CancellationToken; use tracing::trace; @@ -107,7 +108,8 @@ impl Runner { tokio::spawn(async move { let _permit = lock.acquire().await?; - let result = backend.clone().run(task, Arc::new(|_| {}), token)?.await; + let (started_tx, _) = oneshot::channel(); + let result = backend.clone().run(task, started_tx, token)?.await; // NOTE: if the send does not succeed, that is almost certainly // because the receiver was dropped. That is a relatively standard diff --git a/crankshaft-engine/src/service/runner/backend.rs b/crankshaft-engine/src/service/runner/backend.rs index 6abe1e4..731d13e 100644 --- a/crankshaft-engine/src/service/runner/backend.rs +++ b/crankshaft-engine/src/service/runner/backend.rs @@ -2,12 +2,12 @@ use std::fmt::Debug; use std::process::Output; -use std::sync::Arc; use async_trait::async_trait; use eyre::Result; use futures::future::BoxFuture; use nonempty::NonEmpty; +use tokio::sync::oneshot; use tokio_util::sync::CancellationToken; use crate::Task; @@ -24,16 +24,15 @@ pub trait Backend: Debug + Send + Sync + 'static { /// Runs a task in a backend. /// - /// The `started` callback is called when an execution of the task has - /// started; the parameter is the index of the execution into the task's - /// executions collection. + /// The `started` channel is notified when the first execution of the task + /// has started. 
// TODO: use a representation of task output that isn't based on // `std::process::Output` that would allow us to write stdout/stderror to a file // instead of buffering it all in memory fn run( &self, task: Task, - started: Arc, + started: oneshot::Sender<()>, token: CancellationToken, ) -> Result>>>; } diff --git a/crankshaft-engine/src/service/runner/backend/docker.rs b/crankshaft-engine/src/service/runner/backend/docker.rs index a9da4c6..8f23b14 100644 --- a/crankshaft-engine/src/service/runner/backend/docker.rs +++ b/crankshaft-engine/src/service/runner/backend/docker.rs @@ -26,6 +26,7 @@ use futures::future::BoxFuture; use nonempty::NonEmpty; use tempfile::TempDir; use tokio::select; +use tokio::sync::oneshot; use tokio_util::sync::CancellationToken; use tracing::debug; use tracing::info; @@ -320,7 +321,7 @@ impl crate::Backend for Backend { fn run( &self, task: Task, - started: Arc, + started: oneshot::Sender<()>, token: CancellationToken, ) -> Result>>> { // Helper for cleanup @@ -358,6 +359,7 @@ impl crate::Backend for Backend { let client = self.client.clone(); let cleanup = self.config.cleanup(); let resources = self.resources; + let mut started = Some(started); Ok(async move { let tempdir = TempDir::new().context("failed to create temporary directory for mounts")?; @@ -372,7 +374,9 @@ impl crate::Backend for Backend { .context("task requires a name to run on the Docker backend")? .to_owned(); - for (execution_index, execution) in task.executions().enumerate() { + debug!("spawning task `{name}` with Docker backend"); + + for execution in task.executions() { if token.is_cancelled() { bail!("task has been cancelled"); } @@ -398,12 +402,13 @@ impl crate::Backend for Backend { } let service = Arc::new(builder.try_build(&name).await?); + let started = started.take(); select! 
{ _ = token.cancelled() => { (Err(eyre!("task has been cancelled")), Cleaner::Service(service)) } - res = service.run(|| started(execution_index)) => { + res = service.run(|| if let Some(started) = started { started.send(()).ok(); }) => { (res.wrap_err("failed to run Docker service"), Cleaner::Service(service)) } } @@ -430,11 +435,13 @@ impl crate::Backend for Backend { .await?, ); + let started = started.take(); + select! { _ = token.cancelled() => { (Err(eyre!("task has been cancelled")), Cleaner::Container(container)) } - res = container.run(|| started(execution_index)) => { + res = container.run(|| if let Some(started) = started { started.send(()).ok(); }) => { (res.wrap_err("failed to run Docker container"), Cleaner::Container(container)) } } @@ -508,7 +515,7 @@ async fn add_input_mounts( target: Some(input.path().to_string()), source: Some(source), typ: Some(MountTypeEnum::BIND), - read_only: Some(true), + read_only: Some(input.read_only()), ..Default::default() }); } diff --git a/crankshaft-engine/src/service/runner/backend/generic.rs b/crankshaft-engine/src/service/runner/backend/generic.rs index 9e0c76b..a800f47 100644 --- a/crankshaft-engine/src/service/runner/backend/generic.rs +++ b/crankshaft-engine/src/service/runner/backend/generic.rs @@ -17,6 +17,7 @@ use futures::future::BoxFuture; use nonempty::NonEmpty; use regex::Regex; use tokio::select; +use tokio::sync::oneshot; use tokio_util::sync::CancellationToken; use tracing::warn; @@ -101,7 +102,7 @@ impl crate::Backend for Backend { fn run( &self, task: Task, - started: Arc, + started: oneshot::Sender<()>, token: CancellationToken, ) -> Result>>> { let driver = self.driver.clone(); @@ -112,6 +113,8 @@ impl crate::Backend for Backend { .map(|resources| resources.to_hashmap()) .unwrap_or_default(); + let mut started = Some(started); + Ok(async move { let mut outputs = Vec::new(); let job_id_regex = config @@ -122,7 +125,7 @@ impl crate::Backend for Backend { }) .transpose()?; - for (execution_index, 
execution) in task.executions().enumerate() { + for execution in task.executions() { if token.is_cancelled() { bail!("task has been cancelled"); } @@ -168,7 +171,9 @@ impl crate::Backend for Backend { .context("failed to run submit command")?; // Notify that execution has started - started(execution_index); + if let Some(started) = started.take() { + started.send(()).ok(); + } // Monitoring the output. match job_id_regex { diff --git a/crankshaft-engine/src/service/runner/backend/tes.rs b/crankshaft-engine/src/service/runner/backend/tes.rs index 5799a0f..c1c033b 100644 --- a/crankshaft-engine/src/service/runner/backend/tes.rs +++ b/crankshaft-engine/src/service/runner/backend/tes.rs @@ -25,6 +25,7 @@ use tes::v1::Client; use tes::v1::client::tasks::View; use tes::v1::types::task::State; use tokio::select; +use tokio::sync::oneshot; use tokio_util::sync::CancellationToken; use tracing::debug; use tracing::trace; @@ -75,9 +76,9 @@ impl Backend { async fn wait_task( client: &Client, task_id: &str, - started: Arc, + started: oneshot::Sender<()>, ) -> Result> { - let mut notified = false; + let mut started = Some(started); loop { let task = client @@ -102,11 +103,8 @@ impl Backend { debug!("task is running; waiting before polling again"); // Task is running (or was previously running but now paused), so notify - if !notified { - notified = true; - for i in 0..task.executors.len() { - started(i); - } + if let Some(started) = started.take() { + started.send(()).ok(); } } State::Complete | State::ExecutorError | State::SystemError => { @@ -117,10 +115,9 @@ impl Backend { debug!("task has failed"); } - if !notified { - for i in 0..task.executors.len() { - started(i); - } + // Task has completed, so notify that it started if we haven't already + if let Some(started) = started.take() { + started.send(()).ok(); } let mut results = task @@ -177,7 +174,7 @@ impl crate::Backend for Backend { fn run( &self, task: Task, - started: Arc, + started: oneshot::Sender<()>, token: 
CancellationToken, ) -> Result>>> { let client = self.client.clone(); diff --git a/crankshaft-engine/src/task/input.rs b/crankshaft-engine/src/task/input.rs index 642dbf8..7cc7b98 100644 --- a/crankshaft-engine/src/task/input.rs +++ b/crankshaft-engine/src/task/input.rs @@ -48,6 +48,12 @@ pub struct Input { /// The type of the input. #[builder(into)] ty: Type, + + /// Whether or not the input should be treated as read-only. + /// + /// Defaults to `true`. + #[builder(default = true)] + read_only: bool, } impl Input { @@ -76,6 +82,13 @@ impl Input { &self.ty } + /// Gets whether or not the input is read-only. + /// + /// Inputs are read-only by default. + pub fn read_only(&self) -> bool { + self.read_only + } + /// Fetches the input contents. /// /// This method will return an error if the input is a path to a directory. @@ -123,6 +136,7 @@ impl TryFrom for tes::v1::types::task::Input { contents, path, ty, + read_only: _, } = input; let (url, content) = contents.one_hot()?; diff --git a/crankshaft-engine/src/task/resources.rs b/crankshaft-engine/src/task/resources.rs index a5856fd..b9b9244 100644 --- a/crankshaft-engine/src/task/resources.rs +++ b/crankshaft-engine/src/task/resources.rs @@ -28,7 +28,7 @@ pub struct Resources { disk: Option, /// The associated compute zones. - #[builder(into)] + #[builder(into, default)] zones: Vec, } From 649361a8f8dbb66f75ce4bb554700f2b57472c15 Mon Sep 17 00:00:00 2001 From: Peter Huene Date: Wed, 19 Feb 2025 12:49:29 -0500 Subject: [PATCH 2/6] feat: implement CPU and memory resource limits for the Docker backend. Adds CPU and memory limits representation to `Resources` and `Defaults`. In the Docker backend, these are now passed through to service creation for Swarm support. This commit also fixes the `Into` implementation for `Resources` so that it sets the proper fields in the request. 
--- crankshaft-config/src/backend/defaults.rs | 21 ++++ .../src/service/runner/backend/docker.rs | 3 +- crankshaft-engine/src/task/resources.rs | 110 +++++++++++++++--- 3 files changed, 115 insertions(+), 19 deletions(-) diff --git a/crankshaft-config/src/backend/defaults.rs b/crankshaft-config/src/backend/defaults.rs index 3d88558..33904ae 100644 --- a/crankshaft-config/src/backend/defaults.rs +++ b/crankshaft-config/src/backend/defaults.rs @@ -16,6 +16,11 @@ pub struct Defaults { /// the backend. cpu: Option, + /// The default limit of CPU cores that a container can use. + /// + /// Not all backends support limits on CPU usage. + cpu_limit: Option, + /// The amount of RAM (in GiB) to use during execution. /// /// This is a float because RAM can be allocated more granularly than in @@ -23,6 +28,12 @@ pub struct Defaults { /// required for a particular environment. ram: Option, + /// The default limit of random access memory that a container can use (in + /// GiB). + /// + /// Not all backends support limits on memory usage. + ram_limit: Option, + /// The amount of disk (in GiB) to use during execution. /// /// This is a float because disks can be allocated more granularly than in @@ -37,11 +48,21 @@ impl Defaults { self.cpu } + /// Gets the CPU limit. + pub fn cpu_limit(&self) -> Option { + self.cpu_limit + } + /// Gets the amount of RAM (in GB). pub fn ram(&self) -> Option { self.ram } + /// Gets the RAM limit (in GB). + pub fn ram_limit(&self) -> Option { + self.ram_limit + } + /// Gets the amount of disk space (in GB). 
pub fn disk(&self) -> Option { self.disk diff --git a/crankshaft-engine/src/service/runner/backend/docker.rs b/crankshaft-engine/src/service/runner/backend/docker.rs index 8f23b14..5af3f08 100644 --- a/crankshaft-engine/src/service/runner/backend/docker.rs +++ b/crankshaft-engine/src/service/runner/backend/docker.rs @@ -394,6 +394,7 @@ impl crate::Backend for Backend { .image(execution.image()) .program(execution.program()) .args(execution.args()) + .resources(task.resources().map(Into::into).unwrap_or_default()) .attach_stdout() .attach_stderr(); @@ -422,7 +423,7 @@ impl crate::Backend for Backend { .attach_stderr() .host_config(HostConfig { mounts: Some(mounts.clone()), - ..task.resources().map(HostConfig::from).unwrap_or_default() + ..task.resources().map(Into::into).unwrap_or_default() }); if let Some(work_dir) = execution.work_dir() { diff --git a/crankshaft-engine/src/task/resources.rs b/crankshaft-engine/src/task/resources.rs index b9b9244..17ff822 100644 --- a/crankshaft-engine/src/task/resources.rs +++ b/crankshaft-engine/src/task/resources.rs @@ -4,6 +4,7 @@ use std::borrow::Cow; use std::collections::HashMap; use bollard::secret::HostConfig; +use bollard::secret::TaskSpecResources; use bon::Builder; use crankshaft_config::backend::Defaults; @@ -11,22 +12,32 @@ use crankshaft_config::backend::Defaults; #[derive(Builder, Clone, Debug)] #[builder(builder_type = Builder)] pub struct Resources { - /// The number of CPU cores requested. + /// The requested number of CPU cores. /// /// Partial CPU requests are supported but not always respected depending on /// the backend. cpu: Option, - /// Whether or not the task may use preemptible resources. - #[builder(into)] - preemptible: Option, + /// The requested CPU limit. + /// + /// Not all backends support limits on CPU usage. + cpu_limit: Option, /// The requested random access memory size (in GiB). ram: Option, + /// The requested RAM limit (in GiB). + /// + /// Not all backends support limits on memory usage. 
+ ram_limit: Option, + /// The requested disk size (in GiB). disk: Option, + /// Whether or not the task may use preemptible resources. + #[builder(into)] + preemptible: Option, + /// The associated compute zones. #[builder(into, default)] zones: Vec, @@ -38,9 +49,9 @@ impl Resources { self.cpu } - /// Whether the instance should be preemptible. - pub fn preemptible(&self) -> Option { - self.preemptible + /// The CPU limit. + pub fn cpu_limit(&self) -> Option { + self.cpu_limit } /// The amount of RAM in gigabytes. @@ -48,11 +59,21 @@ impl Resources { self.ram } + /// The RAM limit. + pub fn ram_limit(&self) -> Option { + self.ram_limit + } + /// The amount of disk space in gigabytes. pub fn disk(&self) -> Option { self.disk } + /// Whether the instance should be preemptible. + pub fn preemptible(&self) -> Option { + self.preemptible + } + /// The set of requested zones. pub fn zones(&self) -> &[String] { &self.zones @@ -64,18 +85,26 @@ impl Resources { self.cpu = Some(cores); } - if let Some(preemptible) = other.preemptible { - self.preemptible = Some(preemptible); + if let Some(limit) = other.cpu_limit { + self.cpu_limit = Some(limit); } if let Some(ram) = other.ram { self.ram = Some(ram); } + if let Some(limit) = other.ram_limit { + self.ram_limit = Some(limit); + } + if let Some(disk) = other.disk { self.disk = Some(disk); } + if let Some(preemptible) = other.preemptible { + self.preemptible = Some(preemptible); + } + self.zones = other.zones.clone(); self } @@ -97,8 +126,8 @@ impl Resources { map.insert("cpu".into(), cores.to_string().into()); } - if let Some(preemptible) = self.preemptible { - map.insert("preemptible".into(), preemptible.to_string().into()); + if let Some(limit) = self.cpu_limit { + map.insert("cpu_limit".into(), limit.to_string().into()); } if let Some(ram) = self.ram { @@ -107,12 +136,20 @@ impl Resources { map.insert("ram_mb".into(), (ram * 1024.0).to_string().into()); } + if let Some(limit) = self.ram_limit { + 
map.insert("ram_limit".into(), limit.to_string().into()); + } + if let Some(disk) = self.disk { map.insert("disk".into(), disk.to_string().into()); // TODO(clay): improve this. map.insert("disk_mb".into(), (disk * 1024.0).to_string().into()); } + if let Some(preemptible) = self.preemptible { + map.insert("preemptible".into(), preemptible.to_string().into()); + } + // Zones are explicitly not included. map } @@ -122,9 +159,11 @@ impl Default for Resources { fn default() -> Self { Self { cpu: Some(1.0), - preemptible: Some(false), + cpu_limit: None, ram: Some(2.0), + ram_limit: None, disk: Some(8.0), + preemptible: Some(false), zones: Default::default(), } } @@ -134,9 +173,11 @@ impl From<&Defaults> for Resources { fn from(defaults: &Defaults) -> Self { Self { cpu: defaults.cpu(), - preemptible: Default::default(), + cpu_limit: defaults.cpu(), ram: defaults.ram(), + ram_limit: defaults.ram_limit(), disk: defaults.disk(), + preemptible: Default::default(), zones: Default::default(), } } @@ -144,13 +185,19 @@ impl From<&Defaults> for Resources { impl From<&Resources> for HostConfig { fn from(resources: &Resources) -> Self { - let mut host_config = HostConfig::default(); + let mut host_config = Self::default(); + + // Note: Docker doesn't have a CPU reservation for containers + if let Some(cpu) = resources.cpu_limit() { + host_config.nano_cpus = Some((cpu * 1_000_000_000.0) as i64); + } + if let Some(ram) = resources.ram() { - host_config.memory = Some((ram * 1024. * 1024. * 1024.) as i64); + host_config.memory_reservation = Some((ram * 1024. * 1024. * 1024.) as i64); } - if let Some(cpu) = resources.cpu() { - host_config.cpu_count = Some(cpu as i64); + if let Some(ram) = resources.ram_limit() { + host_config.memory = Some((ram * 1024. * 1024. * 1024.) 
as i64); } if let Some(disk) = resources.disk() { @@ -163,6 +210,33 @@ impl From<&Resources> for HostConfig { } } +impl From<&Resources> for TaskSpecResources { + fn from(resources: &Resources) -> Self { + let mut spec = Self::default(); + + if let Some(cpu) = resources.cpu() { + spec.reservations.get_or_insert_default().nano_cpus = + Some((cpu * 1_000_000_000.0) as i64); + } + + if let Some(cpu) = resources.cpu_limit() { + spec.limits.get_or_insert_default().nano_cpus = Some((cpu * 1_000_000_000.0) as i64); + } + + if let Some(ram) = resources.ram() { + spec.reservations.get_or_insert_default().memory_bytes = + Some((ram * 1024. * 1024. * 1024.) as i64); + } + + if let Some(ram) = resources.ram_limit() { + spec.limits.get_or_insert_default().memory_bytes = + Some((ram * 1024. * 1024. * 1024.) as i64); + } + + spec + } +} + impl From for tes::v1::types::task::Resources { fn from(resources: Resources) -> Self { if !resources.zones.is_empty() { @@ -171,9 +245,9 @@ impl From for tes::v1::types::task::Resources { tes::v1::types::task::Resources { cpu_cores: resources.cpu().map(|inner| inner as i64), - preemptible: resources.preemptible(), ram_gb: resources.ram(), disk_gb: resources.disk(), + preemptible: resources.preemptible(), zones: None, } } From 962a23dc7bfc89cc1d083c718856377626152eb2 Mon Sep 17 00:00:00 2001 From: Peter Huene Date: Thu, 20 Feb 2025 01:02:23 -0500 Subject: [PATCH 3/6] fix: fixes to the Docker backend. Fixed the following: * non-zero exit codes were showing up as signals due to not properly formatting a wait exit status. * the default entry point should be specified when creating the container to override any entry point specified in the image, as the full command is being provided for a task. * `bollard` turns non-zero exit codes from an exited container into a wait error; we need to handle that error and treat it as a successful wait with a non-zero exit code. 
--- crankshaft-docker/src/container.rs | 23 +++++++++-------- crankshaft-docker/src/container/builder.rs | 3 +++ crankshaft-docker/src/service.rs | 25 +++++++++++-------- .../service/runner/backend/generic/driver.rs | 3 ++- .../src/service/runner/backend/tes.rs | 3 ++- 5 files changed, 33 insertions(+), 24 deletions(-) diff --git a/crankshaft-docker/src/container.rs b/crankshaft-docker/src/container.rs index ad14810..2288ba7 100644 --- a/crankshaft-docker/src/container.rs +++ b/crankshaft-docker/src/container.rs @@ -16,6 +16,7 @@ use bollard::container::RemoveContainerOptions; use bollard::container::StartContainerOptions; use bollard::container::UploadToContainerOptions; use bollard::container::WaitContainerOptions; +use bollard::secret::ContainerWaitResponse; use futures::TryStreamExt as _; use tokio_stream::StreamExt as _; use tracing::debug; @@ -158,17 +159,16 @@ impl Container { let mut exit_code = None; if let Some(result) = wait_stream.next().await { - let response = result.map_err(Error::Docker)?; - if let Some(error) = response.error { - return Err(Error::Message(format!( - "failed to wait for Docker task to complete: {error}", - error = error - .message - .expect("Docker reported an error without a message") - ))); + match result { + // Bollard turns non-zero exit codes into wait errors, so check for both + Ok(ContainerWaitResponse { + status_code: code, .. + }) + | Err(bollard::errors::Error::DockerContainerWaitError { code, .. 
}) => { + exit_code = Some(code); + } + Err(e) => return Err(e.into()), } - - exit_code = Some(response.status_code); } if exit_code.is_none() { @@ -190,7 +190,8 @@ impl Container { #[cfg(unix)] let output = Output { - status: ExitStatus::from_raw(exit_code.unwrap() as i32), + // See WEXITSTATUS from wait(2) to explain the shift + status: ExitStatus::from_raw((exit_code.unwrap() as i32) << 8), stdout, stderr, }; diff --git a/crankshaft-docker/src/container/builder.rs b/crankshaft-docker/src/container/builder.rs index 48713c6..4ff961f 100644 --- a/crankshaft-docker/src/container/builder.rs +++ b/crankshaft-docker/src/container/builder.rs @@ -150,6 +150,9 @@ impl Builder { // one way or the other and not rely on the default. cmd: Some(cmd), image: Some(image), + // Override the entrypoint to the default Docker entrypoint as we're providing + // the full command + entrypoint: Some(vec![String::new()]), attach_stdout: Some(self.attach_stdout), attach_stderr: Some(self.attach_stderr), // END NOTE diff --git a/crankshaft-docker/src/service.rs b/crankshaft-docker/src/service.rs index dcc0534..a4d755e 100644 --- a/crankshaft-docker/src/service.rs +++ b/crankshaft-docker/src/service.rs @@ -13,6 +13,7 @@ use bollard::Docker; use bollard::container::LogOutput; use bollard::container::LogsOptions; use bollard::container::WaitContainerOptions; +use bollard::secret::ContainerWaitResponse; use bollard::secret::TaskState; use bollard::task::ListTasksOptions; @@ -139,17 +140,18 @@ impl Service { let mut exit_code = None; if let Some(result) = wait_stream.next().await { - let response = result.map_err(Error::Docker)?; - if let Some(error) = response.error { - return Err(Error::Message(format!( - "failed to wait for Docker task to complete: {error}", - error = error - .message - .expect("Docker reported an error without a message") - ))); + match result { + // Bollard turns non-zero exit codes into wait errors, so check for both + Ok(ContainerWaitResponse { + status_code: code, .. 
+ }) + | Err(bollard::errors::Error::DockerContainerWaitError { + code, .. + }) => { + exit_code = Some(code); + } + Err(e) => return Err(e.into()), } - - exit_code = Some(response.status_code); } if exit_code.is_none() { @@ -255,7 +257,8 @@ impl Service { #[cfg(unix)] let output = Output { - status: ExitStatus::from_raw(exit_code as i32), + // See WEXITSTATUS from wait(2) to explain the shift + status: ExitStatus::from_raw((exit_code as i32) << 8), stdout, stderr, }; diff --git a/crankshaft-engine/src/service/runner/backend/generic/driver.rs b/crankshaft-engine/src/service/runner/backend/generic/driver.rs index f0b8e85..b310d71 100644 --- a/crankshaft-engine/src/service/runner/backend/generic/driver.rs +++ b/crankshaft-engine/src/service/runner/backend/generic/driver.rs @@ -389,7 +389,8 @@ async fn run_ssh_command( #[cfg(unix)] let output = Output { - status: ExitStatus::from_raw(status), + // See WEXITSTATUS from wait(2) to explain the shift + status: ExitStatus::from_raw(status << 8), stdout, stderr, }; diff --git a/crankshaft-engine/src/service/runner/backend/tes.rs b/crankshaft-engine/src/service/runner/backend/tes.rs index c1c033b..7c99ec4 100644 --- a/crankshaft-engine/src/service/runner/backend/tes.rs +++ b/crankshaft-engine/src/service/runner/backend/tes.rs @@ -130,7 +130,8 @@ impl Backend { #[cfg(unix)] let output = Output { - status: ExitStatus::from_raw(status as i32), + // See WEXITSTATUS from wait(2) to explain the shift + status: ExitStatus::from_raw((status as i32) << 8), stdout: log.stdout.unwrap_or_default().as_bytes().to_vec(), stderr: log.stderr.unwrap_or_default().as_bytes().to_vec(), }; From c01fa1c465c8b63e1dc1dcd30ebbc74a31e71444 Mon Sep 17 00:00:00 2001 From: Peter Huene Date: Thu, 20 Feb 2025 11:57:21 -0500 Subject: [PATCH 4/6] fix: correct the use of `memory_reservation` in container HostConfig. The Docker `memory_reservation` setting acts as a memory soft limit used in OOM conditions. 
Crankshaft was treating it like a minimum requirement for memory, which only makes sense when Docker is operating in a swarm. The fix is to remove setting the option. --- crankshaft-engine/src/task/resources.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crankshaft-engine/src/task/resources.rs b/crankshaft-engine/src/task/resources.rs index 17ff822..de68b2d 100644 --- a/crankshaft-engine/src/task/resources.rs +++ b/crankshaft-engine/src/task/resources.rs @@ -59,7 +59,7 @@ impl Resources { self.ram } - /// The RAM limit. + /// The RAM limit in gigabytes. pub fn ram_limit(&self) -> Option { self.ram_limit } @@ -192,9 +192,9 @@ impl From<&Resources> for HostConfig { host_config.nano_cpus = Some((cpu * 1_000_000_000.0) as i64); } - if let Some(ram) = resources.ram() { - host_config.memory_reservation = Some((ram * 1024. * 1024. * 1024.) as i64); - } + // Note: Docker doesn't have a memory reservation for containers + // The Docker `memory_reservation` setting acts as a soft limit and not as + // something informing a scheduler of minimum requirements for the container if let Some(ram) = resources.ram_limit() { host_config.memory = Some((ram * 1024. * 1024. * 1024.) as i64); From 3bd7fea5a62491b59708dd4f735b0e5e7bb7d981 Mon Sep 17 00:00:00 2001 From: Peter Huene Date: Thu, 20 Feb 2025 16:05:22 -0500 Subject: [PATCH 5/6] chore: code review feedback. 
--- crankshaft-config/src/backend/defaults.rs | 6 +++--- crankshaft-engine/src/service/runner.rs | 4 +--- crankshaft-engine/src/service/runner/backend.rs | 6 +++--- .../src/service/runner/backend/docker.rs | 9 +++++++-- .../src/service/runner/backend/generic.rs | 7 ++++--- .../src/service/runner/backend/tes.rs | 9 +++++---- crankshaft-engine/src/task/resources.rs | 14 ++++++++++++++ 7 files changed, 37 insertions(+), 18 deletions(-) diff --git a/crankshaft-config/src/backend/defaults.rs b/crankshaft-config/src/backend/defaults.rs index 33904ae..a6279a6 100644 --- a/crankshaft-config/src/backend/defaults.rs +++ b/crankshaft-config/src/backend/defaults.rs @@ -53,17 +53,17 @@ impl Defaults { self.cpu_limit } - /// Gets the amount of RAM (in GB). + /// Gets the amount of RAM (in GiB). pub fn ram(&self) -> Option { self.ram } - /// Gets the RAM limit (in GB). + /// Gets the RAM limit (in GiB). pub fn ram_limit(&self) -> Option { self.ram_limit } - /// Gets the amount of disk space (in GB). + /// Gets the amount of disk space (in GiB). pub fn disk(&self) -> Option { self.disk } diff --git a/crankshaft-engine/src/service/runner.rs b/crankshaft-engine/src/service/runner.rs index ca65b39..17cce1e 100644 --- a/crankshaft-engine/src/service/runner.rs +++ b/crankshaft-engine/src/service/runner.rs @@ -8,7 +8,6 @@ use crankshaft_config::backend::Defaults; use crankshaft_config::backend::Kind; use nonempty::NonEmpty; use tokio::sync::Semaphore; -use tokio::sync::oneshot; use tokio::sync::oneshot::Receiver; use tokio_util::sync::CancellationToken; use tracing::trace; @@ -108,8 +107,7 @@ impl Runner { tokio::spawn(async move { let _permit = lock.acquire().await?; - let (started_tx, _) = oneshot::channel(); - let result = backend.clone().run(task, started_tx, token)?.await; + let result = backend.clone().run(task, None, token)?.await; // NOTE: if the send does not succeed, that is almost certainly // because the receiver was dropped. 
That is a relatively standard diff --git a/crankshaft-engine/src/service/runner/backend.rs b/crankshaft-engine/src/service/runner/backend.rs index 731d13e..7f3b5e4 100644 --- a/crankshaft-engine/src/service/runner/backend.rs +++ b/crankshaft-engine/src/service/runner/backend.rs @@ -24,15 +24,15 @@ pub trait Backend: Debug + Send + Sync + 'static { /// Runs a task in a backend. /// - /// The `started` channel is notified when the first execution of the task - /// has started. + /// The optional `started` channel is notified when the first execution of + /// the task has started. // TODO: use a representation of task output that isn't based on // `std::process::Output` that would allow us to write stdout/stderror to a file // instead of buffering it all in memory fn run( &self, task: Task, - started: oneshot::Sender<()>, + started: Option<oneshot::Sender<()>>, token: CancellationToken, ) -> Result>>>; } diff --git a/crankshaft-engine/src/service/runner/backend/docker.rs b/crankshaft-engine/src/service/runner/backend/docker.rs index 5af3f08..bddb9fc 100644 --- a/crankshaft-engine/src/service/runner/backend/docker.rs +++ b/crankshaft-engine/src/service/runner/backend/docker.rs @@ -321,7 +321,7 @@ impl crate::Backend for Backend { fn run( &self, task: Task, - started: oneshot::Sender<()>, + mut started: Option<oneshot::Sender<()>>, token: CancellationToken, ) -> Result>>> { // Helper for cleanup @@ -359,7 +359,6 @@ impl crate::Backend for Backend { let client = self.client.clone(); let cleanup = self.config.cleanup(); let resources = self.resources; - let mut started = Some(started); Ok(async move { let tempdir = TempDir::new().context("failed to create temporary directory for mounts")?; @@ -406,6 +405,9 @@ impl crate::Backend for Backend { let started = started.take(); select!
{ + // Always poll the cancellation token first + biased; + _ = token.cancelled() => { (Err(eyre!("task has been cancelled")), Cleaner::Service(service)) } @@ -439,6 +441,9 @@ impl crate::Backend for Backend { let started = started.take(); select! { + // Always poll the cancellation token first + biased; + _ = token.cancelled() => { (Err(eyre!("task has been cancelled")), Cleaner::Container(container)) } diff --git a/crankshaft-engine/src/service/runner/backend/generic.rs b/crankshaft-engine/src/service/runner/backend/generic.rs index a800f47..54b1909 100644 --- a/crankshaft-engine/src/service/runner/backend/generic.rs +++ b/crankshaft-engine/src/service/runner/backend/generic.rs @@ -102,7 +102,7 @@ impl crate::Backend for Backend { fn run( &self, task: Task, - started: oneshot::Sender<()>, + mut started: Option<oneshot::Sender<()>>, token: CancellationToken, ) -> Result>>> { let driver = self.driver.clone(); @@ -113,8 +113,6 @@ impl crate::Backend for Backend { .map(|resources| resources.to_hashmap()) .unwrap_or_default(); - let mut started = Some(started); - Ok(async move { let mut outputs = Vec::new(); let job_id_regex = config @@ -197,6 +195,9 @@ impl crate::Backend for Backend { .context("failed to resolve monitor command")?; let result = select!
{ + // Always poll the cancellation token first + biased; + _ = token.cancelled() => { Err(eyre!("task has been cancelled")) } diff --git a/crankshaft-engine/src/service/runner/backend/tes.rs b/crankshaft-engine/src/service/runner/backend/tes.rs index 7c99ec4..600166c 100644 --- a/crankshaft-engine/src/service/runner/backend/tes.rs +++ b/crankshaft-engine/src/service/runner/backend/tes.rs @@ -76,10 +76,8 @@ impl Backend { async fn wait_task( client: &Client, task_id: &str, - started: oneshot::Sender<()>, + mut started: Option<oneshot::Sender<()>>, ) -> Result> { - let mut started = Some(started); - loop { let task = client .get_task(task_id, View::Full) @@ -175,7 +173,7 @@ impl crate::Backend for Backend { fn run( &self, task: Task, - started: oneshot::Sender<()>, + started: Option<oneshot::Sender<()>>, token: CancellationToken, ) -> Result>>> { let client = self.client.clone(); @@ -185,6 +183,9 @@ impl crate::Backend for Backend { let task_id = client.create_task(task).await?.id; select! { + // Always poll the cancellation token first + biased; + _ = token.cancelled() => { // Cancel the task client.cancel_task(&task_id).await?; diff --git a/crankshaft-engine/src/task/resources.rs b/crankshaft-engine/src/task/resources.rs index de68b2d..a49ed68 100644 --- a/crankshaft-engine/src/task/resources.rs +++ b/crankshaft-engine/src/task/resources.rs @@ -7,6 +7,7 @@ use bollard::secret::HostConfig; use bollard::secret::TaskSpecResources; use bon::Builder; use crankshaft_config::backend::Defaults; +use tracing::info; /// A set of requested resources.
#[derive(Builder, Clone, Debug)] @@ -188,11 +189,24 @@ impl From<&Resources> for HostConfig { let mut host_config = Self::default(); // Note: Docker doesn't have a CPU reservation for containers + if resources.cpu().is_some() { + info!( + "ignoring minimum CPU reservation for a Docker daemon not participating in a swarm" + ); + } + if let Some(cpu) = resources.cpu_limit() { host_config.nano_cpus = Some((cpu * 1_000_000_000.0) as i64); } // Note: Docker doesn't have a memory reservation for containers + if resources.ram().is_some() { + info!( + "ignoring minimum memory reservation for a Docker daemon not participating in a \ + swarm" + ); + } + // The Docker `memory_reservation` setting acts as a soft limit and not as // something informing a scheduler of minimum requirements for the container From ecee420524aef3198f1063943adfa5ec0e0e4d94 Mon Sep 17 00:00:00 2001 From: Peter Huene Date: Thu, 20 Feb 2025 16:09:55 -0500 Subject: [PATCH 6/6] chore: update CHANGELOGs. --- crankshaft-config/CHANGELOG.md | 1 + crankshaft-docker/CHANGELOG.md | 5 +++++ crankshaft-engine/CHANGELOG.md | 1 + 3 files changed, 7 insertions(+) diff --git a/crankshaft-config/CHANGELOG.md b/crankshaft-config/CHANGELOG.md index 9c7b2d2..3cbf06c 100644 --- a/crankshaft-config/CHANGELOG.md +++ b/crankshaft-config/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +* Added support for specifying CPU and memory limits to configuration defaults ([#16](https://github.com/stjude-rust-labs/crankshaft/pull/16)). * Adds the initial version of the crate. 
### Changed diff --git a/crankshaft-docker/CHANGELOG.md b/crankshaft-docker/CHANGELOG.md index fa9f738..c60b9e3 100644 --- a/crankshaft-docker/CHANGELOG.md +++ b/crankshaft-docker/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +* Added support for respecting CPU and memory limits ([#16](https://github.com/stjude-rust-labs/crankshaft/pull/16)). * Added support for submitting tasks via the service API for Docker Swarm (#[11](https://github.com/stjude-rust-labs/crankshaft/pull/11)). * Adds the initial version of the crate. @@ -20,3 +21,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#8](https://github.com/stjude-rust-labs/crankshaft/pull/8)). * Replaced `attached` with separate stdout and stderr attach flags ([#8](https://github.com/stjude-rust-labs/crankshaft/pull/8)). + +### Fixed + +* Fixed a non-zero exit code from a container being treated as a wait error ([#16](https://github.com/stjude-rust-labs/crankshaft/pull/16)). diff --git a/crankshaft-engine/CHANGELOG.md b/crankshaft-engine/CHANGELOG.md index 7658196..cbfd8f4 100644 --- a/crankshaft-engine/CHANGELOG.md +++ b/crankshaft-engine/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +* Added a notification channel for the first time a task starts executing ([#16](https://github.com/stjude-rust-labs/crankshaft/pull/16)). * Added support for bind mounting inputs to the Docker backend ([#12](https://github.com/stjude-rust-labs/crankshaft/pull/12)). * Added cancellation support to the engine and ctrl-c handling in the examples (#[11](https://github.com/stjude-rust-labs/crankshaft/pull/11)). * Added support for Docker Swarm in the docker backend (#[11](https://github.com/stjude-rust-labs/crankshaft/pull/11)).