Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Safe shutdown #144

Merged
merged 2 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 25 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ serde = { version = "1.0.204", features = ["derive"] }
fxhash = "0.2.1"
toml = "0.8.19"
nix = "0.29.0"
signal-hook = "0.3.17"
ctrlc = { version = "3.4.5", features = ["termination"] }

[dev-dependencies]
rand = "0.8.5"
2 changes: 0 additions & 2 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ impl From<IpFrom> for load_balancing::IpExtractor {
}
}

#[cfg(unix)]
#[derive(clap::ValueEnum, Debug, Clone, Copy, Default)]
pub enum Shutdown {
Graceful,
Expand Down Expand Up @@ -178,7 +177,6 @@ pub struct Args {
pub command: Commands,

/// The strategy for shutting down faucet
#[cfg(unix)]
#[arg(long, env = "FAUCET_SHUTDOWN", default_value = "immediate")]
pub shutdown: Shutdown,
}
Expand Down
65 changes: 41 additions & 24 deletions src/client/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,17 @@ fn log_stdio(mut child: Child, target: &'static str) -> FaucetResult<Child> {
}

#[derive(Copy, Clone)]
pub(crate) struct WorkerConfig {
pub(crate) wtype: WorkerType,
pub(crate) app_dir: Option<&'static str>,
pub(crate) rscript: &'static OsStr,
pub(crate) quarto: &'static OsStr,
pub(crate) workdir: &'static Path,
pub(crate) addr: SocketAddr,
pub(crate) target: &'static str,
pub(crate) worker_id: usize,
pub(crate) is_online: &'static AtomicBool,
pub(crate) qmd: Option<&'static Path>,
pub struct WorkerConfig {
pub wtype: WorkerType,
pub app_dir: Option<&'static str>,
pub rscript: &'static OsStr,
pub quarto: &'static OsStr,
pub workdir: &'static Path,
pub addr: SocketAddr,
pub target: &'static str,
pub worker_id: usize,
pub is_online: &'static AtomicBool,
pub qmd: Option<&'static Path>,
}

impl WorkerConfig {
Expand Down Expand Up @@ -206,11 +206,11 @@ impl WorkerConfig {
}
}

struct Worker {
pub struct Worker {
/// Whether the worker should be stopped
_worker_task: JoinHandle<FaucetResult<()>>,
pub child: WorkerChild,
/// The address of the worker's socket.
config: WorkerConfig,
pub config: WorkerConfig,
}

async fn check_if_online(addr: SocketAddr) -> bool {
Expand All @@ -220,8 +220,20 @@ async fn check_if_online(addr: SocketAddr) -> bool {

const RECHECK_INTERVAL: Duration = Duration::from_millis(250);

fn spawn_worker_task(config: WorkerConfig) -> JoinHandle<FaucetResult<()>> {
tokio::spawn(async move {
pub struct WorkerChild {
_handle: JoinHandle<FaucetResult<()>>,
stopper: tokio::sync::mpsc::Sender<()>,
}

impl WorkerChild {
pub fn kill(&self) {
let _ = self.stopper.try_send(());
}
}

fn spawn_worker_task(config: WorkerConfig) -> WorkerChild {
let (stopper, mut rx) = tokio::sync::mpsc::channel(1);
let handle = tokio::spawn(async move {
loop {
let mut child = config.spawn_process(config);
let pid = child.id().expect("Failed to get plumber worker PID");
Expand All @@ -246,27 +258,32 @@ fn spawn_worker_task(config: WorkerConfig) -> JoinHandle<FaucetResult<()>> {
tokio::time::sleep(RECHECK_INTERVAL).await;
}

tokio::select! {
_ = child.wait() => (),
_ = rx.recv() => return FaucetResult::Ok(()),
}
let status = child.wait().await?;
config
.is_online
.store(false, std::sync::atomic::Ordering::SeqCst);
log::error!(target: "faucet", "{target}'s process ({}) exited with status {}", pid, status, target = config.target);
}
})
});
WorkerChild {
_handle: handle,
stopper,
}
}

impl Worker {
pub fn from_config(config: WorkerConfig) -> FaucetResult<Self> {
let worker_task = spawn_worker_task(config);
Ok(Self {
_worker_task: worker_task,
config,
})
let child = spawn_worker_task(config);
Ok(Self { child, config })
}
}

pub(crate) struct Workers {
workers: Box<[Worker]>,
pub struct Workers {
pub workers: Box<[Worker]>,
}

impl Workers {
Expand Down
48 changes: 12 additions & 36 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,36 +206,28 @@ mod tests {
.body(())
.unwrap_err();

let err: FaucetError = From::from(err);
format!("{:?}", err);
format!("{}", err);
let _err: FaucetError = From::from(err);
}

#[test]
fn test_faucet_error_from_io_error() {
let err = std::io::Error::new(std::io::ErrorKind::Other, "test");

let err: FaucetError = From::from(err);
format!("{:?}", err);
format!("{}", err);
let _err: FaucetError = From::from(err);
}

#[test]
fn test_faucet_error_from_pool_error() {
let err = deadpool::managed::PoolError::Backend(FaucetError::unknown("test"));

let err: FaucetError = From::from(err);
format!("{:?}", err);
format!("{}", err);
let _err: FaucetError = From::from(err);
}

#[test]
fn test_faucet_error_from_pool_build_error() {
let err = deadpool::managed::BuildError::NoRuntimeSpecified;

let err: FaucetError = From::from(err);
format!("{:?}", err);
format!("{}", err);
let _err: FaucetError = From::from(err);
}

#[test]
Expand All @@ -244,18 +236,14 @@ mod tests {
deadpool::managed::TimeoutType::Create,
);

let err: FaucetError = From::from(err);
format!("{:?}", err);
format!("{}", err);
let _err: FaucetError = From::from(err);
}

#[test]
fn test_faucet_error_from_pool_closed_error() {
let err = deadpool::managed::PoolError::<FaucetError>::Closed;

let err: FaucetError = From::from(err);
format!("{:?}", err);
format!("{}", err);
let _err: FaucetError = From::from(err);
}

#[test]
Expand All @@ -264,50 +252,38 @@ mod tests {
deadpool::managed::HookError::message("test"),
);

let err: FaucetError = From::from(err);
format!("{:?}", err);
format!("{}", err);
let _err: FaucetError = From::from(err);
}

#[test]
fn test_faucet_error_from_pool_no_runtime_specified_error() {
let err = deadpool::managed::PoolError::<FaucetError>::NoRuntimeSpecified;

let err: FaucetError = From::from(err);
format!("{:?}", err);
format!("{}", err);
let _err: FaucetError = From::from(err);
}

#[test]
fn test_faucet_error_from_hyper_invalid_header_value_error() {
let err = hyper::header::HeaderValue::from_bytes([0x00].as_ref()).unwrap_err();

let err: FaucetError = From::from(err);
format!("{:?}", err);
format!("{}", err);
let _err: FaucetError = From::from(err);
}

#[test]
fn test_faucet_error_from_addr_parse_error() {
let err = "INVALID".parse::<std::net::SocketAddr>().unwrap_err();

let err: FaucetError = From::from(err);
format!("{:?}", err);
format!("{}", err);
let _err: FaucetError = From::from(err);
}

#[test]
fn test_faucet_error_displat_missing_header() {
let err = FaucetError::BadRequest(BadRequestReason::MissingHeader("test"));
format!("{:?}", err);
format!("{}", err);
let _err = FaucetError::BadRequest(BadRequestReason::MissingHeader("test"));
}

#[test]
fn test_faucet_error_displat_invalid_header() {
let err = FaucetError::BadRequest(BadRequestReason::InvalidHeader("test"));
format!("{:?}", err);
format!("{}", err);
let _err = FaucetError::BadRequest(BadRequestReason::InvalidHeader("test"));
}

#[test]
Expand Down
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ pub mod error;
pub mod global_conn;
pub(crate) mod networking;
pub mod server;
#[cfg(unix)]
pub mod shutdown;

macro_rules! leak {
Expand Down
Loading