-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor Server customize shutdown and signals to support embedding #493
Changes from 2 commits
05e0907
11881c5
02c1ea1
0096b01
b7ffdc5
7f5c988
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ mod daemon; | |
#[cfg(unix)] | ||
pub(crate) mod transfer_fd; | ||
|
||
use async_trait::async_trait; | ||
#[cfg(unix)] | ||
use daemon::daemonize; | ||
use log::{debug, error, info, warn}; | ||
|
@@ -59,6 +60,61 @@ pub type ShutdownWatch = watch::Receiver<bool>; | |
#[cfg(unix)] | ||
pub type ListenFds = Arc<Mutex<Fds>>; | ||
|
||
pub enum ShutdownSignal { | ||
GracefulUpgrade, | ||
GracefulTerminate, | ||
FastShutdown, | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would make sense to derive There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note: I'm not affiliated with Cloudflare, I just started using your changes from here and ran into an issue due to the missing Debug impl. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the issue you encountered? |
||
#[async_trait] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry for the delay, do you mind adding doc comments for these public types as well? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No problem, added some docs. Let me know if you would like them worded differently. |
||
pub trait ShutdownSignalWatch { | ||
async fn recv(&self) -> ShutdownSignal; | ||
} | ||
|
||
#[cfg(unix)] | ||
pub struct UnixShutdownSignalWatch; | ||
|
||
#[cfg(unix)] | ||
#[async_trait] | ||
impl ShutdownSignalWatch for UnixShutdownSignalWatch { | ||
async fn recv(&self) -> ShutdownSignal { | ||
let mut graceful_upgrade_signal = unix::signal(unix::SignalKind::quit()).unwrap(); | ||
let mut graceful_terminate_signal = unix::signal(unix::SignalKind::terminate()).unwrap(); | ||
let mut fast_shutdown_signal = unix::signal(unix::SignalKind::interrupt()).unwrap(); | ||
|
||
tokio::select! { | ||
_ = graceful_upgrade_signal.recv() => { | ||
ShutdownSignal::GracefulUpgrade | ||
}, | ||
_ = graceful_terminate_signal.recv() => { | ||
ShutdownSignal::GracefulTerminate | ||
}, | ||
_ = fast_shutdown_signal.recv() => { | ||
ShutdownSignal::FastShutdown | ||
}, | ||
} | ||
} | ||
} | ||
|
||
pub struct RunArgs { | ||
#[cfg(unix)] | ||
pub shutdown_signal: Box<dyn ShutdownSignalWatch>, | ||
} | ||
|
||
impl Default for RunArgs { | ||
#[cfg(unix)] | ||
fn default() -> Self { | ||
Self { | ||
shutdown_signal: Box::new(UnixShutdownSignalWatch), | ||
} | ||
} | ||
|
||
#[cfg(windows)] | ||
fn default() -> Self { | ||
Self {} | ||
} | ||
} | ||
|
||
/// The server object | ||
/// | ||
/// This object represents an entire pingora server process which may have multiple independent | ||
|
@@ -87,43 +143,41 @@ pub struct Server { | |
|
||
impl Server { | ||
#[cfg(unix)] | ||
async fn main_loop(&self) -> ShutdownType { | ||
async fn main_loop(&self, run_args: RunArgs) -> ShutdownType { | ||
// waiting for exit signal | ||
// TODO: there should be a signal handling function | ||
let mut graceful_upgrade_signal = unix::signal(unix::SignalKind::quit()).unwrap(); | ||
let mut graceful_terminate_signal = unix::signal(unix::SignalKind::terminate()).unwrap(); | ||
let mut fast_shutdown_signal = unix::signal(unix::SignalKind::interrupt()).unwrap(); | ||
tokio::select! { | ||
_ = fast_shutdown_signal.recv() => { | ||
match run_args.shutdown_signal.recv().await { | ||
ShutdownSignal::FastShutdown => { | ||
info!("SIGINT received, exiting"); | ||
ShutdownType::Quick | ||
}, | ||
_ = graceful_terminate_signal.recv() => { | ||
} | ||
ShutdownSignal::GracefulTerminate => { | ||
// we receive a graceful terminate, all instances are instructed to stop | ||
info!("SIGTERM received, gracefully exiting"); | ||
// graceful shutdown if there are listening sockets | ||
info!("Broadcasting graceful shutdown"); | ||
match self.shutdown_watch.send(true) { | ||
Ok(_) => { info!("Graceful shutdown started!"); } | ||
Ok(_) => { | ||
info!("Graceful shutdown started!"); | ||
} | ||
Err(e) => { | ||
error!("Graceful shutdown broadcast failed: {e}"); | ||
} | ||
} | ||
info!("Broadcast graceful shutdown complete"); | ||
ShutdownType::Graceful | ||
} | ||
_ = graceful_upgrade_signal.recv() => { | ||
ShutdownSignal::GracefulUpgrade => { | ||
// TODO: still need to select! on signals in case a fast shutdown is needed | ||
// aka: move below to another task and only kick it off here | ||
info!("SIGQUIT received, sending socks and gracefully exiting"); | ||
if let Some(fds) = &self.listen_fds { | ||
let fds = fds.lock().await; | ||
info!("Trying to send socks"); | ||
// XXX: this is blocking IO | ||
match fds.send_to_sock( | ||
self.configuration.as_ref().upgrade_sock.as_str()) | ||
{ | ||
Ok(_) => {info!("listener sockets sent");}, | ||
match fds.send_to_sock(self.configuration.as_ref().upgrade_sock.as_str()) { | ||
Ok(_) => { | ||
info!("listener sockets sent"); | ||
} | ||
Err(e) => { | ||
error!("Unable to send listener sockets to new process: {e}"); | ||
// sentry log error on fd send failure | ||
|
@@ -135,7 +189,9 @@ impl Server { | |
info!("Broadcasting graceful shutdown"); | ||
// gracefully exiting | ||
match self.shutdown_watch.send(true) { | ||
Ok(_) => { info!("Graceful shutdown started!"); } | ||
Ok(_) => { | ||
info!("Graceful shutdown started!"); | ||
} | ||
Err(e) => { | ||
error!("Graceful shutdown broadcast failed: {e}"); | ||
// switch to fast shutdown | ||
|
@@ -148,7 +204,7 @@ impl Server { | |
info!("No socks to send, shutting down."); | ||
ShutdownType::Graceful | ||
} | ||
}, | ||
} | ||
} | ||
} | ||
|
||
|
@@ -306,14 +362,25 @@ impl Server { | |
} | ||
} | ||
|
||
/// Start the server using [Self::run] and default [RunArgs]. | ||
#[allow(unused_mut)] // TODO: May not need to keep mut self in interface | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's remove the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed |
||
pub fn run_forever(mut self) -> ! { | ||
info!("Server starting"); | ||
|
||
self.run(RunArgs::default()); | ||
|
||
info!("All runtimes exited, exiting now"); | ||
std::process::exit(0) | ||
} | ||
|
||
/// Start the server | ||
/// | ||
/// This function will block forever until the server needs to quit. So this would be the last | ||
/// function to call for this object. | ||
/// | ||
/// Note: this function may fork the process for daemonization, so any additional threads created | ||
/// before this function will be lost to any service logic once this function is called. | ||
pub fn run_forever(mut self) -> ! { | ||
pub fn run(mut self, run_args: RunArgs) { | ||
info!("Server starting"); | ||
|
||
let conf = self.configuration.as_ref(); | ||
|
@@ -354,7 +421,9 @@ impl Server { | |
// Only work steal runtime can use block_on() | ||
let server_runtime = Server::create_runtime("Server", 1, true); | ||
#[cfg(unix)] | ||
let shutdown_type = server_runtime.get_handle().block_on(self.main_loop()); | ||
let shutdown_type = server_runtime | ||
.get_handle() | ||
.block_on(self.main_loop(run_args)); | ||
#[cfg(windows)] | ||
let shutdown_type = ShutdownType::Graceful; | ||
|
||
|
@@ -394,8 +463,6 @@ impl Server { | |
error!("Failed to shutdown runtime: {:?}", e); | ||
} | ||
} | ||
info!("All runtimes exited, exiting now"); | ||
std::process::exit(0) | ||
} | ||
|
||
fn create_runtime(name: &str, threads: usize, work_steal: bool) -> Runtime { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add doc comments for these public types.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added doc comments