Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Server customize shutdown and signals to support embedding #493

Closed
wants to merge 6 commits into from
Closed
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 93 additions & 21 deletions pingora-core/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mod daemon;
#[cfg(unix)]
pub(crate) mod transfer_fd;

use async_trait::async_trait;
#[cfg(unix)]
use daemon::daemonize;
use log::{debug, error, info, warn};
Expand Down Expand Up @@ -59,6 +60,67 @@ pub type ShutdownWatch = watch::Receiver<bool>;
#[cfg(unix)]
pub type ListenFds = Arc<Mutex<Fds>>;

/// The type of shutdown process that has been requested.
pub enum ShutdownSignal {
/// Send file descriptors to the new process before starting runtime shutdown with
/// [ServerConf::graceful_shutdown_timeout_seconds] timeout.
GracefulUpgrade,
/// Wait for [ServerConf::grace_period_seconds] before starting runtime shutdown with
/// [ServerConf::graceful_shutdown_timeout_seconds] timeout.
GracefulTerminate,
/// Shutdown with no timeout for runtime shutdown.
FastShutdown,
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add doc comments for these public types.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added doc comments

Copy link

@theduke theduke Jan 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would make sense to derive Debug here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added Debug derive. Not sure if you also want me to go ahead and add to other related ones like ShutdownType?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: I'm not affiliated with Cloudflare, I just started using your changes from here and ran into an issue due to the missing Debug impl.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the issue you encountered?

#[async_trait]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the delay, do you mind adding doc comments for these public types as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No problem, added some docs. Let me know if you would like them worded differently.

pub trait ShutdownSignalWatch {
async fn recv(&self) -> ShutdownSignal;
}

#[cfg(unix)]
pub struct UnixShutdownSignalWatch;

#[cfg(unix)]
#[async_trait]
impl ShutdownSignalWatch for UnixShutdownSignalWatch {
async fn recv(&self) -> ShutdownSignal {
let mut graceful_upgrade_signal = unix::signal(unix::SignalKind::quit()).unwrap();
let mut graceful_terminate_signal = unix::signal(unix::SignalKind::terminate()).unwrap();
let mut fast_shutdown_signal = unix::signal(unix::SignalKind::interrupt()).unwrap();

tokio::select! {
_ = graceful_upgrade_signal.recv() => {
ShutdownSignal::GracefulUpgrade
},
_ = graceful_terminate_signal.recv() => {
ShutdownSignal::GracefulTerminate
},
_ = fast_shutdown_signal.recv() => {
ShutdownSignal::FastShutdown
},
}
}
}

pub struct RunArgs {
#[cfg(unix)]
pub shutdown_signal: Box<dyn ShutdownSignalWatch>,
}

impl Default for RunArgs {
#[cfg(unix)]
fn default() -> Self {
Self {
shutdown_signal: Box::new(UnixShutdownSignalWatch),
}
}

#[cfg(windows)]
fn default() -> Self {
Self {}
}
}

/// The server object
///
/// This object represents an entire pingora server process which may have multiple independent
Expand Down Expand Up @@ -87,43 +149,41 @@ pub struct Server {

impl Server {
#[cfg(unix)]
async fn main_loop(&self) -> ShutdownType {
async fn main_loop(&self, run_args: RunArgs) -> ShutdownType {
// waiting for exit signal
// TODO: there should be a signal handling function
let mut graceful_upgrade_signal = unix::signal(unix::SignalKind::quit()).unwrap();
let mut graceful_terminate_signal = unix::signal(unix::SignalKind::terminate()).unwrap();
let mut fast_shutdown_signal = unix::signal(unix::SignalKind::interrupt()).unwrap();
tokio::select! {
_ = fast_shutdown_signal.recv() => {
match run_args.shutdown_signal.recv().await {
ShutdownSignal::FastShutdown => {
info!("SIGINT received, exiting");
ShutdownType::Quick
},
_ = graceful_terminate_signal.recv() => {
}
ShutdownSignal::GracefulTerminate => {
// we receive a graceful terminate, all instances are instructed to stop
info!("SIGTERM received, gracefully exiting");
// graceful shutdown if there are listening sockets
info!("Broadcasting graceful shutdown");
match self.shutdown_watch.send(true) {
Ok(_) => { info!("Graceful shutdown started!"); }
Ok(_) => {
info!("Graceful shutdown started!");
}
Err(e) => {
error!("Graceful shutdown broadcast failed: {e}");
}
}
info!("Broadcast graceful shutdown complete");
ShutdownType::Graceful
}
_ = graceful_upgrade_signal.recv() => {
ShutdownSignal::GracefulUpgrade => {
// TODO: still need to select! on signals in case a fast shutdown is needed
// aka: move below to another task and only kick it off here
info!("SIGQUIT received, sending socks and gracefully exiting");
if let Some(fds) = &self.listen_fds {
let fds = fds.lock().await;
info!("Trying to send socks");
// XXX: this is blocking IO
match fds.send_to_sock(
self.configuration.as_ref().upgrade_sock.as_str())
{
Ok(_) => {info!("listener sockets sent");},
match fds.send_to_sock(self.configuration.as_ref().upgrade_sock.as_str()) {
Ok(_) => {
info!("listener sockets sent");
}
Err(e) => {
error!("Unable to send listener sockets to new process: {e}");
// sentry log error on fd send failure
Expand All @@ -135,7 +195,9 @@ impl Server {
info!("Broadcasting graceful shutdown");
// gracefully exiting
match self.shutdown_watch.send(true) {
Ok(_) => { info!("Graceful shutdown started!"); }
Ok(_) => {
info!("Graceful shutdown started!");
}
Err(e) => {
error!("Graceful shutdown broadcast failed: {e}");
// switch to fast shutdown
Expand All @@ -148,7 +210,7 @@ impl Server {
info!("No socks to send, shutting down.");
ShutdownType::Graceful
}
},
}
}
}

Expand Down Expand Up @@ -306,14 +368,24 @@ impl Server {
}
}

/// Start the server using [Self::run] and default [RunArgs].
pub fn run_forever(self) -> ! {
info!("Server starting");

self.run(RunArgs::default());

info!("All runtimes exited, exiting now");
std::process::exit(0)
}

/// Start the server
///
/// This function will block forever until the server needs to quit. So this would be the last
/// function to call for this object.
///
/// Note: this function may fork the process for daemonization, so any additional threads created
/// before this function will be lost to any service logic once this function is called.
pub fn run_forever(mut self) -> ! {
pub fn run(mut self, run_args: RunArgs) {
info!("Server starting");

let conf = self.configuration.as_ref();
Expand Down Expand Up @@ -354,7 +426,9 @@ impl Server {
// Only work steal runtime can use block_on()
let server_runtime = Server::create_runtime("Server", 1, true);
#[cfg(unix)]
let shutdown_type = server_runtime.get_handle().block_on(self.main_loop());
let shutdown_type = server_runtime
.get_handle()
.block_on(self.main_loop(run_args));
#[cfg(windows)]
let shutdown_type = ShutdownType::Graceful;

Expand Down Expand Up @@ -394,8 +468,6 @@ impl Server {
error!("Failed to shutdown runtime: {:?}", e);
}
}
info!("All runtimes exited, exiting now");
std::process::exit(0)
}

fn create_runtime(name: &str, threads: usize, work_steal: bool) -> Runtime {
Expand Down
Loading