Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pause input ingestion halfway through Grace period #33

Open
wants to merge 3 commits into
base: 2_33_awsbase
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/fluent-bit/flb_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ struct flb_config {
double flush; /* Flush timeout */
int grace; /* Maximum grace time on shutdown */
int grace_count; /* Count of grace shutdown tries */
int grace_input; /* Shutdown grace to keep inputs ingesting*/
flb_pipefd_t flush_fd; /* Timer FD associated to flush */

int daemon; /* Run as a daemon ? */
Expand Down
2 changes: 2 additions & 0 deletions include/fluent-bit/flb_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,6 @@ void flb_storage_input_destroy(struct flb_input_instance *in);

struct flb_storage_metrics *flb_storage_metrics_create(struct flb_config *ctx);

void flb_chunk_count(struct flb_config *ctx, int *mem_chunks, int *fs_chunks);

#endif
3 changes: 2 additions & 1 deletion src/flb_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,9 @@ struct flb_config *flb_config_init()
config->init_time = time(NULL);
config->kernel = flb_kernel_info();
config->verbose = 3;
config->grace = 5;
config->grace = 30;
config->grace_count = 0;
config->grace_input = config->grace / 2;
config->exit_status_code = 0;

#ifdef FLB_HAVE_HTTP_SERVER
Expand Down
62 changes: 53 additions & 9 deletions src/flb_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ extern struct flb_aws_error_reporter *error_reporter;

FLB_TLS_DEFINE(struct mk_event_loop, flb_engine_evl);

void flb_engine_stop_ingestion(struct flb_config *config);


void flb_engine_evl_init()
{
Expand Down Expand Up @@ -551,6 +553,9 @@ int sb_segregate_chunks(struct flb_config *config)
int flb_engine_start(struct flb_config *config)
{
int ret;
int tasks = 0;
int fs_chunks = 0;
int mem_chunks = 0;
uint64_t ts;
char tmp[16];
struct flb_time t_flush;
Expand All @@ -559,6 +564,7 @@ int flb_engine_start(struct flb_config *config)
struct flb_bucket_queue *evl_bktq;
struct flb_sched *sched;
struct flb_net_dns dns_ctx;
int exiting = FLB_FALSE;

/* Initialize the networking layer */
flb_net_lib_init();
Expand Down Expand Up @@ -765,12 +771,15 @@ int flb_engine_start(struct flb_config *config)

ret = sb_segregate_chunks(config);

if (ret)
if (ret < 0)
{
flb_error("[engine] could not segregate backlog chunks");
return -2;
}

config->grace_input = config->grace / 2;
flb_info("Shutdown Grace Period=%d, Pause Ingestion on shutdown Grace Period=%d", config->grace, config->grace_input);

while (1) {
mk_event_wait(evl); /* potentially conditional mk_event_wait or mk_event_wait_2 based on bucket queue capacity for one shot events */
flb_event_priority_live_foreach(event, evl_bktq, evl, FLB_ENGINE_LOOP_MAX_ITER) {
Expand Down Expand Up @@ -821,19 +830,48 @@ int flb_engine_start(struct flb_config *config)
* resources allocated by that co-routine, the best thing is to
* wait again for the grace period and re-check again.
*/
ret = flb_task_running_count(config);
tasks = 0;
mem_chunks = 0;
fs_chunks = 0;
tasks = flb_task_running_count(config);
flb_chunk_count(config, &mem_chunks, &fs_chunks);
ret = tasks + mem_chunks + fs_chunks;
if (ret > 0 && config->grace_count < config->grace) {
if (config->grace_count == 1) {
flb_task_running_print(config);
ret = sb_segregate_chunks(config);
if (ret < 0) {
flb_error("[engine] could not segregate backlog chunks");
return -2;
}
}
if ((mem_chunks + fs_chunks) > 0) {
flb_info("[engine] Pending chunk count: memory=%d, filesystem=%d; grace_timer=%d",
mem_chunks, fs_chunks, config->grace_count);
}
/* Create new tasks for pending chunks */
flb_engine_flush(config, NULL);
if (config->grace_count < config->grace_input) {
if (exiting == FLB_FALSE) {
flb_engine_exit(config);
exiting = FLB_TRUE;
}
} else {
if (config->is_ingestion_active == FLB_TRUE) {
flb_engine_stop_ingestion(config);
}
}
flb_engine_exit(config);
}
else {
if (ret > 0) {
if (tasks > 0) {
flb_task_running_print(config);
}
if ((mem_chunks + fs_chunks) > 0) {
flb_info("[engine] pending chunk count: memory=%d, filesystem=%d; grace_timer=%d",
mem_chunks, fs_chunks, config->grace_count);
}
flb_info("[engine] service has stopped (%i pending tasks)",
ret);
tasks);
ret = config->exit_status_code;
flb_engine_shutdown(config);
config = NULL;
Expand Down Expand Up @@ -915,6 +953,7 @@ int flb_engine_shutdown(struct flb_config *config)
{

config->is_running = FLB_FALSE;
config->is_ingestion_active = FLB_FALSE;
flb_input_pause_all(config);

#ifdef FLB_HAVE_STREAM_PROCESSOR
Expand Down Expand Up @@ -959,14 +998,19 @@ int flb_engine_exit(struct flb_config *config)
int ret;
uint64_t val = FLB_ENGINE_EV_STOP;

val = FLB_ENGINE_EV_STOP;
ret = flb_pipe_w(config->ch_manager[1], &val, sizeof(uint64_t));
return ret;
}

void flb_engine_stop_ingestion(struct flb_config *config)
{
config->is_ingestion_active = FLB_FALSE;
config->is_shutting_down = FLB_TRUE;

flb_input_pause_all(config);
flb_info("[engine] pausing all inputs..");

val = FLB_ENGINE_EV_STOP;
ret = flb_pipe_w(config->ch_manager[1], &val, sizeof(uint64_t));
return ret;
flb_input_pause_all(config);
}

int flb_engine_exit_status(struct flb_config *config, int status)
Expand Down
1 change: 0 additions & 1 deletion src/flb_lib.c
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,6 @@ int flb_stop(flb_ctx_t *ctx)
}

flb_debug("[lib] sending STOP signal to the engine");

flb_engine_exit(ctx->config);
ret = pthread_join(tid, NULL);
if (ret != 0) {
Expand Down
10 changes: 10 additions & 0 deletions src/flb_storage.c
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,16 @@ int flb_storage_create(struct flb_config *ctx)
return 0;
}

void flb_chunk_count(struct flb_config *ctx, int *mem_chunks, int *fs_chunks)
{
struct cio_stats storage_st;

cio_stats_get(ctx->cio, &storage_st);

*mem_chunks = storage_st.chunks_mem;
*fs_chunks = storage_st.chunks_fs;
}

void flb_storage_destroy(struct flb_config *ctx)
{
struct cio_ctx *cio;
Expand Down
Loading