diff --git a/include/fluent-bit/flb_output.h b/include/fluent-bit/flb_output.h index c9172b65d17..82022d9c4e6 100644 --- a/include/fluent-bit/flb_output.h +++ b/include/fluent-bit/flb_output.h @@ -408,6 +408,10 @@ struct flb_output_instance { struct mk_list flush_list; struct mk_list flush_list_destroy; + /* similar to flush coroutine list above, timer coroutine list */ + struct mk_list timer_coro_list; + struct mk_list timer_coro_list_destroy; + /* Keep a reference to the original context this instance belongs to */ struct flb_config *config; }; @@ -428,6 +432,28 @@ struct flb_output_flush { struct mk_list _head; /* Link to flb_task->threads */ }; +/* + * stores timer coros on the timer_coro_list, if the output uses them + */ +struct flb_output_timer_coro { + struct flb_config *config; /* FLB context */ + struct flb_output_instance *o_ins; /* output instance */ + struct flb_output_coro_timer_data *timer_data; /* callback info */ + struct flb_coro *coro; /* parent coro addr */ + struct mk_list _head; /* Link to timer_coro_list */ +}; + +/* + * If the output uses timer coros, then this is used as the callback data + * passed to flb_sched_timer_cb_create + */ +struct flb_output_coro_timer_data { + struct flb_output_instance *ins; /* associate coro with this output instance */ + flb_sds_t job_name; /* used on engine shutdown, print pending "custom" jobs */ + void (*cb) (struct flb_config *config, void *data); /* call this output callback in the coro */ + void *data; /* opaque data to pass to the above cb */ +}; + static FLB_INLINE int flb_output_is_threaded(struct flb_output_instance *ins) { return ins->is_threaded; @@ -443,6 +469,19 @@ static FLB_INLINE void flb_output_flush_destroy(struct flb_output_flush *out_flu flb_free(out_flush); } +/* + * See below note for flb_out_flush_params + * this is equivalent for timer coroutines + */ +struct flb_out_timer_coro_params { + struct flb_output_timer_coro *output_timer; /* output flush */ + struct flb_output_coro_timer_data *timer_data; /* callback info */ + struct flb_config *config; /* Fluent Bit context */ + struct flb_coro *coro; /* coroutine context */ +}; + +extern FLB_TLS_DEFINE(struct flb_out_timer_coro_params, timer_coro_params); + /* * libco do not support parameters in the entrypoint function due to the * complexity of implementation in terms of architecture and compiler, but @@ -526,6 +565,135 @@ static FLB_INLINE void output_pre_cb_flush(void) persisted_params.config); } +/* same as above but for timer coros */ +static FLB_INLINE void output_pre_timer_cb(void) +{ + struct flb_coro *coro; + struct flb_out_timer_coro_params *params; + struct flb_out_timer_coro_params persisted_params; + struct flb_output_coro_timer_data *timer_data; + struct flb_output_instance *o_ins; + struct flb_out_thread_instance *th_ins; + struct flb_output_timer_coro *timer_coro; + + + params = (struct flb_out_timer_coro_params *) FLB_TLS_GET(timer_coro_params); + if (!params) { + flb_error("[output] no timer coro params defined, unexpected"); + return; + } + + /* + * flush coros are actually started in engine after they are + * written down a pipe. This seems unnecessary here. So we can start it right away. + * If this works, I can remove the persisted params. + */ + coro = params->coro; + persisted_params = *params; + timer_coro = params->output_timer; + o_ins = params->output_timer->o_ins; + // co_switch(coro->caller); + + timer_data = persisted_params.timer_data; + timer_data->cb(persisted_params.config, timer_data->data); + // after output callback is done, just do clean up here? + // similar to: flb_output_flush_prepare_destroy + /* Move timer coroutine context from active list to the destroy one */ + if (flb_output_is_threaded(o_ins) == FLB_TRUE) { + th_ins = flb_output_thread_instance_get(); + pthread_mutex_lock(&th_ins->timer_mutex); + mk_list_del(&timer_coro->_head); + mk_list_add(&timer_coro->_head, &th_ins->timer_coro_list_destroy); + pthread_mutex_unlock(&th_ins->timer_mutex); + } + else { + mk_list_del(&timer_coro->_head); + mk_list_add(&timer_coro->_head, &o_ins->timer_coro_list_destroy); + } + + /* yield back to caller/control code */ + flb_coro_yield(coro, FLB_TRUE); +} + +/* + * If the output uses scheduled timers with coroutines, + * this function is used as the callback for flb_sched_timer_cb_create + */ +static FLB_INLINE +void flb_output_coro_timer_cb(struct flb_config *config, void *data) +{ + size_t stack_size; + struct flb_coro *coro; + struct flb_output_timer_coro *timer_coro; + struct flb_out_thread_instance *th_ins; + struct flb_output_coro_timer_data *ctx = (struct flb_output_coro_timer_data *) data; + struct flb_out_timer_coro_params *params; + struct flb_output_instance *o_ins; + + /* Custom output coroutine info */ + timer_coro = (struct flb_output_timer_coro *) flb_calloc(1, sizeof(struct flb_output_timer_coro)); + if (!timer_coro) { + flb_errno(); + return; + } + + /* Create a new co-routine */ + coro = flb_coro_create(timer_coro); + if (!coro) { + flb_free(timer_coro); + return; + } + + o_ins = ctx->ins; + timer_coro->o_ins = o_ins; + timer_coro->config = config; + timer_coro->coro = coro; + + coro->caller = co_active(); + coro->callee = co_create(config->coro_stack_size, + output_pre_timer_cb, &stack_size); + + if (coro->callee == NULL) { + flb_coro_destroy(coro); + flb_free(timer_coro); + return; + } + +#ifdef FLB_HAVE_VALGRIND + coro->valgrind_stack_id = \ + VALGRIND_STACK_REGISTER(coro->callee, ((char *) coro->callee) + stack_size); +#endif + + if (o_ins->is_threaded == FLB_TRUE) { + th_ins = flb_output_thread_instance_get(); + pthread_mutex_lock(&th_ins->timer_mutex); + mk_list_add(&timer_coro->_head, &th_ins->timer_coro_list); + pthread_mutex_unlock(&th_ins->timer_mutex); + } + else { + mk_list_add(&timer_coro->_head, &o_ins->timer_coro_list); + } + + params = (struct flb_out_timer_coro_params *) FLB_TLS_GET(timer_coro_params); + if (!params) { + params = (struct flb_out_timer_coro_params *) flb_calloc(1, sizeof(struct flb_out_flush_params)); + if (!params) { + flb_errno(); + return; + } + } + + /* Callback parameters in order */ + params->output_timer = timer_coro; + params->timer_data = ctx; + params->config = config; + params->coro = coro; + + FLB_TLS_SET(timer_coro_params, params); + co_switch(coro->callee); + return; +} + void flb_output_flush_prepare_destroy(struct flb_output_flush *out_flush); int flb_output_flush_id_get(struct flb_output_instance *ins); @@ -659,7 +827,11 @@ static inline void flb_output_return(int ret, struct flb_coro *co) { flb_output_flush_prepare_destroy(out_flush); } -/* return the number of co-routines running in the instance */ +/* + * return the number of flush co-routines running in the instance + * Currently, this function is only used for FLB_OUTPUT_NO_MULTIPLEX + * and does not count timer_coros, used by S3 output + */ static inline int flb_output_coros_size(struct flb_output_instance *ins) { int size = 0; @@ -678,6 +850,55 @@ static inline int flb_output_coros_size(struct flb_output_instance *ins) return size; } +/* Used in engine flb_running_count */ +static inline int flb_output_timer_coros_size(struct flb_output_instance *ins) +{ + int size = 0; + + if (flb_output_is_threaded(ins) == FLB_TRUE) { + /* + * On threaded mode, we need to count the active co-routines of + * every running thread of the thread pool. + */ + size = flb_output_thread_pool_timer_coros_size(ins); + } + else { + size = mk_list_size(&ins->timer_coro_list); + } + + return size; +} + +static inline void flb_timer_coros_print(struct mk_list *timer_coro_list) +{ + struct flb_output_timer_coro *timer_coro; + struct mk_list *tmp; + struct mk_list *head; + int n = mk_list_size(timer_coro_list); + if (n != 0) { + /* get one coro for the job_name */ + mk_list_foreach_safe(head, tmp, timer_coro_list) { + timer_coro = mk_list_entry(head, struct flb_output_timer_coro, _head); + if (timer_coro != NULL) { + flb_info("[task] output=%s still running %d %s(s)", + timer_coro->o_ins->alias, n, timer_coro->timer_data->job_name); + break; + } + } + } +} + +/* Used in engine flb_running_print */ +static inline void flb_output_timer_coros_print(struct flb_output_instance *ins) +{ + if (flb_output_is_threaded(ins) == FLB_TRUE) { + flb_output_thread_pool_timer_coros_print(ins); + } + else { + flb_timer_coros_print(&ins->timer_coro_list); + } +} + static inline void flb_output_return_do(int x) { struct flb_coro *coro; diff --git a/include/fluent-bit/flb_output_thread.h b/include/fluent-bit/flb_output_thread.h index 4100c9c1a7e..38a59276025 100644 --- a/include/fluent-bit/flb_output_thread.h +++ b/include/fluent-bit/flb_output_thread.h @@ -86,7 +86,12 @@ struct flb_out_thread_instance { * 'flushes' running by a threaded instance, then the access to the 'flush_list' * must be protected: we use 'flush_mutex for that purpose. */ - pthread_mutex_t flush_mutex; /* mutex for 'flush_list' */ + pthread_mutex_t flush_mutex; /* mutex for 'flush_list' */ + + /* Same as flush_mutex but for timer coros */ + struct mk_list timer_coro_list; /* flush context list */ + struct mk_list timer_coro_list_destroy; /* flust context destroy list */ + pthread_mutex_t timer_mutex; /* mutex for 'flush_list' */ /* List of mapped 'upstream' contexts */ struct mk_list upstreams; @@ -100,7 +105,8 @@ int flb_output_thread_pool_start(struct flb_output_instance *ins); int flb_output_thread_pool_flush(struct flb_task *task, struct flb_output_instance *out_ins, struct flb_config *config); - +int flb_output_thread_pool_timer_coros_size(struct flb_output_instance *ins); +void flb_output_thread_pool_timer_coros_print(struct flb_output_instance *ins); void flb_output_thread_instance_init(); struct flb_out_thread_instance *flb_output_thread_instance_get(); diff --git a/src/flb_engine.c b/src/flb_engine.c index 20548798bb4..64d76a166a0 100644 --- a/src/flb_engine.c +++ b/src/flb_engine.c @@ -385,6 +385,37 @@ static inline int handle_output_event(flb_pipefd_t fd, uint64_t ts, return 0; } +static int flb_running_count(struct flb_config *config) +{ + int tasks, timers, n = 0; + struct mk_list *head; + struct mk_list *tmp; + struct flb_output_instance *o_ins; + + mk_list_foreach_safe(head, tmp, &config->outputs) { + o_ins = mk_list_entry(head, struct flb_output_instance, _head); + n = flb_output_timer_coros_size(o_ins); + timers += n; + } + + tasks = flb_task_running_count(config); + return tasks + timers; +} + +static void flb_running_print(struct flb_config *config) +{ + struct mk_list *head ; + struct mk_list *tmp; + struct flb_output_instance *o_ins; + + flb_task_running_print(config); + + mk_list_foreach_safe(head, tmp, &config->outputs) { + o_ins = mk_list_entry(head, struct flb_output_instance, _head); + flb_output_timer_coros_print(o_ins); + } +} + static inline int flb_engine_manager(flb_pipefd_t fd, struct flb_config *config) { int bytes; @@ -543,6 +574,7 @@ int sb_segregate_chunks(struct flb_config *config) int flb_engine_start(struct flb_config *config) { int ret; + int count; uint64_t ts; char tmp[16]; struct flb_time t_flush; @@ -813,19 +845,19 @@ int flb_engine_start(struct flb_config *config) * resources allocated by that co-routine, the best thing is to * wait again for the grace period and re-check again. */ - ret = flb_task_running_count(config); - if (ret > 0 && config->grace_count < config->grace) { + count = flb_running_count(config); + if (count > 0 && config->grace_count < config->grace) { if (config->grace_count == 1) { - flb_task_running_print(config); + flb_running_print(config); } flb_engine_exit(config); } else { - if (ret > 0) { - flb_task_running_print(config); + if (count > 0) { + flb_running_print(config); } flb_info("[engine] service has stopped (%i pending tasks)", - ret); + count); ret = config->exit_status_code; flb_engine_shutdown(config); config = NULL; diff --git a/src/flb_output.c b/src/flb_output.c index 07be1bf3314..2ba9c9a0af9 100644 --- a/src/flb_output.c +++ b/src/flb_output.c @@ -40,10 +40,12 @@ #include FLB_TLS_DEFINE(struct flb_out_flush_params, out_flush_params); +FLB_TLS_DEFINE(struct flb_out_timer_coro_params, timer_coro_params); void flb_output_prepare() { FLB_TLS_INIT(out_flush_params); + FLB_TLS_INIT(timer_coro_params); } /* Validate the the output address protocol */ @@ -478,6 +480,11 @@ void flb_output_exit(struct flb_config *config) if (params) { flb_free(params); } + params = FLB_TLS_GET(timer_coro_params); + if (params) { + flb_free(params); + } + } static inline int instance_id(struct flb_config *config) diff --git a/src/flb_output_thread.c b/src/flb_output_thread.c index 1198156321e..965ac8f8d6a 100644 --- a/src/flb_output_thread.c +++ b/src/flb_output_thread.c @@ -180,7 +180,8 @@ static void output_thread(void *data) struct flb_output_instance *ins; struct flb_output_flush *out_flush; struct flb_out_thread_instance *th_ins = data; - struct flb_out_flush_params *params; + struct flb_out_flush_params *flush_params = NULL; + struct flb_out_timer_coro_params *timer_params = NULL; struct flb_net_dns dns_ctx; /* Register thread instance */ @@ -330,7 +331,7 @@ static void output_thread(void *data) flb_sched_timer_cleanup(sched); /* Check if we should stop the event loop */ - if (stopping == FLB_TRUE && mk_list_size(&th_ins->flush_list) == 0) { + if (stopping == FLB_TRUE && mk_list_size(&th_ins->flush_list) == 0 && mk_list_size(&th_ins->timer_coro_list) == 0) { /* * If there are no busy network connections (and no coroutines) its * safe to stop it. @@ -356,9 +357,13 @@ static void output_thread(void *data) flb_upstream_conn_pending_destroy_list(&th_ins->upstreams); flb_sched_destroy(sched); - params = FLB_TLS_GET(out_flush_params); - if (params) { - flb_free(params); + flush_params = FLB_TLS_GET(out_flush_params); + if (flush_params) { + flb_free(flush_params); + } + timer_params = FLB_TLS_GET(timer_coro_params); + if (timer_params) { + flb_free(timer_params); } mk_event_loop_destroy(th_ins->evl); flb_bucket_queue_destroy(th_ins->evl_bktq); @@ -434,7 +439,10 @@ int flb_output_thread_pool_create(struct flb_config *config, th_ins->flush_id = 0; mk_list_init(&th_ins->flush_list); mk_list_init(&th_ins->flush_list_destroy); + mk_list_init(&th_ins->timer_coro_list); + mk_list_init(&th_ins->timer_coro_list_destroy); pthread_mutex_init(&th_ins->flush_mutex, NULL); + pthread_mutex_init(&th_ins->timer_mutex, NULL); mk_list_init(&th_ins->upstreams); upstream_thread_create(th_ins, ins); @@ -492,6 +500,54 @@ int flb_output_thread_pool_create(struct flb_config *config, return 0; } +int flb_output_thread_pool_timer_coros_size(struct flb_output_instance *ins) +{ + int n; + int size = 0; + struct mk_list *head; + struct flb_tp *tp = ins->tp; + struct flb_tp_thread *th; + struct flb_out_thread_instance *th_ins; + + mk_list_foreach(head, &tp->list_threads) { + th = mk_list_entry(head, struct flb_tp_thread, _head); + if (th->status != FLB_THREAD_POOL_RUNNING) { + continue; + } + + th_ins = th->params.data; + + pthread_mutex_lock(&th_ins->flush_mutex); + n = mk_list_size(&th_ins->timer_coro_list); + pthread_mutex_unlock(&th_ins->flush_mutex); + + size += n; + } + + return size; +} + +void flb_output_thread_pool_timer_coros_print(struct flb_output_instance *ins) +{ + struct mk_list *head; + struct mk_list *tmp; + struct flb_tp *tp = ins->tp; + struct flb_tp_thread *th; + struct flb_out_thread_instance *th_ins; + + mk_list_foreach_safe(head, tmp, &tp->list_threads) { + th = mk_list_entry(head, struct flb_tp_thread, _head); + if (th->status != FLB_THREAD_POOL_RUNNING) { + continue; + } + + th_ins = th->params.data; + pthread_mutex_lock(&th_ins->timer_mutex); + flb_timer_coros_print(&th_ins->timer_coro_list); + pthread_mutex_unlock(&th_ins->timer_mutex); + } +} + int flb_output_thread_pool_coros_size(struct flb_output_instance *ins) { int n; @@ -501,7 +557,6 @@ int flb_output_thread_pool_coros_size(struct flb_output_instance *ins) struct flb_tp_thread *th; struct flb_out_thread_instance *th_ins; - /* Signal each worker thread that needs to stop doing work */ mk_list_foreach(head, &tp->list_threads) { th = mk_list_entry(head, struct flb_tp_thread, _head); if (th->status != FLB_THREAD_POOL_RUNNING) {