Fluent Bit Scheduled Timer with output coroutine design #7466
Comments
@edsiper here is the issue and design, as you requested, based on our discussion. Let me know what you think. @leonardo-albertovich FYI, we previously had a design to migrate S3 to async networking that didn't require as many changes. It involved capturing one flush coroutine and turning it into a long-lived "daemon coroutine". However, when I implemented it, I ran into issues. You can see my work on that here:
Specifically, the way the daemon coroutine worked is that it used
We have been talking about similar functionality internally but didn't get to this stage yet, so it's great that you did. At first glance the first option seems preferable, but I think we should ensure that the design is generic, with homogeneous "entry points" for all four types of plugin (input, output, filter and processor), so we don't end up replicating more code or having to modify the interface soon. The particular use cases we have for this are filters that perform aggregation or grouping, such as multiline, extensibility filters such as lua or wasm, and, in the future, probably processors that do aggregation. As for that first attempt you made, I agree that while nifty, it's not ideal, and with a plugin as large as s3, simplicity should be favored.
@leonardo-albertovich So the way this would work is that each plugin type would need a callback function and data struct for its type. Each would follow the pattern that I have here for outputs. So you generally agree with the design? Do you have any comments on the other "For discussion" bits? (They are bolded, or you can search for them.)
No, sorry if that was confusing. Maybe I didn't express it clearly, but I meant to say that having a timer callback as part of the standard plugin callbacks would end up being limiting. I agree that it would be nice if plugins could schedule coroutine-based timers. As for the question about the ID, I'd have to go through the code to get that context back before I can answer; I'll take a look at it tomorrow.
Thank you, I am good with the proposed approach; we need async timers with coroutines involved. By the way, thanks for taking the time to document the problem, use cases and approaches.
@leonardo-albertovich there's one more important key bit to look into:
I think I don't need to update this... or if I do, it should be to change this code to be
This issue is stale because it has been open 90 days with no activity. Remove stale label or comment or this will be closed in 5 days. Maintainers can add the
This issue was closed because it has been stalled for 5 days with no activity.
Fluent Bit Scheduled Timer with coroutine design
Background
Coroutines
Currently, Fluent Bit uses coroutines for concurrency; see the description here: https://github.com/fluent/fluent-bit/blob/master/DEVELOPER_GUIDE.md#concurrency
Currently, the code internally only supports coroutines for output flush tasks: jobs that flush data to a destination via an output plugin. Each coroutine flushes a single chunk (~2 MB of data) via a single output plugin.
Async vs Sync network code
Fluent Bit has two networking stacks in its core code, sync and async.
The async stack is actively maintained, receives bug fixes, and is used by all output plugins except for the S3 output. It is also more performant because it uses coroutines: when one job is waiting on network IO, others can run.
The sync networking stack is less actively maintained, and S3 is the only output plugin that uses it. The AWS Distro for Fluent Bit team has seen issues with the sync networking stack, including the high-impact “CloudWatch output hang issue”. The sync stack is also wasteful, as it blocks an entire thread while network IO is pending.
The async stack requires the use of coroutines.
S3 output plugin unique use case
The S3 plugin generally does not upload data via the output coroutines. This is because customers want large log files in S3, larger than the 2 MB chunk size. Instead, the coroutines simply buffer data on the filesystem to build up large files.
Please see full explanation of S3 uniqueness here: https://docs.fluentbit.io/manual/pipeline/outputs/s3#differences-between-s3-and-other-fluent-bit-outputs
Therefore, S3 uses a “timer callback” which is invoked periodically to check for ready or timed-out files and upload them to S3. The timer callback is invoked at a set frequency by the Fluent Bit engine. However, since timers do not currently have coroutines, the timer callback runs directly in the engine rather than in a coroutine. This means it must use the sync networking stack.
Given the past issues with sync networking in outputs, the AWS Distro team would like to migrate the S3 output to async networking.
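To make the timer's job concrete, here is a minimal sketch of the per-tick readiness check such a callback performs. It assumes a buffered file is ready either when it reaches a size threshold or when it has waited longer than a timeout, in the spirit of the S3 plugin's total_file_size and upload_timeout settings. The struct and function names are hypothetical and this is not the plugin's actual code.

```c
#include <stdbool.h>
#include <stddef.h>
#include <time.h>

/* Hypothetical view of one locally buffered file awaiting upload. */
struct buffered_file {
    size_t size;          /* bytes buffered so far */
    time_t first_write;   /* when the first chunk was appended */
};

/* Hypothetical thresholds, modeled on total_file_size / upload_timeout. */
struct upload_policy {
    size_t total_file_size;  /* upload once the file reaches this size */
    time_t upload_timeout;   /* ...or once it has waited this many seconds */
};

/* Returns true if the timer callback should upload this file now. */
static bool file_ready_for_upload(const struct buffered_file *f,
                                  const struct upload_policy *p,
                                  time_t now)
{
    if (f->size >= p->total_file_size) {
        return true;                                     /* large enough */
    }
    return (now - f->first_write) >= p->upload_timeout;  /* timed out */
}

/* Counts how many files a single timer tick would upload. */
static size_t count_ready_files(const struct buffered_file *files, size_t n,
                                const struct upload_policy *p, time_t now)
{
    size_t ready = 0;

    for (size_t i = 0; i < n; i++) {
        if (file_ready_for_upload(&files[i], p, now)) {
            ready++;
        }
    }
    return ready;
}
```

Each ready file implies a blocking upload while the callback runs outside a coroutine, which is why the networking stack matters here.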
Problem Statement and Goal
To migrate S3 output file uploads to the async networking stack, the timer callback must run in a coroutine.
Therefore, this document outlines a proposal to implement a scheduled timer that runs its callback in a coroutine.
Design
Creating output timer coroutines
[Recommended] Option 1: re-use existing flb_sched_timer_cb_create
The existing flb_sched_timer_cb_create code can be re-used without modification: https://github.com/fluent/fluent-bit/blob/1.9/src/flb_scheduler.c#L435
This function tells the event loop to invoke a callback at a set frequency.
Currently, the function accepts a callback function pointer and a context pointer.
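As a rough illustration of this callback-plus-context pattern, the sketch below shows a generic callback that the scheduler could be given in place of the plugin's own callback, with a context struct carrying the output instance, the plugin callback and the plugin's data. All names are hypothetical, the struct is not the one from this proposal, and the real scheduler callback also receives a struct flb_config pointer, which is omitted here. Coroutine creation is left out too: the trampoline calls the plugin callback directly where the real wrapper would start a coroutine.

```c
#include <stddef.h>

/* Hypothetical stand-in for struct flb_output_instance. */
struct output_instance {
    const char *name;
    int timer_runs;    /* how many times a timer callback ran for it */
};

/* Signature the output plugin would implement for its timer work. */
typedef void (*output_timer_cb)(struct output_instance *ins, void *data);

/* Hypothetical context handed to the scheduler as its opaque data pointer. */
struct output_timer_ctx {
    struct output_instance *ins;  /* instance that owns the coroutine */
    output_timer_cb cb;           /* plugin callback to run */
    void *cb_data;                /* plugin's own context */
};

/*
 * Generic callback registered with the scheduler (simplified shape: only the
 * opaque data pointer). The real design would create a coroutine here and run
 * the plugin callback inside it; this sketch calls it directly.
 */
static void output_timer_trampoline(void *data)
{
    struct output_timer_ctx *ctx = data;

    ctx->cb(ctx->ins, ctx->cb_data);
}

/* Example plugin callback: counts invocations through its own context. */
static void demo_plugin_timer(struct output_instance *ins, void *data)
{
    int *uploads = data;

    (*uploads)++;
    ins->timer_runs++;
}
```

The scheduler only ever sees output_timer_trampoline and an opaque pointer, which is what allows the existing timer API to stay unchanged.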
We will create a new function in flb_output.h that creates and invokes coroutines for timers. This function will be passed as the callback function pointer in flb_sched_timer_cb_create. The data pointer passed to this callback must allow it to invoke a callback function in the output plugin and pass it some data. The flb_output_coro_timer_cb_create function also needs the flb_output_instance in order to associate the coroutine with an output instance. Thus, we will create a new structure:
[Not recommended] Option 2: New flb_sched_timer_output_coro_create
An alternative option would be to create a new timer create function in flb_scheduler.c. This function would need to call out to some function in flb_output.h that would create a coroutine and invoke a callback. This output function would be set as the timer event callback, and it would then call the passed-in callback. That function would need to be similar to, or the same as, the option 1 function in flb_output.h. It would need a custom struct to hold the output instance, the passed-in callback and the callback data. This option is no simpler than option 1 and adds further code changes in flb_scheduler.c. Therefore, it is not recommended.
Yielding and resuming output timer coroutines
Once the coroutine is created, its execution is no different than other/existing coroutines. It can use the async network stack and yield and resume on network IO events the same as existing flush coroutines.
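The sketch below illustrates this yield/resume cycle with POSIX ucontext (available on Linux/glibc) rather than Fluent Bit's own coroutine layer, purely so that it is self-contained. The struct and function names are hypothetical; each "step" stands in for a point where an upload would wait on network IO and the event loop would later resume it.

```c
#include <stdlib.h>
#include <ucontext.h>

#define CORO_STACK_SIZE (64 * 1024)

/* Hypothetical coroutine record, loosely modeled on a flush coroutine. */
struct timer_coro {
    ucontext_t caller;   /* event loop context to return to */
    ucontext_t callee;   /* coroutine's own context */
    char *stack;
    int finished;        /* set when the timer callback returns */
    int steps;           /* progress counter for the demo */
};

static struct timer_coro *current;  /* coroutine currently executing */

/* Called inside the coroutine when it would block on network IO. */
static void coro_yield(void)
{
    struct timer_coro *c = current;

    swapcontext(&c->callee, &c->caller);
}

/* Coroutine body: a stand-in for an upload that waits on IO twice. */
static void timer_coro_entry(void)
{
    struct timer_coro *c = current;

    c->steps++;          /* e.g. connect, then wait for the socket */
    coro_yield();
    c->steps++;          /* e.g. send the request, wait for the response */
    coro_yield();
    c->steps++;          /* upload complete */
    c->finished = 1;
    /* returning switches to uc_link, i.e. back to the event loop */
}

static struct timer_coro *timer_coro_create(void)
{
    struct timer_coro *c = calloc(1, sizeof(*c));

    if (c == NULL) {
        return NULL;
    }
    c->stack = malloc(CORO_STACK_SIZE);
    if (c->stack == NULL || getcontext(&c->callee) == -1) {
        free(c->stack);
        free(c);
        return NULL;
    }
    c->callee.uc_stack.ss_sp = c->stack;
    c->callee.uc_stack.ss_size = CORO_STACK_SIZE;
    c->callee.uc_link = &c->caller;
    makecontext(&c->callee, timer_coro_entry, 0);
    return c;
}

/* Event loop side: run the coroutine until it yields or finishes. */
static void timer_coro_resume(struct timer_coro *c)
{
    current = c;
    swapcontext(&c->caller, &c->callee);
    current = NULL;
}

static void timer_coro_destroy(struct timer_coro *c)
{
    free(c->stack);
    free(c);
}
```

In the real engine, the resume would be driven by an IO event on the connection rather than by repeated direct calls.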
Tracking output timer coroutines
Currently, flush coroutines are tracked via a flush_list. This list exists either on the flb_output_instance in the zero-worker case, or on the flb_thread_instance in worker mode. There are two reasons why coroutines must be tracked:
For flush coroutines, the following lists currently exist. The same design will be followed for output scheduled timer coroutines:
- flush_list and flush_list_destroy on each flb_output_instance. For output timer coroutines, we can call them timer_coro_list and timer_coro_list_destroy.
- The same lists on each flb_thread_instance.
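A minimal sketch of this two-list bookkeeping, assuming a small intrusive list in place of the mk_list type Fluent Bit uses for flush_list. The coro_owner struct stands in for either flb_output_instance or flb_thread_instance; all struct and function names are hypothetical.

```c
#include <stddef.h>

/* Minimal intrusive doubly linked list (Fluent Bit itself uses mk_list). */
struct list_head { struct list_head *prev, *next; };

static void list_init(struct list_head *h) { h->prev = h->next = h; }

static void list_add_tail(struct list_head *n, struct list_head *h)
{
    n->prev = h->prev;
    n->next = h;
    h->prev->next = n;
    h->prev = n;
}

static void list_del(struct list_head *n)
{
    n->prev->next = n->next;
    n->next->prev = n->prev;
    n->prev = n->next = n;
}

static size_t list_size(const struct list_head *h)
{
    size_t n = 0;

    for (const struct list_head *p = h->next; p != h; p = p->next) {
        n++;
    }
    return n;
}

/* Hypothetical per-coroutine metadata, analogous to flb_output_flush. */
struct output_timer_coro {
    void *coro;              /* handle to the underlying coroutine */
    struct list_head _head;  /* link in one of the owner's two lists */
};

/* Hypothetical owner: output instance or worker thread instance. */
struct coro_owner {
    struct list_head timer_coro_list;          /* running timer coroutines */
    struct list_head timer_coro_list_destroy;  /* finished, awaiting cleanup */
};

static void owner_init(struct coro_owner *o)
{
    list_init(&o->timer_coro_list);
    list_init(&o->timer_coro_list_destroy);
}

/* Called when a timer coroutine is created. */
static void timer_coro_track(struct coro_owner *o, struct output_timer_coro *tc)
{
    list_add_tail(&tc->_head, &o->timer_coro_list);
}

/* Called when the coroutine finishes: move it to the destroy list. */
static void timer_coro_mark_done(struct coro_owner *o, struct output_timer_coro *tc)
{
    list_del(&tc->_head);
    list_add_tail(&tc->_head, &o->timer_coro_list_destroy);
}
```

With this split, the length of timer_coro_list is exactly the "still running" count that graceful shutdown needs.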
Currently, the structure stored on the list is a flb_output_flush, which can hold the necessary metadata about the coroutine. We can create a similar structure:
For discussion: Unlike the flb_output_flush, this structure lacks an ID. I was not able to discern the purpose of the ID in the current code for flush coroutines. Do we need an ID?
Cleaning up output timer coroutines
When a coroutine is complete, it needs to be added to the destroy list and then yield back to the engine.
For flush coroutines, the output is required to call the FLB_OUTPUT_RETURN macro, which adds the coroutine to the destroy list and yields.
For output timer coroutines, we could follow a similar model and add a FLB_OUTPUT_TIMER_RETURN macro. The callback passed by the output plugin would be required to call it when complete. This macro will add the timer coroutine to the destroy list.
For discussion: Is there a simpler way to do this without a new macro?
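One possible shape for the proposed FLB_OUTPUT_TIMER_RETURN macro is sketched below, together with the kind of end-of-iteration cleanup the event loop would run. The macro's arguments, the record struct and the stubbed yield are assumptions for illustration; a real version would switch back to the event loop instead of incrementing a counter.

```c
#include <stdlib.h>

/* Hypothetical record for a timer coroutine that has finished its work. */
struct timer_coro_rec {
    struct timer_coro_rec *next;   /* link in the destroy list */
};

/* Hypothetical owner (output instance or worker thread). */
struct timer_owner {
    struct timer_coro_rec *destroy_list;  /* plays timer_coro_list_destroy */
    int yields;                           /* stubbed yields, for the demo */
};

/* Stand-in for switching back to the event loop; no real coroutine here. */
static void timer_coro_yield(struct timer_owner *o)
{
    o->yields++;
}

/*
 * One possible shape for FLB_OUTPUT_TIMER_RETURN (arguments assumed):
 * queue the finished coroutine for destruction, then yield.
 */
#define FLB_OUTPUT_TIMER_RETURN(owner, rec)        \
    do {                                           \
        (rec)->next = (owner)->destroy_list;       \
        (owner)->destroy_list = (rec);             \
        timer_coro_yield(owner);                   \
        return;                                    \
    } while (0)

/* Example plugin timer callback: does its work, then must use the macro. */
static void demo_timer_cb(struct timer_owner *o, struct timer_coro_rec *self)
{
    /* ... check for ready files and upload them ... */
    FLB_OUTPUT_TIMER_RETURN(o, self);
}

/* End-of-iteration cleanup: free every coroutine on the destroy list. */
static int timer_coro_cleanup(struct timer_owner *o)
{
    int freed = 0;
    struct timer_coro_rec *r = o->destroy_list;

    while (r != NULL) {
        struct timer_coro_rec *next = r->next;
        free(r);
        freed++;
        r = next;
    }
    o->destroy_list = NULL;
    return freed;
}
```

Freeing only from the event loop, never from inside the coroutine itself, avoids releasing a coroutine's stack while it is still executing on it.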
Currently, the event loop (in both the engine and the workers) runs cleanup functions at the end of each iteration. Similarly, for output timer coroutines we would need to add a new cleanup function that checks the timer_coro_list_destroy for coroutines that are ready to be destroyed.
Output timer coroutines and graceful shutdown
Tracking active coroutines is necessary for graceful shutdown. This ensures that pending uploads can complete when the engine receives a SIGTERM. Currently, active coroutines interact with graceful shutdown in the following places:
- flb_output_thread.c (https://github.com/fluent/fluent-bit/blob/master/src/flb_output_thread.c#L336): Each worker thread waits for its active flush coroutines to complete by checking the flush_list length on shutdown.
- flb_engine.c: The engine calls flb_task_running_count to determine if any tasks are pending. This is essentially a proxy for active and pending flushes (which use flush coroutines). It also calls flb_task_running_print to show the user that the graceful shutdown is working on finishing pending tasks.
1. Worker change: checking output timer coroutines on shutdown
On shutdown, the worker must check the length of both the flush_list and the timer_coro_list.
2. Engine change: tracking and printing running tasks and output coroutines
The engine's graceful shutdown should wait for scheduled timer coroutines to complete. In addition, the timer coroutines created by S3 count as in-progress work for an output, just like tasks. Therefore, we must change this code to support the new coroutine type.
In flb_engine.c, we will create new functions that wrap the existing and new use cases:
- flb_running_count: count of running tasks + output timer coroutines
- flb_running_print: print running tasks + output timer coroutines
For the print function, the format of the output will be:
For example:
Questions for comment:
- Is flb_engine.c the best place for this new code?
- … upload so that the message says: [output] s3.1 has 1 pending upload(s)
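A sketch of how flb_running_count and flb_running_print could combine tasks and timer coroutines. The output_stats snapshot, the per-plugin unit label and both function signatures are assumptions; the message format simply mirrors the s3.1 example in the question above, since the final format is still open.

```c
#include <stddef.h>
#include <stdio.h>

/* Hypothetical per-output snapshot of in-flight work. */
struct output_stats {
    const char *name;      /* e.g. "s3.1" */
    int running_tasks;     /* what flb_task_running_count covers today */
    int timer_coros;       /* active output timer coroutines */
    const char *unit;      /* hypothetical per-plugin label, e.g. "upload" */
};

/* Sketch of flb_running_count: running tasks + output timer coroutines. */
static int flb_running_count(const struct output_stats *outs, size_t n)
{
    int total = 0;

    for (size_t i = 0; i < n; i++) {
        total += outs[i].running_tasks + outs[i].timer_coros;
    }
    return total;
}

/*
 * Sketch of one line of flb_running_print for an output's timer coroutines.
 * Falls back to "task" when the plugin sets no unit label.
 * Returns the number of characters snprintf would have written.
 */
static int flb_running_print_timer_line(char *buf, size_t size,
                                        const struct output_stats *o)
{
    return snprintf(buf, size, "[output] %s has %d pending %s(s)",
                    o->name, o->timer_coros,
                    o->unit != NULL ? o->unit : "task");
}
```

For the s3.1 example (0 tasks, 1 timer coroutine, unit "upload"), the line produced is "[output] s3.1 has 1 pending upload(s)", and the engine would keep waiting while flb_running_count is non-zero.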
Other changes
Currently there is a function flb_output_coros_size. This function seems to be used for only one purpose: supporting the FLB_OUTPUT_NO_MULTIPLEX flag by ensuring that only one flush runs at a time: https://github.com/fluent/fluent-bit/blob/v2.1.3/src/flb_engine_dispatch.c#L193
Consequently, it appears that we do not need to add the timer coroutine count to this function.
For discussion: Is this the right long-term decision?
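For reference, here is a sketch of the dispatch guard this decision affects. OUT_NO_MULTIPLEX is a hypothetical stand-in for FLB_OUTPUT_NO_MULTIPLEX (its real value is not reproduced), and the out_state struct and can_dispatch_flush function are likewise hypothetical.

```c
#include <stdbool.h>

/* Hypothetical stand-in for the FLB_OUTPUT_NO_MULTIPLEX output flag. */
#define OUT_NO_MULTIPLEX (1 << 0)

/* Hypothetical per-output counters. */
struct out_state {
    int flags;
    int flush_coros;   /* what flb_output_coros_size reports today */
    int timer_coros;   /* new output timer coroutines */
};

/*
 * Sketch of the guard: with NO_MULTIPLEX set, only one flush may run at a
 * time. Timer coroutines are deliberately not counted, as proposed above.
 */
static bool can_dispatch_flush(const struct out_state *o)
{
    if (!(o->flags & OUT_NO_MULTIPLEX)) {
        return true;
    }
    return o->flush_coros == 0;
}
```

One trade-off to weigh: if timer coroutines were counted here, a long-running S3 upload would hold back every flush for that output until it finished.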