[NO-ISSUE] feat/handle graceful shutdown for k8s #154

Merged
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

24 changes: 23 additions & 1 deletion bins/nittei/src/main.rs
@@ -5,6 +5,8 @@ use nittei_infra::setup_context;
use telemetry::init_subscriber;
#[cfg(all(target_env = "musl", target_pointer_width = "64"))]
use tikv_jemallocator::Jemalloc;
use tokio::signal;
use tracing::{error, info};

// Use Jemalloc only for musl-64 bits platforms
// The default MUSL allocator is known to be slower than Jemalloc
@@ -22,7 +24,27 @@ async fn main() -> anyhow::Result<()> {
init_subscriber()?;

let context = setup_context().await?;
let (tx, rx) = tokio::sync::oneshot::channel::<()>();

let app = Application::new(context).await?;
app.start().await

// Listen for SIGINT (Ctrl+C) to shutdown the service
// This sends a message on the channel to shutdown the server gracefully
// By doing so, it makes the app return failed status on the status API endpoint (useful for k8s)
// It waits for a configurable amount of seconds (in order for the readiness probe to fail)
// And then waits for the server to finish processing the current requests before shutting down
tokio::spawn(async move {
if let Err(e) = signal::ctrl_c().await {
error!("[main] Failed to listen for SIGINT: {}", e);
}
info!("[shutdown] Received SIGINT, sending event on channel...");
let _ = tx.send(());
});

// Start the application and block until it finishes
app.start(rx).await?;

info!("[shutdown] shutdown complete");

Ok(())
}
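
For reference, the SIGINT-to-channel wiring added in `main.rs` reduces to the following standalone pattern (a minimal sketch using only `tokio`; `run_server` here is a placeholder standing in for `Application::start`, not an actual function in this repo):

```rust
use tokio::{signal, sync::oneshot};

#[tokio::main]
async fn main() {
    // Channel used to tell the server to begin its graceful shutdown
    let (tx, rx) = oneshot::channel::<()>();

    // Background task: wait for Ctrl+C (SIGINT), then notify the server
    tokio::spawn(async move {
        if signal::ctrl_c().await.is_ok() {
            let _ = tx.send(());
        }
    });

    // The server owns the receiving end and begins shutting down when it fires
    run_server(rx).await;
}

// Placeholder for the real server; in this PR it is `Application::start(rx)`
async fn run_server(shutdown: oneshot::Receiver<()>) {
    // Serve requests until the shutdown signal arrives
    let _ = shutdown.await;
}
```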
4 changes: 3 additions & 1 deletion bins/nittei/tests/helpers/setup.rs
@@ -22,12 +22,14 @@ pub async fn spawn_app() -> (TestApp, NitteiSDK, String) {

let address = format!("http://localhost:{}", application.port());

let (_, rx) = tokio::sync::oneshot::channel::<()>();

// Allow underscore future because it needs to run in background
// If we `await` it, the tests will hang
#[allow(clippy::let_underscore_future)]
let _ = actix_web::rt::spawn(async move {
application
.start()
.start(rx)
.await
.expect("Expected application to start");
});
1 change: 1 addition & 0 deletions crates/api/Cargo.toml
@@ -28,6 +28,7 @@ chrono-tz = "0.8.1"
anyhow = "1.0"
jsonwebtoken = "7"
thiserror = "1.0"
tokio = { version = "1.0", features = ["full"] }
tracing = "0.1.25"
tracing-actix-web = { version = "0.7.11", features = ["opentelemetry_0_23"] }
tracing-futures = "0.2.5"
89 changes: 85 additions & 4 deletions crates/api/src/lib.rs
@@ -10,7 +10,7 @@ mod shared;
mod status;
mod user;

use std::net::TcpListener;
use std::{net::TcpListener, sync::Arc};

use actix_cors::Cors;
use actix_web::{
@@ -20,6 +20,7 @@ use actix_web::{
App,
HttpServer,
};
use futures::lock::Mutex;
use http_logger::NitteiTracingRootSpanBuilder;
use job_schedulers::{start_reminder_generation_job, start_send_reminders_job};
use nittei_domain::{
@@ -54,18 +55,36 @@ pub struct Application {
port: u16,
/// The application context (database connections, etc.)
context: NitteiContext,

/// Shutdown data
/// Shared state of the server
shared_state: ServerSharedState,
}

/// Struct for storing the shared state of the server
/// Mainly useful for sharing the shutdown flag between the binary crate and the status endpoint
#[derive(Clone)]
pub struct ServerSharedState {
/// Flag to indicate if the application is shutting down
pub is_shutting_down: Arc<Mutex<bool>>,
}

impl Application {
pub async fn new(context: NitteiContext) -> anyhow::Result<Self> {
let (server, port) = Application::configure_server(context.clone()).await?;
let shared_state = ServerSharedState {
is_shutting_down: Arc::new(Mutex::new(false)),
};

let (server, port) =
Application::configure_server(context.clone(), shared_state.clone()).await?;

Application::start_jobs(context.clone());

Ok(Self {
server,
port,
context,
shared_state,
})
}

@@ -89,7 +108,10 @@ impl Application {
/// - CORS (permissive)
/// - Compression
/// - Tracing logger
async fn configure_server(context: NitteiContext) -> anyhow::Result<(Server, u16)> {
async fn configure_server(
context: NitteiContext,
shared_state: ServerSharedState,
) -> anyhow::Result<(Server, u16)> {
let port = context.config.port;
let address = nittei_utils::config::APP_CONFIG.http_host.clone();
let address_and_port = format!("{}:{}", address, port);
@@ -99,14 +121,22 @@

let server = HttpServer::new(move || {
let ctx = context.clone();
let shared_state = shared_state.clone();

App::new()
.wrap(Cors::permissive())
.wrap(middleware::Compress::default())
.wrap(TracingLogger::<NitteiTracingRootSpanBuilder>::new())
.app_data(Data::new(ctx))
.app_data(Data::new(shared_state))
.service(web::scope("/api/v1").configure(configure_server_api))
})
// Disable signals to avoid conflicts with the signal handler
// This is handled by the signal handler in the binary and the `schedule_shutdown` function
.disable_signals()
// Set the shutdown timeout (time to wait for the server to finish processing requests)
// Default is 30 seconds
.shutdown_timeout(nittei_utils::config::APP_CONFIG.server_shutdown_timeout)
.listen(listener)?
.workers(4)
.run();
@@ -115,7 +145,14 @@
}

/// Init the default account and start the Actix server
pub async fn start(self) -> anyhow::Result<()> {
///
/// It also sets up the shutdown handler
pub async fn start(
self,
shutdown_channel: tokio::sync::oneshot::Receiver<()>,
) -> anyhow::Result<()> {
self.setup_shutdown_handler(shutdown_channel);

self.init_default_account().await?;
self.server.await.map_err(|e| anyhow::anyhow!(e))
}
@@ -274,4 +311,48 @@ impl Application {
};
Ok(())
}

/// Setup the shutdown handler
fn setup_shutdown_handler(&self, shutdown_channel: tokio::sync::oneshot::Receiver<()>) {
let server_handle = self.server.handle();
let shared_state = self.shared_state.clone();

// Listen to shutdown channel
tokio::spawn(async move {
// Wait for the shutdown channel to receive a message
if let Err(e) = shutdown_channel.await {
error!("[server] Failed to listen for shutdown channel: {}", e);
} else {
info!("[server] Received shutdown signal",);

if cfg!(debug_assertions) {
// In debug mode, stop the server immediately
info!("[server] Stopping server...");
server_handle.stop(true).await;
info!("[server] Server stopped");
} else {
// In production, do the whole graceful shutdown process

// Update flag
*shared_state.is_shutting_down.lock().await = true;

info!("[server] is_shutting_down flag is now true");

let duration = nittei_utils::config::APP_CONFIG.server_shutdown_sleep;

info!("[server] Waiting {}s before stopping", duration);

// Wait for the timeout
tokio::time::sleep(std::time::Duration::from_secs(duration)).await;

info!("[server] Stopping server...");

// Shutdown the server
server_handle.stop(true).await;

info!("[server] Server stopped");
}
}
});
}
}
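
For readers unfamiliar with the actix pieces used above, the combination of `disable_signals()`, `shutdown_timeout()` and `ServerHandle::stop(true)` reduces to this standalone sketch (assumes actix-web 4 and tokio; the route, port and 30-second delay are placeholders, not values from this PR):

```rust
use actix_web::{web, App, HttpServer};

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let server = HttpServer::new(|| App::new().route("/", web::get().to(|| async { "ok" })))
        // The binary owns signal handling, so the server must not install its own handlers
        .disable_signals()
        // Seconds to wait for in-flight requests once stop() has been called
        .shutdown_timeout(5)
        .bind(("127.0.0.1", 8080))?
        .run();

    // Keep a handle so another task can trigger the graceful stop later
    let handle = server.handle();
    tokio::spawn(async move {
        // Stand-in for "shutdown signal received, readiness-probe sleep elapsed"
        tokio::time::sleep(std::time::Duration::from_secs(30)).await;
        // `true` = graceful: let in-flight requests finish before exiting
        handle.stop(true).await;
    });

    server.await
}
```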
15 changes: 14 additions & 1 deletion crates/api/src/status/mod.rs
@@ -2,8 +2,21 @@ use actix_web::{web, HttpResponse};
use nittei_api_structs::get_service_health::*;
use nittei_infra::NitteiContext;

use crate::ServerSharedState;

/// Get the status of the service
async fn status(ctx: web::Data<NitteiContext>) -> HttpResponse {
async fn status(
ctx: web::Data<NitteiContext>,
shared_state: web::Data<ServerSharedState>,
) -> HttpResponse {
let is_shutting_down = shared_state.is_shutting_down.lock().await;

if *is_shutting_down {
return HttpResponse::ServiceUnavailable().json(APIResponse {
message: "Service is shutting down".into(),
});
}

match ctx.repos.status.check_connection().await {
Ok(_) => HttpResponse::Ok().json(APIResponse {
message: "Ok!\r\n".into(),
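
A hedged sketch of how the new readiness behaviour could be exercised with `actix_web::test` (the handler, state struct and route path below are re-declared locally for illustration and are not the PR's actual types):

```rust
use std::sync::Arc;

use actix_web::{test, web, App, HttpResponse};
use futures::lock::Mutex;

#[derive(Clone)]
struct SharedState {
    is_shutting_down: Arc<Mutex<bool>>,
}

// Simplified stand-in for the status handler in this PR
async fn status(state: web::Data<SharedState>) -> HttpResponse {
    if *state.is_shutting_down.lock().await {
        HttpResponse::ServiceUnavailable().body("Service is shutting down")
    } else {
        HttpResponse::Ok().body("Ok!")
    }
}

#[actix_web::test]
async fn status_reports_shutdown() {
    let state = SharedState {
        is_shutting_down: Arc::new(Mutex::new(false)),
    };
    let app = test::init_service(
        App::new()
            .app_data(web::Data::new(state.clone()))
            .route("/status", web::get().to(status)),
    )
    .await;

    // Healthy before the shutdown flag is flipped
    let req = test::TestRequest::get().uri("/status").to_request();
    assert!(test::call_service(&app, req).await.status().is_success());

    // Flip the flag, as the shutdown handler does, and expect 503
    *state.is_shutting_down.lock().await = true;
    let req = test::TestRequest::get().uri("/status").to_request();
    assert_eq!(test::call_service(&app, req).await.status().as_u16(), 503);
}
```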
14 changes: 14 additions & 0 deletions crates/utils/src/config.rs
@@ -17,6 +17,16 @@ pub struct AppConfig {
/// Env var: NITTEI__HTTP_PORT
pub http_port: usize,

/// The sleep time for the HTTP server shutdown (in seconds)
/// Default is 25 seconds
/// Env var: NITTEI__SERVER_SHUTDOWN_SLEEP
pub server_shutdown_sleep: u64,

/// The shutdown timeout for the HTTP server (in seconds)
/// Default is 5 seconds
/// Env var: NITTEI__SERVER_SHUTDOWN_TIMEOUT
pub server_shutdown_timeout: u64,

/// The database URL
/// Default is postgresql://postgres:postgres@localhost:45432/nittei
/// Env var: NITTEI__DATABASE_URL
@@ -139,6 +149,10 @@
.expect("Failed to set default host")
.set_default("http_port", "5000")
.expect("Failed to set default port")
.set_default("server_shutdown_sleep", "25")
.expect("Failed to set default server_shutdown_sleep")
.set_default("server_shutdown_timeout", "5")
.expect("Failed to set default server_shutdown_timeout")
.set_default("skip_db_migrations", false)
.expect("Failed to set default skip_db_migrations")
.set_default("enable_reminders_job", false)
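
For context, the defaults above are read through the `config` crate and can be overridden with `NITTEI__`-prefixed environment variables (e.g. `NITTEI__SERVER_SHUTDOWN_SLEEP=40`). A minimal sketch of that pattern, modelling only the two new fields (the exact builder wiring in this repo may differ):

```rust
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct ShutdownConfig {
    /// Seconds to keep the readiness probe failing before stopping the server
    server_shutdown_sleep: u64,
    /// Seconds actix waits for in-flight requests after stop() is called
    server_shutdown_timeout: u64,
}

fn load() -> Result<ShutdownConfig, config::ConfigError> {
    config::Config::builder()
        // Defaults matching this PR: 25s readiness sleep, 5s shutdown timeout
        .set_default("server_shutdown_sleep", "25")?
        .set_default("server_shutdown_timeout", "5")?
        // Environment overrides such as NITTEI__SERVER_SHUTDOWN_SLEEP=40
        .add_source(
            config::Environment::with_prefix("NITTEI")
                .prefix_separator("__")
                .separator("__"),
        )
        .build()?
        .try_deserialize()
}

fn main() {
    let config = load().expect("Failed to load shutdown config");
    println!("{:?}", config);
}
```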