Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

out_azure_kusto: added buffering fixes #9797

Open
wants to merge 10 commits into
base: master
Choose a base branch
from

Conversation

tanmaya-panda1
Copy link
Contributor

@tanmaya-panda1 tanmaya-panda1 commented Jan 5, 2025

This PR brings buffering support before the data is ingested to Azure Kusto. This helps in improving the COGS cost by optimizing batch size of blob in kusto queued ingestion.

This PR also adds

  1. IMDS authentication mode support
  2. configurable generated blob uri during queued ingestion to eliminate blob id collisions
  3. configurable io timeout during http calls

along with the host of new features of along with the introduction of buffering functionality.


Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change; please submit the following in a comment:

  • Example configuration file for the change

[OUTPUT]
name azure_kusto
match *
tenant_id xxxxxxxxxxxxxxxxxxxxxxxxx
client_id xxxxxxxxxxxxxxxxxxxxxxxxx
client_secret xxxxxxxxxxxxxxxxxxxxxxxxx
ingestion_endpoint https://ingest-xxxxxx-kusto.windows.net
database_name e2e
table_name SampleDB
ingestion_endpoint_connect_timeout 600
Retry_Limit 5
buffering_enabled On
compression_enabled On
upload_timeout 2m
upload_file_size 125M
azure_kusto_buffer_key kusto1
buffer_file_delete_early Off
unify_tag On
use_imds Off
buffer_dir /var/log

  • Debug log output from testing the change

2025/01/13 11:06:22] [debug] [output:azure_kusto:azure_kusto.0] inside azure kusto init
[2025/01/13 11:06:22] [ info] [output:azure_kusto:azure_kusto.0] endpoint='https://ingest-xxxx.xx.kusto.windows.net/', database='xx', table='FluentBitTemp'
[2025/01/13 11:06:22] [ info] [fstore] created root path /tmp/fluent-bit/azure-kusto/key
[2025/01/13 11:06:22] [debug] [output:azure_kusto:azure_kusto.0] Processing stream: '2025-01-13T11:06:22'
[2025/01/13 11:06:22] [debug] [output:azure_kusto:azure_kusto.0] Stream '2025-01-13T11:06:22' has 0 files
[2025/01/13 11:06:22] [debug] [output:azure_kusto:azure_kusto.0] No data found in any stream
[2025/01/13 11:06:22] [ info] [output:azure_kusto:azure_kusto.0] Using upload size 125000000 bytes
[2025/01/13 11:06:22] [debug] [output:azure_kusto:azure_kusto.0] Processing stream: '2025-01-13T11:06:22'
[2025/01/13 11:06:22] [debug] [output:azure_kusto:azure_kusto.0] Stream '2025-01-13T11:06:22' has 0 files
[2025/01/13 11:06:22] [debug] [output:azure_kusto:azure_kusto.0] No data found in any stream
[2025/01/13 11:06:22] [debug] [output:azure_kusto:azure_kusto.0] async flag is 0
[2025/01/13 11:06:22] [debug] [output:azure_kusto:azure_kusto.0] azure kusto init completed


