From 21cbbf7ca0834edeed4765df1e07a66b234e0917 Mon Sep 17 00:00:00 2001 From: Peter Huene Date: Sat, 15 Feb 2025 11:42:05 -0500 Subject: [PATCH] feat: improve the Docker backend. (#12) * feat: improve the Docker backend. * add support for specifying inputs by host file path. * implement inputs as bind mounts rather than files uploaded to the container. * implement mounts for tasks that use services rather than containers. * make the Docker backend ensure the task execution's image exists locally before creating the container or service. * use `alpine` for the examples as it is smaller than `ubuntu`. * chore: update CHANGELOG.md. --- crankshaft-docker/src/bin/docker-driver.rs | 10 +- crankshaft-docker/src/container/builder.rs | 2 +- crankshaft-docker/src/images.rs | 25 ++- crankshaft-docker/src/lib.rs | 7 +- crankshaft-docker/src/service/builder.rs | 22 ++- crankshaft-engine/CHANGELOG.md | 6 + .../src/service/runner/backend/docker.rs | 173 ++++++++++++------ crankshaft-engine/src/task/input.rs | 49 +++-- crankshaft-engine/src/task/input/contents.rs | 28 ++- examples/src/docker/main.rs | 2 +- examples/src/lsf/main.rs | 2 +- examples/src/tes/main.rs | 2 +- 12 files changed, 222 insertions(+), 106 deletions(-) diff --git a/crankshaft-docker/src/bin/docker-driver.rs b/crankshaft-docker/src/bin/docker-driver.rs index c96e7f8..4b99983 100644 --- a/crankshaft-docker/src/bin/docker-driver.rs +++ b/crankshaft-docker/src/bin/docker-driver.rs @@ -65,11 +65,9 @@ enum Command { /// existing). EnsureImage { /// The name of the image. + /// + /// If a tag is not specified, a default tag of `latest` is used. image: String, - - #[arg(short, long, default_value = "latest")] - /// The tag for the image. - tag: String, }, /// Lists all images. @@ -156,8 +154,8 @@ async fn run(args: Args) -> Result<()> { container.remove().await?; } } - Command::EnsureImage { image, tag } => { - docker.ensure_image(image, tag).await?; + Command::EnsureImage { image } => { + docker.ensure_image(image).await?; } Command::ListImages => { docker.list_images().await?; diff --git a/crankshaft-docker/src/container/builder.rs b/crankshaft-docker/src/container/builder.rs index a0c8966..48713c6 100644 --- a/crankshaft-docker/src/container/builder.rs +++ b/crankshaft-docker/src/container/builder.rs @@ -103,7 +103,7 @@ impl Builder { /// Sets multiple environment variables. pub fn envs( mut self, - variables: impl Iterator, impl Into)>, + variables: impl IntoIterator, impl Into)>, ) -> Self { self.env .extend(variables.into_iter().map(|(k, v)| (k.into(), v.into()))); diff --git a/crankshaft-docker/src/images.rs b/crankshaft-docker/src/images.rs index 25c7f3d..44543e7 100644 --- a/crankshaft-docker/src/images.rs +++ b/crankshaft-docker/src/images.rs @@ -56,23 +56,20 @@ pub(crate) async fn list_images(docker: &Docker) -> Result> { /// Ensures that an image exists in the Docker daemon. /// +/// If the image does not specify a tag, a default tag of `latest` will be used. +/// /// It does this by: /// /// * Confirming that the image already exists there, or /// * Pulling the image from the remote repository. -pub(crate) async fn ensure_image( - docker: &Docker, - name: impl AsRef, - tag: impl AsRef, -) -> Result<()> { - let name = name.as_ref(); - let tag = tag.as_ref(); - debug!("ensuring image: `{name}:{tag}`"); +pub(crate) async fn ensure_image(docker: &Docker, image: impl AsRef) -> Result<()> { + let image = image.as_ref(); + debug!("ensuring image: `{image}`"); let mut filters = HashMap::new(); - filters.insert(String::from("reference"), vec![format!("{name}:{tag}")]); + filters.insert("reference", vec![image]); - debug!("checking if image exists locally: `{name}:{tag}`"); + debug!("checking if image exists locally: `{image}`"); let results = docker .inner() .list_images(Some(ListImagesOptions { @@ -83,7 +80,7 @@ pub(crate) async fn ensure_image( .map_err(Error::Docker)?; if !results.is_empty() { - debug!("image `{name}:{tag}` exists locally"); + debug!("image `{image}` exists locally"); if enabled!(Level::TRACE) { trace!( @@ -95,11 +92,11 @@ pub(crate) async fn ensure_image( return Ok(()); } - debug!("image `{name}:{tag}` does not exist locally; attempting to pull from remote"); + debug!("image `{image}` does not exist locally; attempting to pull from remote"); let mut stream = docker.inner().create_image( Some(CreateImageOptions { - from_image: name, - tag, + from_image: image, + tag: if image.contains(':') { "" } else { "latest" }, ..Default::default() }), None, diff --git a/crankshaft-docker/src/lib.rs b/crankshaft-docker/src/lib.rs index 25835cb..401a64b 100644 --- a/crankshaft-docker/src/lib.rs +++ b/crankshaft-docker/src/lib.rs @@ -75,12 +75,15 @@ impl Docker { /// Ensures that an image exists in the Docker daemon. /// + /// If the image does not specify a tag, a default tag of `latest` will be + /// used. + /// /// It does this by: /// /// * Confirming that the image already exists there, or /// * Pulling the image from the remote repository. - pub async fn ensure_image(&self, name: impl AsRef, tag: impl AsRef) -> Result<()> { - ensure_image(self, name, tag).await + pub async fn ensure_image(&self, image: impl AsRef) -> Result<()> { + ensure_image(self, image).await } /// Removes an image from the Docker daemon. diff --git a/crankshaft-docker/src/service/builder.rs b/crankshaft-docker/src/service/builder.rs index ea4443b..0c6d319 100644 --- a/crankshaft-docker/src/service/builder.rs +++ b/crankshaft-docker/src/service/builder.rs @@ -1,6 +1,7 @@ //! Builders for containers. use bollard::Docker; +use bollard::secret::Mount; use bollard::secret::ServiceSpec; use bollard::secret::ServiceSpecMode; use bollard::secret::ServiceSpecModeReplicated; @@ -16,7 +17,7 @@ use super::Service; use crate::Error; use crate::Result; -/// A builder for a [`Container`]. +/// A builder for a [`Service`]. pub struct Builder { /// A reference to the [`Docker`] client that will be used to create this /// container. @@ -43,6 +44,9 @@ pub struct Builder { /// The working directory. work_dir: Option, + /// The mounts for the service's task template. + mounts: Vec, + /// The task resources for the service. resources: Option, } @@ -59,6 +63,7 @@ impl Builder { attach_stderr: false, env: Default::default(), work_dir: Default::default(), + mounts: Default::default(), resources: Default::default(), } } @@ -96,7 +101,7 @@ impl Builder { /// Sets multiple environment variables. pub fn envs( mut self, - variables: impl Iterator, impl Into)>, + variables: impl IntoIterator, impl Into)>, ) -> Self { self.env .extend(variables.into_iter().map(|(k, v)| (k.into(), v.into()))); @@ -121,6 +126,18 @@ impl Builder { self } + /// Sets a mount for the service. + pub fn mount(mut self, mount: impl Into) -> Self { + self.mounts.push(mount.into()); + self + } + + /// Sets multiple mounts for the service. + pub fn mounts(mut self, mounts: impl IntoIterator>) -> Self { + self.mounts.extend(mounts.into_iter().map(Into::into)); + self + } + /// Sets the task resources. pub fn resources(mut self, resources: TaskSpecResources) -> Self { self.resources = Some(resources); @@ -152,6 +169,7 @@ impl Builder { args: Some(self.args), dir: self.work_dir, env: Some(self.env.iter().map(|(k, v)| format!("{k}={v}")).collect()), + mounts: Some(self.mounts), ..Default::default() }), resources: self.resources, diff --git a/crankshaft-engine/CHANGELOG.md b/crankshaft-engine/CHANGELOG.md index c875500..7658196 100644 --- a/crankshaft-engine/CHANGELOG.md +++ b/crankshaft-engine/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +* Added support for bind mounting inputs to the Docker backend ([#12](https://github.com/stjude-rust-labs/crankshaft/pull/12)). * Added cancellation support to the engine and ctrl-c handling in the examples (#[11](https://github.com/stjude-rust-labs/crankshaft/pull/11)). * Added support for Docker Swarm in the docker backend (#[11](https://github.com/stjude-rust-labs/crankshaft/pull/11)). * Adds the initial version of the crate. @@ -32,3 +33,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 * Swaps out most of the bespoke builders for `bon`. * Removes `#[builder(into)]` for numerical types ([#10](https://github.com/stjude-rust-labs/crankshaft/pull/10)). + +### Fixed + +* The Docker backend now ensures task execution images are present locally + before creating any containers ([#12](https://github.com/stjude-rust-labs/crankshaft/pull/12)). \ No newline at end of file diff --git a/crankshaft-engine/src/service/runner/backend/docker.rs b/crankshaft-engine/src/service/runner/backend/docker.rs index a7745fd..63f546a 100644 --- a/crankshaft-engine/src/service/runner/backend/docker.rs +++ b/crankshaft-engine/src/service/runner/backend/docker.rs @@ -1,5 +1,8 @@ //! A Docker backend. +use std::io::Write; +use std::path::Path; +use std::path::PathBuf; use std::process::Output; use std::sync::Arc; @@ -29,6 +32,8 @@ use tracing::info; use crate::Result; use crate::Task; +use crate::task::Input; +use crate::task::input::Contents; /// Represents resource information about a Docker swarm. #[derive(Debug, Default, Clone, Copy)] @@ -264,43 +269,6 @@ impl Backend { pub fn resources(&self) -> &Resources { &self.resources } - - /// Runs a task using a container. - async fn run_with_container( - task: &Task, - execution_index: usize, - started: Arc, - container: Arc, - ) -> Result { - // TODO(clay): these could be cached. - for task in task.inputs().map(|i| { - let container = container.clone(); - tokio::spawn(async move { - let contents = i.fetch().await; - container.upload_file(i.path(), &contents).await - }) - }) { - task.await??; - } - - container - .run(|| started(execution_index)) - .await - .wrap_err("failed to run Docker container") - } - - /// Runs a task using a Docker service. - async fn run_with_service( - execution_index: usize, - started: Arc, - service: Arc, - ) -> Result { - // TODO: handle inputs - service - .run(|| started(execution_index)) - .await - .wrap_err("failed to run Docker service") - } } #[async_trait] @@ -350,10 +318,13 @@ impl crate::Backend for Backend { let client = self.client.clone(); let cleanup = self.config.cleanup(); let resources = self.resources; - let mounts = get_shared_mounts(task.shared_volumes())?; Ok(async move { - // Check to see if we should use the service API for running the task + let tempdir = TempDir::new().context("failed to create temporary directory for mounts")?; + + let mut mounts = Vec::new(); + add_input_mounts(task.inputs(), tempdir.path(), &mut mounts).await?; + add_shared_mounts(task.shared_volumes(), tempdir.path(), &mut mounts)?; let mut outputs = Vec::new(); let name = task @@ -366,6 +337,13 @@ impl crate::Backend for Backend { bail!("task has been cancelled"); } + // First ensure the execution's image exists + client + .ensure_image(execution.image()) + .await + .with_context(|| format!("failed to pull image `{image}`", image = execution.image()))?; + + // Check to see if we should use the service API for running the task let (result, cleaner) = if resources.use_service() { let mut builder = client .service_builder() @@ -385,8 +363,8 @@ impl crate::Backend for Backend { _ = token.cancelled() => { (Err(eyre!("task has been cancelled")), Cleaner::Service(service)) } - res = Self::run_with_service(execution_index, started.clone(), service.clone()) => { - (res, Cleaner::Service(service)) + res = service.run(|| started(execution_index)) => { + (res.wrap_err("failed to run Docker service"), Cleaner::Service(service)) } } } else { @@ -416,8 +394,8 @@ impl crate::Backend for Backend { _ = token.cancelled() => { (Err(eyre!("task has been cancelled")), Cleaner::Container(container)) } - res = Self::run_with_container(&task, execution_index, started.clone(), container.clone()) => { - (res, Cleaner::Container(container)) + res = container.run(|| started(execution_index)) => { + (res.wrap_err("failed to run Docker container"), Cleaner::Container(container)) } } }; @@ -437,24 +415,103 @@ impl crate::Backend for Backend { } } +/// Adds input mounts to the list of mounts. +/// +/// Bind mounts are created for any input specified as a path. +/// +/// For inputs not specified by a path, the contents are fetched and written to +/// a file within the provided temporary directory. +/// +/// Errors may be returned if an input's contents could not be fetched. +async fn add_input_mounts( + inputs: impl Iterator>, + tempdir: &Path, + mounts: &mut Vec, +) -> Result<()> { + for input in inputs { + let source = if let Contents::Path(path) = input.contents() { + // Use the input path directly + path.to_str() + .with_context(|| { + format!("input path `{path}` is not UTF-8", path = path.display()) + })? + .to_string() + } else { + // Write the input file contents to a temporary file within the temporary + // directory + let mut file = tempfile::NamedTempFile::new_in(tempdir).with_context(|| { + format!( + "failed to create temporary input file in `{tempdir}`", + tempdir = tempdir.display() + ) + })?; + // TODO: remotely fetched input contents should be cached somewhere + file.write(&input.fetch().await?).with_context(|| { + format!( + "failed to write input file contents to `{path}`", + path = file.path().display() + ) + })?; + + // Keep the file as the temporary directory itself will clean up the mounts + let (_, path) = file.keep().context("failed to persist temporary file")?; + + path.into_os_string().into_string().map_err(|path| { + eyre!( + "temporary file path `{path}` is not UTF-8", + path = PathBuf::from(&path).display() + ) + })? + }; + + mounts.push(Mount { + target: Some(input.path().to_string()), + source: Some(source), + typ: Some(MountTypeEnum::BIND), + read_only: Some(true), + ..Default::default() + }); + } + + Ok(()) +} + /// Gets the shared mounts (if any exist) from the shared volumes in a [`Task`] /// (via [`Task::shared_volumes()`]). -fn get_shared_mounts<'a>(iter: impl Iterator) -> Result> { - iter.map(|inner_path| { - Ok(Mount { - target: Some(inner_path.to_owned()), - source: Some( - TempDir::new() - .wrap_err("failed to create temporary directory")? - .into_path() - .to_str() - .with_context(|| "temporary path contains non-UTF-8 characters")? - .to_owned(), - ), +fn add_shared_mounts<'a>( + iter: impl Iterator, + tempdir: &Path, + mounts: &mut Vec, +) -> Result<()> { + for target in iter { + // Create new temporary directory in the provided temporary directory + // The call to `into_path` will prevent the directory from being deleted on + // drop; instead, we're relying on the parent temporary directory to delete it + let path = TempDir::new_in(tempdir) + .wrap_err_with(|| { + format!( + "failed to create temporary directory in `{tempdir}`", + tempdir = tempdir.display() + ) + })? + .into_path() + .into_os_string() + .into_string() + .map_err(|path| { + eyre!( + "temporary directory path `{path}` is not UTF-8", + path = PathBuf::from(&path).display() + ) + })?; + + mounts.push(Mount { + target: Some(target.to_owned()), + source: Some(path), typ: Some(MountTypeEnum::BIND), read_only: Some(false), ..Default::default() - }) - }) - .collect::>>() + }); + } + + Ok(()) } diff --git a/crankshaft-engine/src/task/input.rs b/crankshaft-engine/src/task/input.rs index 5e9464f..642dbf8 100644 --- a/crankshaft-engine/src/task/input.rs +++ b/crankshaft-engine/src/task/input.rs @@ -1,9 +1,13 @@ //! Task inputs. use std::borrow::Cow; +use std::fs; use bon::Builder; use eyre::Context; +use eyre::Result; +use eyre::bail; +use eyre::eyre; use tokio::fs::File; use tokio::io::AsyncReadExt; @@ -25,19 +29,19 @@ pub enum Type { #[derive(Builder, Clone, Debug)] #[builder(builder_type = Builder)] pub struct Input { - /// A name. + /// An optional name to give the input. #[builder(into)] name: Option, - /// A description. + /// A description of the input. #[builder(into)] description: Option, - /// The contents. + /// The contents of the input. #[builder(into)] contents: Contents, - /// The path to map the input to within the container. + /// The expected guest path of the input. #[builder(into)] path: String, @@ -72,24 +76,39 @@ impl Input { &self.ty } - /// Fetches the file contents via an [`AsyncRead`]er. - pub async fn fetch(&self) -> Cow<'_, [u8]> { + /// Fetches the input contents. + /// + /// This method will return an error if the input is a path to a directory. + pub async fn fetch(&self) -> Result> { match &self.contents { - Contents::Literal(bytes) => bytes.into(), + Contents::Literal(bytes) => Ok(bytes.into()), Contents::Url(url) => match url.scheme() { "file" => { // SAFETY: we just checked to ensure this is a file, so // getting the file path should always unwrap. - let path = url.to_file_path().unwrap(); - let mut file = File::open(path).await.unwrap(); + let path = url.to_file_path().map_err(|_| { + eyre!( + "URL `{url}` has a file scheme but cannot be represented as a file \ + path" + ) + })?; + let mut file = File::open(&path).await.with_context(|| { + format!("failed to open file `{path}`", path = path.display()) + })?; let mut buffer = Vec::with_capacity(4096); - file.read_to_end(&mut buffer).await.unwrap(); - buffer.into() + file.read_to_end(&mut buffer).await.with_context(|| { + format!("failed to read file `{path}`", path = path.display()) + })?; + Ok(buffer.into()) } - "http" | "https" => unimplemented!("http(s) URL support not implemented"), - "s3" => unimplemented!("s3 URL support not implemented"), - v => unreachable!("unsupported URL scheme: {v}"), + "http" | "https" => bail!("support for HTTP URLs is not yet implemented"), + "s3" => bail!("support for S3 URLs is not yet implemented"), + scheme => bail!("URL has unsupported scheme `{scheme}`"), }, + Contents::Path(path) => Ok(fs::read_to_string(path) + .with_context(|| format!("failed to read file `{path}`", path = path.display()))? + .into_bytes() + .into()), } } } @@ -106,7 +125,7 @@ impl TryFrom for tes::v1::types::task::Input { ty, } = input; - let (url, content) = contents.one_hot(); + let (url, content) = contents.one_hot()?; let r#type = match ty { Type::File => tes::v1::types::task::file::Type::File, diff --git a/crankshaft-engine/src/task/input/contents.rs b/crankshaft-engine/src/task/input/contents.rs index 80a375d..28e3d2c 100644 --- a/crankshaft-engine/src/task/input/contents.rs +++ b/crankshaft-engine/src/task/input/contents.rs @@ -1,5 +1,9 @@ //! Contents of an input. +use std::fs; +use std::path::PathBuf; + +use eyre::Context; use thiserror::Error; use url::Url; @@ -22,6 +26,10 @@ pub enum Contents { /// Contents provided as a literal array of bytes. Literal(Vec), + + /// Contents are provided as a path to a file or directory on the host + /// system. + Path(PathBuf), } impl Contents { @@ -34,12 +42,22 @@ impl Contents { /// /// * The first value is the [`Url`] if the type is [`Contents::Url`]. Else, /// the value is [`None`]. - /// * The second value is the literal contents as a [`String`] if the type - /// is [`Contents::Literal`]. Else, the value is [`None`]. - pub fn one_hot(self) -> (Option, Option>) { + /// * The second value is the literal contents as a [`Vec`] if the type + /// is [`Contents::Literal`] or [`Contents::Path`]. Else, the value is + /// [`None`]. + /// + /// Returns an error if the contents are to a path and the file contents + /// could not be read. + pub fn one_hot(self) -> eyre::Result<(Option, Option>)> { match self { - Contents::Url(url) => (Some(url), None), - Contents::Literal(value) => (None, Some(value)), + Self::Url(url) => Ok((Some(url), None)), + Self::Literal(value) => Ok((None, Some(value))), + Self::Path(path) => Ok(( + None, + Some(fs::read(&path).with_context(|| { + format!("failed to read file `{path}`", path = path.display()) + })?), + )), } } } diff --git a/examples/src/docker/main.rs b/examples/src/docker/main.rs index d6ad6f3..bb1c85b 100644 --- a/examples/src/docker/main.rs +++ b/examples/src/docker/main.rs @@ -64,7 +64,7 @@ async fn run(args: Args, token: CancellationToken) -> Result<()> { .display() .to_string(), ) - .image("ubuntu") + .image("alpine") .program("echo") .args([String::from("hello, world!")]) .build(), diff --git a/examples/src/lsf/main.rs b/examples/src/lsf/main.rs index b113617..e23d72d 100644 --- a/examples/src/lsf/main.rs +++ b/examples/src/lsf/main.rs @@ -85,7 +85,7 @@ async fn run(args: Args, token: CancellationToken) -> Result<()> { .display() .to_string(), ) - .image("ubuntu") + .image("alpine") .program("echo") .args([String::from("hello, world!")]) .build(), diff --git a/examples/src/tes/main.rs b/examples/src/tes/main.rs index 19668b5..2ba3a5a 100644 --- a/examples/src/tes/main.rs +++ b/examples/src/tes/main.rs @@ -97,7 +97,7 @@ async fn run(args: Args, token: CancellationToken) -> Result<()> { .display() .to_string(), ) - .image("ubuntu") + .image("alpine") .program("echo") .args([String::from("hello, world!")]) .build(),