diff --git a/Cargo.lock b/Cargo.lock
index c5ca3fd0..b2d2e3ce 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1,6 +1,6 @@
 # This file is automatically @generated by Cargo.
 # It is not intended for manual editing.
-version = 3
+version = 4
 
 [[package]]
 name = "actix-codec"
@@ -1848,6 +1848,7 @@ dependencies = [
  "serde_json",
  "serial_test",
  "thiserror",
+ "tokio",
  "tracing",
  "tracing-actix-web",
  "tracing-futures",
diff --git a/bins/nittei/src/main.rs b/bins/nittei/src/main.rs
index 0d81fcda..81552f12 100644
--- a/bins/nittei/src/main.rs
+++ b/bins/nittei/src/main.rs
@@ -5,6 +5,8 @@ use nittei_infra::setup_context;
 use telemetry::init_subscriber;
 #[cfg(all(target_env = "musl", target_pointer_width = "64"))]
 use tikv_jemallocator::Jemalloc;
+use tokio::signal;
+use tracing::{error, info};
 
 // Use Jemalloc only for musl-64 bits platforms
 // The default MUSL allocator is known to be slower than Jemalloc
@@ -22,7 +24,27 @@ async fn main() -> anyhow::Result<()> {
     init_subscriber()?;
 
     let context = setup_context().await?;
 
+    let (tx, rx) = tokio::sync::oneshot::channel::<()>();
     let app = Application::new(context).await?;
-    app.start().await
+
+    // Listen for SIGINT (Ctrl+C) to shut down the service
+    // This sends a message on the channel to shut down the server gracefully
+    // That makes the app return a failure on the status API endpoint (useful for k8s)
+    // It then waits a configurable number of seconds (so that the readiness probe can fail)
+    // before letting the server finish processing in-flight requests and shut down
+    tokio::spawn(async move {
+        if let Err(e) = signal::ctrl_c().await {
+            error!("[main] Failed to listen for SIGINT: {}", e);
+        }
+        info!("[shutdown] Received SIGINT, sending event on channel...");
+        let _ = tx.send(());
+    });
+
+    // Start the application and block until it finishes
+    app.start(rx).await?;
+
+    info!("[shutdown] shutdown complete");
+
+    Ok(())
 }
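Note: the binary now owns the shutdown signal and forwards it over a `tokio::sync::oneshot` channel. A minimal, self-contained sketch of that pattern (illustrative code, not the project's):

```rust
use tokio::{signal, sync::oneshot};

#[tokio::main]
async fn main() {
    // One-shot channel: the signal task is the sender, the server owns the receiver
    let (tx, rx) = oneshot::channel::<()>();

    tokio::spawn(async move {
        // Resolves when SIGINT (Ctrl+C) arrives; errors only if the
        // handler could not be installed
        let _ = signal::ctrl_c().await;
        // send() fails only if the receiver was already dropped
        let _ = tx.send(());
    });

    // A real server would run until this receiver fires; here we just block on it
    let _ = rx.await;
    println!("shutting down");
}
```

One caveat for the k8s use case: `signal::ctrl_c()` only catches SIGINT, while Kubernetes sends SIGTERM on pod termination, so a deployment would likely also want to listen via `tokio::signal::unix::signal(SignalKind::terminate())`.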
diff --git a/bins/nittei/tests/helpers/setup.rs b/bins/nittei/tests/helpers/setup.rs
index 9fe02b4d..8ba21e60 100644
--- a/bins/nittei/tests/helpers/setup.rs
+++ b/bins/nittei/tests/helpers/setup.rs
@@ -22,12 +22,14 @@ pub async fn spawn_app() -> (TestApp, NitteiSDK, String) {
 
     let address = format!("http://localhost:{}", application.port());
 
+    let (_, rx) = tokio::sync::oneshot::channel::<()>();
+
     // Allow underscore future because it needs to run in the background
     // If we `await` it, the tests will hang
     #[allow(clippy::let_underscore_future)]
     let _ = actix_web::rt::spawn(async move {
         application
-            .start()
+            .start(rx)
             .await
             .expect("Expected application to start");
     });
diff --git a/crates/api/Cargo.toml b/crates/api/Cargo.toml
index f77d2867..d305c5a4 100644
--- a/crates/api/Cargo.toml
+++ b/crates/api/Cargo.toml
@@ -28,6 +28,7 @@ chrono-tz = "0.8.1"
 anyhow = "1.0"
 jsonwebtoken = "7"
 thiserror = "1.0"
+tokio = { version = "1.0", features = ["full"] }
 tracing = "0.1.25"
 tracing-actix-web = { version = "0.7.11", features = ["opentelemetry_0_23"] }
 tracing-futures = "0.2.5"
diff --git a/crates/api/src/lib.rs b/crates/api/src/lib.rs
index 84f3c550..d452c480 100644
--- a/crates/api/src/lib.rs
+++ b/crates/api/src/lib.rs
@@ -10,7 +10,7 @@ mod shared;
 mod status;
 mod user;
 
-use std::net::TcpListener;
+use std::{net::TcpListener, sync::Arc};
 
 use actix_cors::Cors;
 use actix_web::{
@@ -20,6 +20,7 @@ use actix_web::{
     App,
     HttpServer,
 };
+use futures::lock::Mutex;
 use http_logger::NitteiTracingRootSpanBuilder;
 use job_schedulers::{start_reminder_generation_job, start_send_reminders_job};
 use nittei_domain::{
@@ -54,11 +55,28 @@ pub struct Application {
     port: u16,
     /// The application context (database connections, etc.)
     context: NitteiContext,
+
+    /// Shared state of the server
+    /// (mainly the shutdown flag)
+    shared_state: ServerSharedState,
+}
+
+/// Struct for storing the shared state of the server
+/// Mainly used to share the shutdown flag between the binary crate and the status endpoint
+#[derive(Clone)]
+pub struct ServerSharedState {
+    /// Flag to indicate if the application is shutting down
+    pub is_shutting_down: Arc<Mutex<bool>>,
 }
 
 impl Application {
     pub async fn new(context: NitteiContext) -> anyhow::Result<Self> {
-        let (server, port) = Application::configure_server(context.clone()).await?;
+        let shared_state = ServerSharedState {
+            is_shutting_down: Arc::new(Mutex::new(false)),
+        };
+
+        let (server, port) =
+            Application::configure_server(context.clone(), shared_state.clone()).await?;
 
         Application::start_jobs(context.clone());
@@ -66,6 +84,7 @@ impl Application {
             server,
             port,
             context,
+            shared_state,
         })
     }
 
@@ -89,7 +108,10 @@ impl Application {
     /// - CORS (permissive)
     /// - Compression
     /// - Tracing logger
-    async fn configure_server(context: NitteiContext) -> anyhow::Result<(Server, u16)> {
+    async fn configure_server(
+        context: NitteiContext,
+        shared_state: ServerSharedState,
+    ) -> anyhow::Result<(Server, u16)> {
         let port = context.config.port;
         let address = nittei_utils::config::APP_CONFIG.http_host.clone();
         let address_and_port = format!("{}:{}", address, port);
@@ -99,14 +121,22 @@ impl Application {
 
         let server = HttpServer::new(move || {
             let ctx = context.clone();
+            let shared_state = shared_state.clone();
 
             App::new()
                 .wrap(Cors::permissive())
                 .wrap(middleware::Compress::default())
                 .wrap(TracingLogger::<NitteiTracingRootSpanBuilder>::new())
                 .app_data(Data::new(ctx))
+                .app_data(Data::new(shared_state))
                 .service(web::scope("/api/v1").configure(configure_server_api))
         })
+        // Disable Actix's built-in signal handling to avoid conflicts:
+        // shutdown is driven by the signal handler in the binary and by `setup_shutdown_handler`
+        .disable_signals()
+        // Set the shutdown timeout (time to wait for the server to finish processing requests)
+        // Actix's default is 30 seconds
+        .shutdown_timeout(nittei_utils::config::APP_CONFIG.server_shutdown_timeout)
         .listen(listener)?
         .workers(4)
         .run();
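Because the binary installs its own signal handler, Actix's built-in handling has to be turned off or the two would race. A reduced sketch of the `disable_signals` + `ServerHandle` wiring (assumes actix-web 4; names are illustrative):

```rust
use actix_web::{App, HttpServer};

async fn run() -> std::io::Result<()> {
    let server = HttpServer::new(|| App::new())
        // We stop via the ServerHandle below, so Actix must not react to SIGINT itself
        .disable_signals()
        // Seconds to wait for in-flight requests after stop() is called
        .shutdown_timeout(5)
        .bind(("127.0.0.1", 8080))?
        .run();

    let handle = server.handle();
    tokio::spawn(async move {
        // ... wait for an application-defined shutdown trigger, then:
        handle.stop(true).await; // `true` requests a graceful stop
    });

    server.await
}
```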
@@ -115,7 +145,14 @@
     }
 
     /// Init the default account and start the Actix server
-    pub async fn start(self) -> anyhow::Result<()> {
+    ///
+    /// It also sets up the shutdown handler
+    pub async fn start(
+        self,
+        shutdown_channel: tokio::sync::oneshot::Receiver<()>,
+    ) -> anyhow::Result<()> {
+        self.setup_shutdown_handler(shutdown_channel);
+
         self.init_default_account().await?;
         self.server.await.map_err(|e| anyhow::anyhow!(e))
     }
@@ -274,4 +311,48 @@
         };
         Ok(())
     }
+
+    /// Set up the shutdown handler
+    fn setup_shutdown_handler(&self, shutdown_channel: tokio::sync::oneshot::Receiver<()>) {
+        let server_handle = self.server.handle();
+        let shared_state = self.shared_state.clone();
+
+        // Listen on the shutdown channel
+        tokio::spawn(async move {
+            // Wait for the shutdown channel to receive a message
+            if let Err(e) = shutdown_channel.await {
+                error!("[server] Failed to listen for shutdown channel: {}", e);
+            } else {
+                info!("[server] Received shutdown signal");
+
+                if cfg!(debug_assertions) {
+                    // In debug mode, stop the server immediately
+                    info!("[server] Stopping server...");
+                    server_handle.stop(true).await;
+                    info!("[server] Server stopped");
+                } else {
+                    // In production, do the whole graceful shutdown process
+
+                    // Update flag
+                    *shared_state.is_shutting_down.lock().await = true;
+
+                    info!("[server] is_shutting_down flag is now true");
+
+                    let duration = nittei_utils::config::APP_CONFIG.server_shutdown_sleep;
+
+                    info!("[server] Waiting {}s before stopping", duration);
+
+                    // Wait for the timeout
+                    tokio::time::sleep(std::time::Duration::from_secs(duration)).await;
+
+                    info!("[server] Stopping server...");
+
+                    // Shutdown the server
+                    server_handle.stop(true).await;
+
+                    info!("[server] Server stopped");
+                }
+            }
+        });
+    }
 }
diff --git a/crates/api/src/status/mod.rs b/crates/api/src/status/mod.rs
index 6ef67fef..7602d726 100644
--- a/crates/api/src/status/mod.rs
+++ b/crates/api/src/status/mod.rs
@@ -2,8 +2,21 @@ use actix_web::{web, HttpResponse};
 use nittei_api_structs::get_service_health::*;
 use nittei_infra::NitteiContext;
 
+use crate::ServerSharedState;
+
 /// Get the status of the service
-async fn status(ctx: web::Data<NitteiContext>) -> HttpResponse {
+async fn status(
+    ctx: web::Data<NitteiContext>,
+    shared_state: web::Data<ServerSharedState>,
+) -> HttpResponse {
+    let is_shutting_down = shared_state.is_shutting_down.lock().await;
+
+    if *is_shutting_down {
+        return HttpResponse::ServiceUnavailable().json(APIResponse {
+            message: "Service is shutting down".into(),
+        });
+    }
+
     match ctx.repos.status.check_connection().await {
         Ok(_) => HttpResponse::Ok().json(APIResponse {
             message: "Ok!\r\n".into(),
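The intended rollout sequence: the oneshot fires, `is_shutting_down` flips to true, the status endpoint starts returning 503 so the readiness probe fails, traffic drains for `server_shutdown_sleep` seconds, and only then is the server stopped (with `shutdown_timeout` as the in-flight grace period). A hypothetical external check of the draining state (assumes `reqwest`; the route path is illustrative, since the diff shows the handler but not where it is mounted):

```rust
use reqwest::StatusCode;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    // Route path is illustrative; the diff only shows the handler, not its route
    let resp = reqwest::get("http://localhost:5000/api/v1/healthcheck").await?;
    if resp.status() == StatusCode::SERVICE_UNAVAILABLE {
        println!("instance is draining, stop routing traffic to it");
    } else {
        println!("instance status: {}", resp.status());
    }
    Ok(())
}
```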
port") + .set_default("server_shutdown_sleep", "25") + .expect("Failed to set default server_shutdown_sleep") + .set_default("server_shutdown_timeout", "5") + .expect("Failed to set default server_shutdown_timeout") .set_default("skip_db_migrations", false) .expect("Failed to set default skip_db_migrations") .set_default("enable_reminders_job", false)