Fluent Bit Scheduled Timer with output coroutine design #7466

Closed
PettitWesley opened this issue May 24, 2023 · 8 comments

@PettitWesley
Contributor

PettitWesley commented May 24, 2023

Fluent Bit Scheduled Timer with coroutine design

Background

Coroutines

Currently, Fluent Bit uses coroutines for concurrency; see the description here: https://github.com/fluent/fluent-bit/blob/master/DEVELOPER_GUIDE.md#concurrency

Currently, the code internally supports coroutines only for output flush tasks: jobs that flush data to a destination via an output plugin. Each coroutine flushes a single chunk (~2 MB of data) via a single output plugin.

Async vs Sync network code

Fluent Bit has two networking stacks in its core code, sync and async.

The async stack is actively maintained, receives bug fixes, and is used by all output plugins except the S3 output. It is also more performant because it uses coroutines: when one job is waiting on network IO, others can run.

The sync networking stack is not as actively maintained, and S3 is the only output plugin that uses it. The AWS Distro for Fluent Bit team has seen issues with the sync networking stack, including the high impact “CloudWatch output hang issue”. The sync stack is also wasteful, as it blocks an entire thread while network IO is pending.

The async stack requires the use of coroutines.

S3 output plugin unique use case

The S3 plugin generally does not upload data via the output coroutines. This is because customers want large log files in S3, larger than the 2 MB chunk size. Instead, the coroutines simply buffer data on the filesystem to create large files.

Please see full explanation of S3 uniqueness here: https://docs.fluentbit.io/manual/pipeline/outputs/s3#differences-between-s3-and-other-fluent-bit-outputs

Therefore, S3 uses a “timer callback” which is invoked periodically to check for ready or timed out files and upload them to S3. The timer callback is invoked at a set frequency by the Fluent Bit engine. However, since timers do not have coroutines currently, the timer callback runs via the engine directly and does not run in a coroutine. This means it must use the sync networking stack.

Given the past issues with sync networking in outputs, the AWS Distro team would like to migrate the S3 output to async networking.

Problem Statement and Goal

To migrate to the async networking stack for S3 output file uploads, the timer callback must run in a coroutine.

Therefore, this document outlines a proposal to implement a scheduled timer that runs its callback in a coroutine.

Design

Creating output timer coroutines

[recommend] Option 1: re-use existing flb_sched_timer_cb_create

The existing flb_sched_timer_cb_create code can be re-used without modification: https://github.com/fluent/fluent-bit/blob/1.9/src/flb_scheduler.c#L435

This function tells the event loop to invoke a callback at a set frequency.

Currently, the function accepts a callback function pointer and a context pointer.

We will create a new function in flb_output.h that creates and invokes coroutines for timers.

void flb_output_coro_timer_cb(struct flb_config *config, void *data)

This function will be passed as the callback function pointer in flb_sched_timer_cb_create.

The data pointer passed to this callback must allow it to invoke a callback function in the output plugin and pass it some data. The flb_output_coro_timer_cb function also needs the flb_output_instance so that it can associate the coroutine with an output instance. Thus, we will create a new structure:

struct flb_output_coro_timer_data {
   struct flb_output_instance *ins; /* associate coro with this output instance */
   void (*cb) (struct flb_config *config, void *data); /* call this output callback in the coro */
   void *data; /* opaque data to pass to the above cb */
};
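As a rough illustration of how the callback could unpack this structure, here is a minimal sketch. The flb_config and flb_output_instance definitions are stand-ins so the example compiles on its own (they are not the real Fluent Bit types), the output instance is stored as a pointer, and s3_upload_cb and simulate_timer_ticks are hypothetical names. The sketch calls the plugin callback directly where the real design would first create a coroutine.

```c
#include <assert.h>
#include <stddef.h>

/* Stand-in types so the sketch compiles alone; NOT the real Fluent Bit structs. */
struct flb_config { int unused; };
struct flb_output_instance { const char *name; };

struct flb_output_coro_timer_data {
    struct flb_output_instance *ins;                    /* owning output instance */
    void (*cb)(struct flb_config *config, void *data);  /* plugin callback */
    void *data;                                         /* opaque plugin context */
};

/*
 * Same signature as the proposed timer callback. In the proposed design this
 * function would create a coroutine tied to timer->ins and run timer->cb
 * inside it; this sketch simply invokes the callback directly.
 */
void flb_output_coro_timer_cb(struct flb_config *config, void *data)
{
    struct flb_output_coro_timer_data *timer = data;

    assert(timer != NULL && timer->cb != NULL);
    timer->cb(config, timer->data);
}

/* Hypothetical plugin callback: counts how many times the timer fired. */
static void s3_upload_cb(struct flb_config *config, void *data)
{
    (void) config;
    (*(int *) data)++;
}

/* Simulates the scheduler firing the timer `ticks` times. */
int simulate_timer_ticks(int ticks)
{
    struct flb_config config = { 0 };
    struct flb_output_instance ins = { "s3.1" };
    int fired = 0;
    struct flb_output_coro_timer_data timer = { &ins, s3_upload_cb, &fired };
    int i;

    for (i = 0; i < ticks; i++) {
        flb_output_coro_timer_cb(&config, &timer);
    }
    return fired;
}
```

The point of the indirection is that the scheduler only ever sees one generic callback and one opaque pointer, while the plugin callback and its context travel inside the structure.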

[Not recommended] Option 2: New flb_sched_timer_output_coro_create

An alternate option would be creating a new timer create function in flb_scheduler.c.

int flb_sched_timer_output_coro_create(struct flb_sched *sched, int type, int ms,
                                       struct flb_output_instance *ins,
                                       void (*cb)(struct flb_config *, void *),
                                       void *data, struct flb_sched_timer **out_timer)

This function would need to call out to some function in flb_output.h that would create a coroutine and invoke a callback. This output function would be set as the timer event callback, and it would then call the passed in callback. That function would need to be similar to or the same as the option 1 function in flb_output.h. It would need a custom struct to hold the output instance, passed in callback and callback data.

This option is at least as complicated as Option 1, and it additionally requires code changes in flb_scheduler.c. Therefore, it is not recommended.

Yielding and resuming output timer coroutines

Once the coroutine is created, its execution is no different than other/existing coroutines. It can use the async network stack and yield and resume on network IO events the same as existing flush coroutines.

Tracking output timer coroutines

Currently, flush coroutines are tracked via a flush_list. This list exists either on the flb_output_instance in the zero worker case, or on the flb_thread_instance for worker mode.

There are two reasons why coroutines must be tracked:

  • Clean Up: Coroutines cannot clean up and free themselves. Instead, they can put themselves on a destroy list and yield to the engine, which can subsequently free them.
  • Graceful Shutdown: on graceful shutdown, if a coroutine is still performing work, we want it to complete before the engine shuts down Fluent Bit. Tracking coroutines that are active makes this easy.

For flush coroutines, currently the following lists exist. The same design will be followed for output scheduled timer coroutines:

  • No worker case: There is a flush_list and flush_list_destroy on each flb_output_instance. For output timer coroutines, we can call it timer_coro_list and timer_coro_list_destroy.
  • Worker case: Same as above, except the lists are stored on the flb_thread_instance.

Currently, the structure stored on the list is a flb_output_flush, which can hold necessary metadata about the coroutine. We can create a similar structure:

struct flb_output_timer_coro {
    struct flb_config *config;         /* FLB context        */
    struct flb_output_instance *o_ins; /* output instance    */
    struct flb_coro *coro;             /* coro addr   */
    struct mk_list _head;              /* list */
};

For discussion: Unlike the flb_output_flush this structure lacks an ID. I was not able to discern the purpose of the ID in the current code for flush coroutines. Do we need an ID?
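The tracking lists described above can be sketched roughly as follows. This assumes a tiny intrusive list as a stand-in for mk_list (the list helpers here are written for the example, not the real mk_list API), and the timer_coro struct is a reduced, hypothetical version of the tracking entry.

```c
#include <assert.h>
#include <stddef.h>

/* Minimal intrusive doubly linked list; a stand-in for mk_list. */
struct list_node {
    struct list_node *prev;
    struct list_node *next;
};

static void list_init(struct list_node *head)
{
    head->prev = head;
    head->next = head;
}

static void list_add_tail(struct list_node *node, struct list_node *head)
{
    node->prev = head->prev;
    node->next = head;
    head->prev->next = node;
    head->prev = node;
}

static void list_del(struct list_node *node)
{
    node->prev->next = node->next;
    node->next->prev = node->prev;
    node->prev = node;
    node->next = node;
}

static int list_size(const struct list_node *head)
{
    const struct list_node *it;
    int n = 0;

    for (it = head->next; it != head; it = it->next) {
        n++;
    }
    return n;
}

/* Hypothetical tracking entry, reduced to an owner tag plus the list link. */
struct timer_coro {
    const char *output_name;
    struct list_node _head;
};

/* Registers two timer coroutines, completes one, returns the active count. */
int track_timer_coros(void)
{
    struct list_node timer_coro_list;
    struct timer_coro a = { "s3.0", { NULL, NULL } };
    struct timer_coro b = { "s3.1", { NULL, NULL } };

    list_init(&timer_coro_list);
    list_add_tail(&a._head, &timer_coro_list);
    list_add_tail(&b._head, &timer_coro_list);
    assert(list_size(&timer_coro_list) == 2);

    list_del(&a._head);   /* the coroutine for s3.0 finished */
    return list_size(&timer_coro_list);
}
```

The list length is what the shutdown logic would consult, which is why every timer coroutine has to be linked before it starts running.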

Cleaning up output timer coroutines

When a coroutine is complete, it needs to be added to the destroy list and then yield back to the engine.

For flush coroutines, the output is required to call the FLB_OUTPUT_RETURN macro which adds the coro to destroy list and yields.

For output timer coroutines, we could follow a similar model, and have a FLB_OUTPUT_TIMER_RETURN. The callback passed by the output plugin would be required to call this when complete. This macro will add the timer coroutine to the destroy list.

For discussion: Is there a simpler way to do this without a new macro?

Currently, in the event loop (both engine and worker), there are clean up functions that run at the end of each iteration. Similarly, for output timer coroutines, we would need to add a new clean up function that checks the timer_coro_list_destroy for coros that are ready to be destroyed.
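A minimal sketch of this two-step clean up, under simplifying assumptions: a singly linked list replaces mk_list, the output_instance struct holds only the two lists, and the function names timer_coro_prepare_destroy and timer_coro_destroy_pending are hypothetical. The first function models what a FLB_OUTPUT_TIMER_RETURN-style macro might do before yielding; the second models the sweep at the end of an event loop iteration.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical tracking entry; a singly linked list replaces mk_list here. */
struct timer_coro {
    int id;
    struct timer_coro *next;
};

struct output_instance {
    struct timer_coro *timer_coro_list;          /* active coroutines */
    struct timer_coro *timer_coro_list_destroy;  /* finished, waiting to be freed */
};

static struct timer_coro *timer_coro_create(struct output_instance *ins, int id)
{
    struct timer_coro *tc = malloc(sizeof(*tc));

    if (tc == NULL) {
        return NULL;
    }
    tc->id = id;
    tc->next = ins->timer_coro_list;
    ins->timer_coro_list = tc;
    return tc;
}

/* Moves a finished coroutine from the active list to the destroy list. */
static void timer_coro_prepare_destroy(struct output_instance *ins,
                                       struct timer_coro *tc)
{
    struct timer_coro **it = &ins->timer_coro_list;

    while (*it != NULL && *it != tc) {
        it = &(*it)->next;
    }
    if (*it == NULL) {
        return;   /* not on the active list */
    }
    *it = tc->next;
    tc->next = ins->timer_coro_list_destroy;
    ins->timer_coro_list_destroy = tc;
}

/* Engine-side sweep: frees everything on the destroy list, returns the count. */
static int timer_coro_destroy_pending(struct output_instance *ins)
{
    int freed = 0;

    while (ins->timer_coro_list_destroy != NULL) {
        struct timer_coro *tc = ins->timer_coro_list_destroy;
        ins->timer_coro_list_destroy = tc->next;
        free(tc);
        freed++;
    }
    return freed;
}

/* Creates two coroutines, finishes them one at a time, returns total freed. */
int cleanup_demo(void)
{
    struct output_instance ins = { NULL, NULL };
    struct timer_coro *first = timer_coro_create(&ins, 1);
    struct timer_coro *second = timer_coro_create(&ins, 2);
    int freed;

    assert(first != NULL && second != NULL);
    timer_coro_prepare_destroy(&ins, first);
    freed = timer_coro_destroy_pending(&ins);   /* frees only `first` */
    assert(ins.timer_coro_list == second);      /* `second` is still active */

    timer_coro_prepare_destroy(&ins, second);
    freed += timer_coro_destroy_pending(&ins);
    return freed;
}
```

Keeping the free in the engine-side sweep reflects the constraint stated earlier: a coroutine cannot release its own stack while it is still running on it.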

Output timer coroutines and graceful shutdown

Tracking active coroutines is necessary for graceful shutdown. This ensures that pending uploads can complete when the engine receives a SIGTERM. Currently, active coroutines interact with graceful shutdown in the following places:

  1. Worker: [flb_output_thread.c](https://github.com/fluent/fluent-bit/blob/master/src/flb_output_thread.c#L336): Each worker thread waits for its active flush coroutines to complete by checking the flush_list length on shutdown.
  2. Engine: flb_engine.c: The engine calls flb_task_running_count to determine if any tasks are pending. This is essentially a proxy for active and pending flushes (which use flush coroutines). It also calls flb_task_running_print to show the user that the graceful shutdown is working on finishing pending tasks.

1. Worker change: checking output timer coroutines on shutdown

On shutdown, the worker must check the length of both the flush_list and the timer_coro_list.
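As a small illustration, the exit condition could look like the sketch below. The two integers stand in for the lengths of flush_list and timer_coro_list, and drain_worker is a hypothetical simulation in which one coroutine completes per loop iteration; a real worker would be processing events instead.

```c
#include <assert.h>
#include <stdbool.h>

/* The worker may exit only when both lists are empty. */
static bool worker_can_exit(int flush_list_len, int timer_coro_list_len)
{
    return flush_list_len == 0 && timer_coro_list_len == 0;
}

/*
 * Simulates the shutdown wait: each iteration lets one pending coroutine
 * complete. Returns the number of iterations the worker had to wait.
 */
int drain_worker(int flush_list_len, int timer_coro_list_len)
{
    int iterations = 0;

    assert(flush_list_len >= 0 && timer_coro_list_len >= 0);
    while (!worker_can_exit(flush_list_len, timer_coro_list_len)) {
        if (flush_list_len > 0) {
            flush_list_len--;        /* a flush coroutine finished */
        }
        else {
            timer_coro_list_len--;   /* a timer coroutine finished */
        }
        iterations++;
    }
    return iterations;
}
```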

2. Engine change: tracking and printing running tasks and output coroutines

The engine graceful shutdown should wait for scheduled timer coroutines to complete. In addition, the timer coroutines created by S3 count as in-progress work for an output, same as tasks. Therefore, we must make changes to this code to support the new coroutine type.

In flb_engine.c, we will create new functions that cover both the existing and new use cases:

  • flb_running_count: count of running tasks + output timer coroutines
  • flb_running_print: print running tasks + output timer coroutines

For the print function, the format of the output will be:

[output] {flb_output_name(out)} has %d pending custom job(s)

For example:

[output] s3.1 has 1 pending custom job(s)
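A minimal sketch of producing that line with snprintf. The function name format_pending_jobs and the buffer-based interface are assumptions for illustration; the real code would presumably go through Fluent Bit's logging macros rather than a caller-supplied buffer.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Writes the shutdown status line into buf and returns the snprintf result
 * (the length the full line needs, excluding the terminating NUL).
 */
int format_pending_jobs(char *buf, size_t size,
                        const char *output_name, int pending)
{
    return snprintf(buf, size, "[output] %s has %d pending custom job(s)",
                    output_name, pending);
}

/* Returns 1 if the formatted line matches the example from the proposal. */
int pending_line_matches_example(void)
{
    char line[128];
    int len = format_pending_jobs(line, sizeof(line), "s3.1", 1);

    assert(len > 0 && (size_t) len < sizeof(line));
    return strcmp(line, "[output] s3.1 has 1 pending custom job(s)") == 0;
}
```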

Questions for comment:

  • Is flb_engine.c the best place for this new code?
  • Is “custom job” the right term to show the user? An alternative would be to allow the output to expose a timer coroutine “name” which would be used in the message. S3 could set this to upload so that the message says: [output] s3.1 has 1 pending upload(s)

Other changes

Currently there is a function flb_output_coros_size. This function seems to be used for only one purpose: supporting the FLB_OUTPUT_NO_MULTIPLEX flag, to ensure that only one flush runs at a time: https://github.com/fluent/fluent-bit/blob/v2.1.3/src/flb_engine_dispatch.c#L193

Consequently, it would appear that we do not need to add timer coroutine count to this function.

For Discussion: Is this the right long term decision?
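To make the consequence of that choice concrete, here is a hedged sketch of the gating logic as described above. OUTPUT_NO_MULTIPLEX is a stand-in flag value, not the real constant, and output_state, output_flush_coros_size and can_dispatch_flush are hypothetical names; the actual dispatch code linked above may be structured differently.

```c
#include <assert.h>
#include <stdbool.h>

#define OUTPUT_NO_MULTIPLEX 1   /* stand-in flag value, not the real constant */

struct output_state {
    int flags;
    int flush_coros;   /* active flush coroutines */
    int timer_coros;   /* active timer coroutines */
};

/* Counts flush coroutines only; timer coroutines are deliberately ignored. */
static int output_flush_coros_size(const struct output_state *out)
{
    return out->flush_coros;
}

/* Returns true if a new flush may be dispatched to this output. */
bool can_dispatch_flush(int flags, int flush_coros, int timer_coros)
{
    struct output_state out = { flags, flush_coros, timer_coros };

    assert(flush_coros >= 0 && timer_coros >= 0);
    if ((out.flags & OUTPUT_NO_MULTIPLEX) && output_flush_coros_size(&out) > 0) {
        return false;   /* a flush is already running for this output */
    }
    return true;
}
```

Under this sketch, a running timer coroutine never blocks a new flush, which keeps buffering to disk independent of an upload that is in progress.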

@PettitWesley
Contributor Author

@edsiper here is the issue and design as you requested/based on our discussion. Let me know what you think.

@leonardo-albertovich FYI... previously we had a design to migrate S3 to async networking that didn't require as many changes, it involved capturing one flush coroutine and turning it into a long lived "daemon coroutine", however, I implemented that and ran into issues.

You can see my work on that here:

Specifically, the daemon coroutine used flb_time_sleep in a loop to put itself to sleep for an interval, then wake up and upload data. The main problem is that the daemon coroutine blocked graceful shutdown: if it was sleeping when graceful shutdown started, the shutdown was blocked until the coroutine resumed and exited. I was thinking of simply accepting this and setting the sleep to a small number of seconds, but Eduardo suggested that we instead pursue supporting coroutines for timers.

@leonardo-albertovich
Collaborator

We have been talking about a similar functionality internally but didn't get to this stage yet so it's great that you did.

At first glance the first option seems preferable but I think we should ensure that the design is generic with homogeneous "entry points" for all four types of plugin (input, output, filter and processor) so we don't end up replicating more code or having to modify the interface soon.

The particular use cases for this we have are filters that perform aggregation or grouping such as multiline, extensibility filters such as lua or wasm and in the future probably processors that do aggregation.

As for that first attempt you made, I agree that while nifty it's not ideal and with a rather voluminous plugin such as s3 simplicity should be favored.

@PettitWesley
Contributor Author

At first glance the first option seems preferable but I think we should ensure that the design is generic with homogeneous "entry points" for all four types of plugin (input, output, filter and processor) so we don't end up replicating more code or having to modify the interface soon.

@leonardo-albertovich So how this would work is that each plugin type would need a callback function and data struct for its type. Each would follow the pattern that I have here for outputs.

So you generally agree with the design?

Do you have any comment on the other "For discussion" bits? (They are bolded or you can search for them).

@leonardo-albertovich
Collaborator

No, sorry if that was confusing, maybe I didn't express it clearly but I meant to say that having a timer callback as part of the standard plugin callbacks would end up being limiting.

I agree that it would be nice if plugins could schedule coroutine based timers.

As for the question about the id, I'd have to go through the code to get that context back to be able to answer, I'll take a look at it tomorrow.

@edsiper
Member

edsiper commented May 24, 2023

@edsiper here is the issue and design as you requested/based on our discussion. Let me know what you think.

thank you, I am good with the approach proposed, we need async timers with co-routines involved.

btw, thanks for taking the time to document the problem, use cases and approaches.

@PettitWesley
Contributor Author

@leonardo-albertovich there's one more important key bit to look into:

Currently there is a function flb_output_coros_size. This function seems to be used for only one purpose: supporting the FLB_OUTPUT_NO_MULTIPLEX flag, to ensure that only one flush runs at a time: https://github.com/fluent/fluent-bit/blob/v2.1.3/src/flb_engine_dispatch.c#L193

I think I don't need to update this... or if I do, it should be to rename this function to flb_output_flush_coros_size, which seems to match its intended purpose.

Contributor

This issue is stale because it has been open 90 days with no activity. Remove stale label or comment or this will be closed in 5 days. Maintainers can add the exempt-stale label.

@github-actions github-actions bot added the Stale label Dec 10, 2023
Contributor

This issue was closed because it has been stalled for 5 days with no activity.

@github-actions github-actions bot closed this as not planned Dec 17, 2023