diff --git a/include/fluent-bit/flb_config.h b/include/fluent-bit/flb_config.h index cd6635dbc4e..e75c14677b5 100644 --- a/include/fluent-bit/flb_config.h +++ b/include/fluent-bit/flb_config.h @@ -49,6 +49,7 @@ struct flb_config { double flush; /* Flush timeout */ int grace; /* Maximum grace time on shutdown */ int grace_count; /* Count of grace shutdown tries */ + int grace_input; /* Shutdown grace to keep inputs ingesting*/ flb_pipefd_t flush_fd; /* Timer FD associated to flush */ int daemon; /* Run as a daemon ? */ diff --git a/include/fluent-bit/flb_storage.h b/include/fluent-bit/flb_storage.h index 87971e92be3..1edb71cc12e 100644 --- a/include/fluent-bit/flb_storage.h +++ b/include/fluent-bit/flb_storage.h @@ -71,4 +71,6 @@ void flb_storage_input_destroy(struct flb_input_instance *in); struct flb_storage_metrics *flb_storage_metrics_create(struct flb_config *ctx); +void flb_chunk_count(struct flb_config *ctx, int *mem_chunks, int *fs_chunks); + #endif diff --git a/src/flb_config.c b/src/flb_config.c index 6ecfce81816..aa8ef7a4a86 100644 --- a/src/flb_config.c +++ b/src/flb_config.c @@ -205,8 +205,9 @@ struct flb_config *flb_config_init() config->init_time = time(NULL); config->kernel = flb_kernel_info(); config->verbose = 3; - config->grace = 5; + config->grace = 30; config->grace_count = 0; + config->grace_input = config->grace / 2; config->exit_status_code = 0; #ifdef FLB_HAVE_HTTP_SERVER diff --git a/src/flb_engine.c b/src/flb_engine.c index ceb933dfca3..ec27e5637a1 100644 --- a/src/flb_engine.c +++ b/src/flb_engine.c @@ -65,6 +65,8 @@ extern struct flb_aws_error_reporter *error_reporter; FLB_TLS_DEFINE(struct mk_event_loop, flb_engine_evl); +void flb_engine_stop_ingestion(struct flb_config *config); + void flb_engine_evl_init() { @@ -551,6 +553,9 @@ int sb_segregate_chunks(struct flb_config *config) int flb_engine_start(struct flb_config *config) { int ret; + int tasks = 0; + int fs_chunks = 0; + int mem_chunks = 0; uint64_t ts; char tmp[16]; struct flb_time t_flush; @@ -559,6 +564,7 @@ int flb_engine_start(struct flb_config *config) struct flb_bucket_queue *evl_bktq; struct flb_sched *sched; struct flb_net_dns dns_ctx; + int exiting = FLB_FALSE; /* Initialize the networking layer */ flb_net_lib_init(); @@ -765,12 +771,15 @@ int flb_engine_start(struct flb_config *config) ret = sb_segregate_chunks(config); - if (ret) + if (ret < 0) { flb_error("[engine] could not segregate backlog chunks"); return -2; } + config->grace_input = config->grace / 2; + flb_info("Shutdown Grace Period=%d, Pause Ingestion on shutdown Grace Period=%d", config->grace, config->grace_input); + while (1) { mk_event_wait(evl); /* potentially conditional mk_event_wait or mk_event_wait_2 based on bucket queue capacity for one shot events */ flb_event_priority_live_foreach(event, evl_bktq, evl, FLB_ENGINE_LOOP_MAX_ITER) { @@ -821,19 +830,48 @@ int flb_engine_start(struct flb_config *config) * resources allocated by that co-routine, the best thing is to * wait again for the grace period and re-check again. */ - ret = flb_task_running_count(config); + tasks = 0; + mem_chunks = 0; + fs_chunks = 0; + tasks = flb_task_running_count(config); + flb_chunk_count(config, &mem_chunks, &fs_chunks); + ret = tasks + mem_chunks + fs_chunks; if (ret > 0 && config->grace_count < config->grace) { if (config->grace_count == 1) { flb_task_running_print(config); + ret = sb_segregate_chunks(config); + if (ret < 0) { + flb_error("[engine] could not segregate backlog chunks"); + return -2; + } + } + if ((mem_chunks + fs_chunks) > 0) { + flb_info("[engine] Pending chunk count: memory=%d, filesystem=%d; grace_timer=%d", + mem_chunks, fs_chunks, config->grace_count); + } + /* Create new tasks for pending chunks */ + flb_engine_flush(config, NULL); + if (config->grace_count < config->grace_input) { + if (exiting == FLB_FALSE) { + flb_engine_exit(config); + exiting = FLB_TRUE; + } + } else { + if (config->is_ingestion_active == FLB_TRUE) { + flb_engine_stop_ingestion(config); + } } - flb_engine_exit(config); } else { - if (ret > 0) { + if (tasks > 0) { flb_task_running_print(config); } + if ((mem_chunks + fs_chunks) > 0) { + flb_info("[engine] pending chunk count: memory=%d, filesystem=%d; grace_timer=%d", + mem_chunks, fs_chunks, config->grace_count); + } flb_info("[engine] service has stopped (%i pending tasks)", - ret); + tasks); ret = config->exit_status_code; flb_engine_shutdown(config); config = NULL; @@ -915,6 +953,7 @@ int flb_engine_shutdown(struct flb_config *config) { config->is_running = FLB_FALSE; + config->is_ingestion_active = FLB_FALSE; flb_input_pause_all(config); #ifdef FLB_HAVE_STREAM_PROCESSOR @@ -959,14 +998,19 @@ int flb_engine_exit(struct flb_config *config) int ret; uint64_t val = FLB_ENGINE_EV_STOP; + val = FLB_ENGINE_EV_STOP; + ret = flb_pipe_w(config->ch_manager[1], &val, sizeof(uint64_t)); + return ret; +} + +void flb_engine_stop_ingestion(struct flb_config *config) +{ config->is_ingestion_active = FLB_FALSE; config->is_shutting_down = FLB_TRUE; - flb_input_pause_all(config); + flb_info("[engine] pausing all inputs.."); - val = FLB_ENGINE_EV_STOP; - ret = flb_pipe_w(config->ch_manager[1], &val, sizeof(uint64_t)); - return ret; + flb_input_pause_all(config); } int flb_engine_exit_status(struct flb_config *config, int status) diff --git a/src/flb_lib.c b/src/flb_lib.c index c274e574e77..9531ab5475a 100644 --- a/src/flb_lib.c +++ b/src/flb_lib.c @@ -724,7 +724,6 @@ int flb_stop(flb_ctx_t *ctx) } flb_debug("[lib] sending STOP signal to the engine"); - flb_engine_exit(ctx->config); ret = pthread_join(tid, NULL); if (ret != 0) { diff --git a/src/flb_storage.c b/src/flb_storage.c index 44d9b2e86e4..0559279c70b 100644 --- a/src/flb_storage.c +++ b/src/flb_storage.c @@ -577,6 +577,16 @@ int flb_storage_create(struct flb_config *ctx) return 0; } +void flb_chunk_count(struct flb_config *ctx, int *mem_chunks, int *fs_chunks) +{ + struct cio_stats storage_st; + + cio_stats_get(ctx->cio, &storage_st); + + *mem_chunks = storage_st.chunks_mem; + *fs_chunks = storage_st.chunks_fs; +} + void flb_storage_destroy(struct flb_config *ctx) { struct cio_ctx *cio;