[2025/01/13 11:09:17] [debug] [output:azure_kusto:azure_kusto.0] flushing bytes for event tag dummy.log and size 8448
[2025/01/13 11:09:17] [debug] [output:azure_kusto:azure_kusto.0] inside flush_init with old_buffers as 0
[2025/01/13 11:09:17] [debug] [output:azure_kusto:azure_kusto.0] Did not find any local buffered data from previous executions to kusto; buffer=/tmp/fluent-bit/azure-kusto/key
[2025/01/13 11:09:17] [debug] [output:azure_kusto:azure_kusto.0] Found matching file '5574873143815523159-5866062048993987929' for tag 'fluentbit-buffer-file-unify-tag.log'
[2025/01/13 11:09:17] [trace] [output:azure_kusto:azure_kusto.0 at /home/fluentbitvm/fluent-bit/plugins/out_azure_kusto/azure_kusto.c:1259] Buffering chunk 21648
[2025/01/13 11:09:17] [debug] [output:azure_kusto:azure_kusto.0] [azure_kusto] new file size: 617460
[2025/01/13 11:09:17] [debug] [output:azure_kusto:azure_kusto.0] [azure_kusto] current_buffer_size: 617460
[2025/01/13 11:09:17] [debug] [output:azure_kusto:azure_kusto.0] buffered chunk dummy.log
[2025/01/13 11:09:17] [debug] [out flush] cb_destroy coro_id=26
[2025/01/13 11:09:17] [debug] [task] destroy task=0x736b470 (task_id=0)
[2025/01/13 11:09:18] [ info] [engine] service has stopped (0 pending tasks)
[2025/01/13 11:09:18] [ info] [input] pausing dummy.0
[2025/01/13 11:09:18] [debug] [output:azure_kusto:azure_kusto.0] Processing stream: '2025-01-13T11:06:22'
[2025/01/13 11:09:18] [debug] [output:azure_kusto:azure_kusto.0] Stream '2025-01-13T11:06:22' has 1 files
[2025/01/13 11:09:18] [debug] [output:azure_kusto:azure_kusto.0] File in stream '2025-01-13T11:06:22': '5574873143815523159-5866062048993987929'
[2025/01/13 11:09:18] [ info] [output:azure_kusto:azure_kusto.0] Sending all locally buffered data to Kusto
[2025/01/13 11:09:18] [debug] [output:azure_kusto:azure_kusto.0] [construct_request_buffer] size of buffer file read 617460
[2025/01/13 11:09:18] [debug] [output:azure_kusto:azure_kusto.0] [construct_request_buffer] final increased 617460
[2025/01/13 11:09:18] [debug] [output:azure_kusto:azure_kusto.0] ingest_all_old_buffer_files :: enabled payload gzip compression
[2025/01/13 11:09:18] [debug] [output:azure_kusto:azure_kusto.0] generated random integer 2969820
[2025/01/13 11:09:18] [debug] [output:azure_kusto:azure_kusto.0] current time 1736766558906
[2025/01/13 11:09:18] [debug] [output:azure_kusto:azure_kusto.0] load_time is 1736766532296
[2025/01/13 11:09:18] [debug] [output:azure_kusto:azure_kusto.0] difference is 26610
[2025/01/13 11:09:18] [debug] [output:azure_kusto:azure_kusto.0] effective ingestion resource interval is 6569820
[2025/01/13 11:09:18] [debug] [output:azure_kusto:azure_kusto.0] resources are already loaded and are not stale
[2025/01/13 11:09:18] [debug] [output:azure_kusto:azure_kusto.0] inside blob after upstream ha node get
[2025/01/13 11:09:18] [debug] [output:azure_kusto:azure_kusto.0] azure_kusto_create_blob -- async flag is 0
[2025/01/13 11:09:18] [debug] [output:azure_kusto:azure_kusto.0] inside blob after upstream ha node get :: setting ingestion timeout
[2025/01/13 11:09:19] [debug] [output:azure_kusto:azure_kusto.0] inside blob after upstream ha node get :: after getting connection
[2025/01/13 11:09:19] [debug] [output:azure_kusto:azure_kusto.0] inside blob before create blob uri
[2025/01/13 11:0

[2025/01/13 11:09:19] [debug] [output:azure_kusto:azure_kusto.0] created blob uri xxxxxxx
[2025/01/13 11:09:19] [ info] [output:azure_kusto:azure_kusto.0] azure_kusto: before calling azure storage api :: value of set io_timeout is 60
[2025/01/13 11:09:19] [debug] [output:azure_kusto:azure_kusto.0] uploading payload to blob uri: xxxxxxx
[2025/01/13 11:09:19] [debug] [http_client] not using http_proxy for header
[2025/01/13 11:09:19] [debug] [output:azure_kusto:azure_kusto.0] kusto blob upload request http_do=0, HTTP Status: 201
[2025/01/13 11:09:19] [debug] [output:azure_kusto:azure_kusto.0] created queue uri xxxxxx
[2025/01/13 11:09:19] [debug] [output:azure_kusto:azure_kusto.0] uuid :: cc8c25c0-dfee-a626-f155-9136717b4a2c
[2025/01/13 11:09:19] [debug] [output:azure_kusto:azure_kusto.0] blob uri :: xxxxxx
[2025/01/13 11:09:19] [debug] [output:azure_kusto:azure_kusto.0] payload size :: 14805
[2025/01/13 11:09:19] [debug] [output:azure_kusto:azure_kusto.0] database_name :: xxx
[2025/01/13 11:09:19] [debug] [output:azure_kusto:azure_kusto.0] table name :: FluentBitTemp
[2025/01/13 11:09:19] [debug] [output:azure_kusto:azure_kusto.0] created ingestion message:
{"Id": "cc8c25c0-dfee-a626-f155-9136717b4a2c", "BlobPath": "xxxxxx", "RawDataSize": 14805, "DatabaseName": "xxxx", "TableName": "FluentBitTemp", "Cl
ientVersionForTracing": "Kusto.Fluent-Bit:3.2.3", "ApplicationForTracing": "Kusto.Fluent-Bit", "AdditionalProperties": { "format": "multijson", "authorizationContext": "token", "jsonMappingReference": "" }}
[2025/01/13 11:09:19] [debug] [http_client] not using http_proxy for header
[2025/01/13 11:09:19] [debug] [output:azure_kusto:azure_kusto.0] kusto queue request http_do=0, HTTP Status: 201
[2025/01/13 11:09:19] [ info] [output:azure_kusto:azure_kusto.0] before exiting the plugin kusto conf destroy called

  • Attached Valgrind output that shows no leaks or memory corruption was found

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • Run local packaging test showing all targets (including any new ones) build.
  • Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • Documentation required for this feature

(fluent/fluent-bit-docs#1546)

Backporting

  • Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant