From 362aa7734ea6b3de6487b8b650933f1aa335e66b Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Thu, 30 May 2024 21:54:21 -0700 Subject: [PATCH 1/3] engine: input grace period delay shutdown for pending tasks & chunks 1. Input grace period Currently, Fluent Bit pauses all inputs 1 second after SIGTERM. The change creates an input grace period, which by default is one half the total Grace setting. This means that half way through the grace period Fluent Bit stops accepting any new logs and only sends logs pending in the buffers. 2. Check pending chunks on shutdown Previously the engine shutdown immediately if there were no pending tasks. A task is created from a chunk in the buffer. If there is a new chunk, but no task yet, the engine should keep running until the task is created and completed. This change makes the engine wait on shutdown for all pending chunks until the max grace period has expired. --- include/fluent-bit/flb_config.h | 1 + include/fluent-bit/flb_storage.h | 2 ++ src/flb_config.c | 3 +- src/flb_engine.c | 47 ++++++++++++++++++++++++++------ src/flb_lib.c | 1 - src/flb_storage.c | 10 +++++++ 6 files changed, 54 insertions(+), 10 deletions(-) diff --git a/include/fluent-bit/flb_config.h b/include/fluent-bit/flb_config.h index cd6635dbc4e..e75c14677b5 100644 --- a/include/fluent-bit/flb_config.h +++ b/include/fluent-bit/flb_config.h @@ -49,6 +49,7 @@ struct flb_config { double flush; /* Flush timeout */ int grace; /* Maximum grace time on shutdown */ int grace_count; /* Count of grace shutdown tries */ + int grace_input; /* Shutdown grace to keep inputs ingesting*/ flb_pipefd_t flush_fd; /* Timer FD associated to flush */ int daemon; /* Run as a daemon ? */ diff --git a/include/fluent-bit/flb_storage.h b/include/fluent-bit/flb_storage.h index 87971e92be3..1edb71cc12e 100644 --- a/include/fluent-bit/flb_storage.h +++ b/include/fluent-bit/flb_storage.h @@ -71,4 +71,6 @@ void flb_storage_input_destroy(struct flb_input_instance *in); struct flb_storage_metrics *flb_storage_metrics_create(struct flb_config *ctx); +void flb_chunk_count(struct flb_config *ctx, int *mem_chunks, int *fs_chunks); + #endif diff --git a/src/flb_config.c b/src/flb_config.c index 6ecfce81816..aa8ef7a4a86 100644 --- a/src/flb_config.c +++ b/src/flb_config.c @@ -205,8 +205,9 @@ struct flb_config *flb_config_init() config->init_time = time(NULL); config->kernel = flb_kernel_info(); config->verbose = 3; - config->grace = 5; + config->grace = 30; config->grace_count = 0; + config->grace_input = config->grace / 2; config->exit_status_code = 0; #ifdef FLB_HAVE_HTTP_SERVER diff --git a/src/flb_engine.c b/src/flb_engine.c index ceb933dfca3..fa7f589a3a3 100644 --- a/src/flb_engine.c +++ b/src/flb_engine.c @@ -65,6 +65,8 @@ extern struct flb_aws_error_reporter *error_reporter; FLB_TLS_DEFINE(struct mk_event_loop, flb_engine_evl); +void flb_engine_stop_ingestion(struct flb_config *config); + void flb_engine_evl_init() { @@ -551,6 +553,9 @@ int sb_segregate_chunks(struct flb_config *config) int flb_engine_start(struct flb_config *config) { int ret; + int tasks = 0; + int fs_chunks = 0; + int mem_chunks = 0; uint64_t ts; char tmp[16]; struct flb_time t_flush; @@ -771,6 +776,9 @@ int flb_engine_start(struct flb_config *config) return -2; } + config->grace_input = config->grace / 2; + flb_info("Shutdown Grace Period=%d, Pause Ingestion on shutdown Grace Period=%d", config->grace, config->grace_input); + while (1) { mk_event_wait(evl); /* potentially conditional mk_event_wait or mk_event_wait_2 based on bucket queue capacity for one shot events */ flb_event_priority_live_foreach(event, evl_bktq, evl, FLB_ENGINE_LOOP_MAX_ITER) { @@ -821,19 +829,36 @@ int flb_engine_start(struct flb_config *config) * resources allocated by that co-routine, the best thing is to * wait again for the grace period and re-check again. */ - ret = flb_task_running_count(config); + tasks = 0; + mem_chunks = 0; + fs_chunks = 0; + tasks = flb_task_running_count(config); + flb_chunk_count(config, &mem_chunks, &fs_chunks); + ret = tasks + mem_chunks + fs_chunks; if (ret > 0 && config->grace_count < config->grace) { if (config->grace_count == 1) { flb_task_running_print(config); } - flb_engine_exit(config); + if ((mem_chunks + fs_chunks) > 0) { + flb_info("[engine] Pending chunk count: memory=%d, filesystem=%d", + mem_chunks, fs_chunks); + } + if (config->grace_count < config->grace_input) { + flb_engine_exit(config); + } else { + flb_engine_stop_ingestion(config); + } } else { - if (ret > 0) { + if (tasks > 0) { flb_task_running_print(config); } + if ((mem_chunks + fs_chunks) > 0) { + flb_info("[engine] Pending chunk count: memory=%d, filesystem=%d", + mem_chunks, fs_chunks); + } flb_info("[engine] service has stopped (%i pending tasks)", - ret); + tasks); ret = config->exit_status_code; flb_engine_shutdown(config); config = NULL; @@ -915,6 +940,7 @@ int flb_engine_shutdown(struct flb_config *config) { config->is_running = FLB_FALSE; + config->is_ingestion_active = FLB_FALSE; flb_input_pause_all(config); #ifdef FLB_HAVE_STREAM_PROCESSOR @@ -959,14 +985,19 @@ int flb_engine_exit(struct flb_config *config) int ret; uint64_t val = FLB_ENGINE_EV_STOP; + val = FLB_ENGINE_EV_STOP; + ret = flb_pipe_w(config->ch_manager[1], &val, sizeof(uint64_t)); + return ret; +} + +void flb_engine_stop_ingestion(struct flb_config *config) +{ config->is_ingestion_active = FLB_FALSE; config->is_shutting_down = FLB_TRUE; - flb_input_pause_all(config); + flb_info("[engine] pausing all inputs.."); - val = FLB_ENGINE_EV_STOP; - ret = flb_pipe_w(config->ch_manager[1], &val, sizeof(uint64_t)); - return ret; + flb_input_pause_all(config); } int flb_engine_exit_status(struct flb_config *config, int status) diff --git a/src/flb_lib.c b/src/flb_lib.c index c274e574e77..9531ab5475a 100644 --- a/src/flb_lib.c +++ b/src/flb_lib.c @@ -724,7 +724,6 @@ int flb_stop(flb_ctx_t *ctx) } flb_debug("[lib] sending STOP signal to the engine"); - flb_engine_exit(ctx->config); ret = pthread_join(tid, NULL); if (ret != 0) { diff --git a/src/flb_storage.c b/src/flb_storage.c index 44d9b2e86e4..0559279c70b 100644 --- a/src/flb_storage.c +++ b/src/flb_storage.c @@ -577,6 +577,16 @@ int flb_storage_create(struct flb_config *ctx) return 0; } +void flb_chunk_count(struct flb_config *ctx, int *mem_chunks, int *fs_chunks) +{ + struct cio_stats storage_st; + + cio_stats_get(ctx->cio, &storage_st); + + *mem_chunks = storage_st.chunks_mem; + *fs_chunks = storage_st.chunks_fs; +} + void flb_storage_destroy(struct flb_config *ctx) { struct cio_ctx *cio; From 79445f79438fd8810a00c2718d73544a04c4d3fa Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Wed, 12 Jun 2024 21:01:09 -0700 Subject: [PATCH 2/3] engine: force flush on shutdown to create tasks for pending chunks Signed-off-by: Wesley Pettit --- src/flb_engine.c | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/src/flb_engine.c b/src/flb_engine.c index fa7f589a3a3..15a27bcc069 100644 --- a/src/flb_engine.c +++ b/src/flb_engine.c @@ -564,6 +564,7 @@ int flb_engine_start(struct flb_config *config) struct flb_bucket_queue *evl_bktq; struct flb_sched *sched; struct flb_net_dns dns_ctx; + int exiting = FLB_FALSE; /* Initialize the networking layer */ flb_net_lib_init(); @@ -840,13 +841,20 @@ int flb_engine_start(struct flb_config *config) flb_task_running_print(config); } if ((mem_chunks + fs_chunks) > 0) { - flb_info("[engine] Pending chunk count: memory=%d, filesystem=%d", - mem_chunks, fs_chunks); + flb_info("[engine] Pending chunk count: memory=%d, filesystem=%d; grace_timer=%d", + mem_chunks, fs_chunks, config->grace_count); } + /* Create new tasks for pending chunks */ + flb_engine_flush(config, NULL); if (config->grace_count < config->grace_input) { - flb_engine_exit(config); + if (exiting == FLB_FALSE) { + flb_engine_exit(config); + exiting = FLB_TRUE; + } } else { - flb_engine_stop_ingestion(config); + if (config->is_ingestion_active == FLB_TRUE) { + flb_engine_stop_ingestion(config); + } } } else { @@ -854,8 +862,8 @@ int flb_engine_start(struct flb_config *config) flb_task_running_print(config); } if ((mem_chunks + fs_chunks) > 0) { - flb_info("[engine] Pending chunk count: memory=%d, filesystem=%d", - mem_chunks, fs_chunks); + flb_info("[engine] pending chunk count: memory=%d, filesystem=%d; grace_timer=%d", + mem_chunks, fs_chunks, config->grace_count); } flb_info("[engine] service has stopped (%i pending tasks)", tasks); From db7cf76e265daae24b44f98d1f4ff6b0549a5b6f Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Sun, 16 Jun 2024 19:03:31 -0700 Subject: [PATCH 3/3] engine: send backlog chunks on shutdown Signed-off-by: Wesley Pettit --- src/flb_engine.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/flb_engine.c b/src/flb_engine.c index 15a27bcc069..ec27e5637a1 100644 --- a/src/flb_engine.c +++ b/src/flb_engine.c @@ -771,7 +771,7 @@ int flb_engine_start(struct flb_config *config) ret = sb_segregate_chunks(config); - if (ret) + if (ret < 0) { flb_error("[engine] could not segregate backlog chunks"); return -2; @@ -839,6 +839,11 @@ int flb_engine_start(struct flb_config *config) if (ret > 0 && config->grace_count < config->grace) { if (config->grace_count == 1) { flb_task_running_print(config); + ret = sb_segregate_chunks(config); + if (ret < 0) { + flb_error("[engine] could not segregate backlog chunks"); + return -2; + } } if ((mem_chunks + fs_chunks) > 0) { flb_info("[engine] Pending chunk count: memory=%d, filesystem=%d; grace_timer=%d",