Skip to content

Commit

Permalink
This change supports building a server that can run Pingora without it
Browse files Browse the repository at this point in the history
calling std::process::exit during shutdown.
---
Fix pingora-core Server formatting to pass ci check.
---
Add doc comments for ShutdownSignal
---
Remove unused mut self from run_forever
---
Add Debug derive for Shutdown signal
---
Add docs for shutdown handling

Includes-commit: 7f5c988
Includes-commit: b7ffdc5
Includes-commit: 0096b01
Includes-commit: 02c1ea1
Includes-commit: 05e0907
Includes-commit: 11881c5
Replicated-from: #493
  • Loading branch information
kriswuollett authored and andrewhavck committed Jan 17, 2025
1 parent 263c955 commit bee5b00
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 22 deletions.
2 changes: 1 addition & 1 deletion .bleep
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0112f15a8feefd887a02b714f4ea1cbde410a461
c7924e98d353a7e941d3d54931aa5a42c5fe99b9
125 changes: 104 additions & 21 deletions pingora-core/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mod daemon;
#[cfg(unix)]
pub(crate) mod transfer_fd;

use async_trait::async_trait;
#[cfg(unix)]
use daemon::daemonize;
use log::{debug, error, info, warn};
Expand Down Expand Up @@ -59,6 +60,78 @@ pub type ShutdownWatch = watch::Receiver<bool>;
#[cfg(unix)]
pub type ListenFds = Arc<Mutex<Fds>>;

/// The type of shutdown process that has been requested.
#[derive(Debug)]
pub enum ShutdownSignal {
/// Send file descriptors to the new process before starting runtime shutdown with
/// [ServerConf::graceful_shutdown_timeout_seconds] timeout.
GracefulUpgrade,
/// Wait for [ServerConf::grace_period_seconds] before starting runtime shutdown with
/// [ServerConf::graceful_shutdown_timeout_seconds] timeout.
GracefulTerminate,
/// Shutdown with no timeout for runtime shutdown.
FastShutdown,
}

/// Watcher of a shutdown signal, e.g., [UnixShutdownSignalWatch] for Unix-like
/// platforms.
#[async_trait]
pub trait ShutdownSignalWatch {
/// Returns the desired shutdown type once one has been requested.
async fn recv(&self) -> ShutdownSignal;
}

/// A Unix shutdown watcher that awaits for Unix signals.
///
/// - `SIGQUIT`: graceful upgrade
/// - `SIGTERM`: graceful terminate
/// - `SIGINT`: fast shutdown
#[cfg(unix)]
pub struct UnixShutdownSignalWatch;

#[cfg(unix)]
#[async_trait]
impl ShutdownSignalWatch for UnixShutdownSignalWatch {
async fn recv(&self) -> ShutdownSignal {
let mut graceful_upgrade_signal = unix::signal(unix::SignalKind::quit()).unwrap();
let mut graceful_terminate_signal = unix::signal(unix::SignalKind::terminate()).unwrap();
let mut fast_shutdown_signal = unix::signal(unix::SignalKind::interrupt()).unwrap();

tokio::select! {
_ = graceful_upgrade_signal.recv() => {
ShutdownSignal::GracefulUpgrade
},
_ = graceful_terminate_signal.recv() => {
ShutdownSignal::GracefulTerminate
},
_ = fast_shutdown_signal.recv() => {
ShutdownSignal::FastShutdown
},
}
}
}

/// Arguments to configure running of the pingora server.
pub struct RunArgs {
/// Signal for initating shutdown
#[cfg(unix)]
pub shutdown_signal: Box<dyn ShutdownSignalWatch>,
}

impl Default for RunArgs {
#[cfg(unix)]
fn default() -> Self {
Self {
shutdown_signal: Box::new(UnixShutdownSignalWatch),
}
}

#[cfg(windows)]
fn default() -> Self {
Self {}
}
}

/// The server object
///
/// This object represents an entire pingora server process which may have multiple independent
Expand Down Expand Up @@ -87,43 +160,41 @@ pub struct Server {

impl Server {
#[cfg(unix)]
async fn main_loop(&self) -> ShutdownType {
async fn main_loop(&self, run_args: RunArgs) -> ShutdownType {
// waiting for exit signal
// TODO: there should be a signal handling function
let mut graceful_upgrade_signal = unix::signal(unix::SignalKind::quit()).unwrap();
let mut graceful_terminate_signal = unix::signal(unix::SignalKind::terminate()).unwrap();
let mut fast_shutdown_signal = unix::signal(unix::SignalKind::interrupt()).unwrap();
tokio::select! {
_ = fast_shutdown_signal.recv() => {
match run_args.shutdown_signal.recv().await {
ShutdownSignal::FastShutdown => {
info!("SIGINT received, exiting");
ShutdownType::Quick
},
_ = graceful_terminate_signal.recv() => {
}
ShutdownSignal::GracefulTerminate => {
// we receive a graceful terminate, all instances are instructed to stop
info!("SIGTERM received, gracefully exiting");
// graceful shutdown if there are listening sockets
info!("Broadcasting graceful shutdown");
match self.shutdown_watch.send(true) {
Ok(_) => { info!("Graceful shutdown started!"); }
Ok(_) => {
info!("Graceful shutdown started!");
}
Err(e) => {
error!("Graceful shutdown broadcast failed: {e}");
}
}
info!("Broadcast graceful shutdown complete");
ShutdownType::Graceful
}
_ = graceful_upgrade_signal.recv() => {
ShutdownSignal::GracefulUpgrade => {
// TODO: still need to select! on signals in case a fast shutdown is needed
// aka: move below to another task and only kick it off here
info!("SIGQUIT received, sending socks and gracefully exiting");
if let Some(fds) = &self.listen_fds {
let fds = fds.lock().await;
info!("Trying to send socks");
// XXX: this is blocking IO
match fds.send_to_sock(
self.configuration.as_ref().upgrade_sock.as_str())
{
Ok(_) => {info!("listener sockets sent");},
match fds.send_to_sock(self.configuration.as_ref().upgrade_sock.as_str()) {
Ok(_) => {
info!("listener sockets sent");
}
Err(e) => {
error!("Unable to send listener sockets to new process: {e}");
// sentry log error on fd send failure
Expand All @@ -135,7 +206,9 @@ impl Server {
info!("Broadcasting graceful shutdown");
// gracefully exiting
match self.shutdown_watch.send(true) {
Ok(_) => { info!("Graceful shutdown started!"); }
Ok(_) => {
info!("Graceful shutdown started!");
}
Err(e) => {
error!("Graceful shutdown broadcast failed: {e}");
// switch to fast shutdown
Expand All @@ -148,7 +221,7 @@ impl Server {
info!("No socks to send, shutting down.");
ShutdownType::Graceful
}
},
}
}
}

Expand Down Expand Up @@ -306,14 +379,24 @@ impl Server {
}
}

/// Start the server using [Self::run] and default [RunArgs].
pub fn run_forever(self) -> ! {
info!("Server starting");

self.run(RunArgs::default());

info!("All runtimes exited, exiting now");
std::process::exit(0)
}

/// Start the server
///
/// This function will block forever until the server needs to quit. So this would be the last
/// function to call for this object.
///
/// Note: this function may fork the process for daemonization, so any additional threads created
/// before this function will be lost to any service logic once this function is called.
pub fn run_forever(mut self) -> ! {
pub fn run(mut self, run_args: RunArgs) {
info!("Server starting");

let conf = self.configuration.as_ref();
Expand Down Expand Up @@ -354,7 +437,9 @@ impl Server {
// Only work steal runtime can use block_on()
let server_runtime = Server::create_runtime("Server", 1, true);
#[cfg(unix)]
let shutdown_type = server_runtime.get_handle().block_on(self.main_loop());
let shutdown_type = server_runtime
.get_handle()
.block_on(self.main_loop(run_args));
#[cfg(windows)]
let shutdown_type = ShutdownType::Graceful;

Expand Down Expand Up @@ -394,8 +479,6 @@ impl Server {
error!("Failed to shutdown runtime: {:?}", e);
}
}
info!("All runtimes exited, exiting now");
std::process::exit(0)
}

fn create_runtime(name: &str, threads: usize, work_steal: bool) -> Runtime {
Expand Down

0 comments on commit bee5b00

Please sign in to comment.