From db8682a87e59f5d29ce0a1af4721fcf9470906d3 Mon Sep 17 00:00:00 2001 From: CoreidCC Date: Thu, 9 Jan 2025 08:57:43 +0100 Subject: [PATCH] in_kafka: formatting adjustments and typos Signed-off-by: CoreidCC --- plugins/in_kafka/in_kafka.c | 31 +++++++++++++++---------------- plugins/in_kafka/in_kafka.h | 2 +- 2 files changed, 16 insertions(+), 17 deletions(-) diff --git a/plugins/in_kafka/in_kafka.c b/plugins/in_kafka/in_kafka.c index 395217cc378..db3642fd148 100644 --- a/plugins/in_kafka/in_kafka.c +++ b/plugins/in_kafka/in_kafka.c @@ -161,7 +161,7 @@ static int in_kafka_collect(struct flb_input_instance *ins, ret = FLB_EVENT_ENCODER_SUCCESS; while (ret == FLB_EVENT_ENCODER_SUCCESS) { - rkm = rd_kafka_consumer_poll(ctx->kafka.rk, ctx->poll_timeount_ms); + rkm = rd_kafka_consumer_poll(ctx->kafka.rk, ctx->poll_timeout_ms); if (!rkm) { break; @@ -181,7 +181,7 @@ static int in_kafka_collect(struct flb_input_instance *ins, rd_kafka_message_destroy(rkm); - if(!ctx->enable_auto_commit) { + if (!ctx->enable_auto_commit) { if (ret == FLB_EVENT_ENCODER_SUCCESS) { rd_kafka_commit(ctx->kafka.rk, NULL, 0); } @@ -248,25 +248,24 @@ static int in_kafka_init(struct flb_input_instance *ins, goto init_error; } - /* Set the kafka poll timeout dependend on wether we run in our own - * or in the main event thread. - * a) run in main event thread: - * -> minimize the delay we might create - * b) run in our own thread: - * -> optimize for throuput and relay on 'fetch.wait.max.ms' - * which is set to 500 by default default. wa algin our - * timeout with what is set for 'fetch.wait.max.ms' - */ - ctx->poll_timeount_ms = 1; + /* Set the kafka poll timeout depending on whether we run in our own + or in the main event thread. + a) run in main event thread: + -> minimize the delay we might create + b) run in our own thread: + -> optimize for throughput and relay on 'fetch.wait.max.ms' + which is set to 500 by default. we align our + timeout with what is set for 'fetch.wait.max.ms' */ + ctx->poll_timeout_ms = 1; if (ins->is_threaded) { - ctx->poll_timeount_ms = 550; // ensure kafa triggers timeout + ctx->poll_timeout_ms = 550; /* ensure kafa triggers timeout */ - // align our timeout with what was configured for fetch.wait.max.ms + /* align our timeout with what was configured for fetch.wait.max.ms */ dsize = sizeof(conf_val); res = rd_kafka_conf_get(kafka_conf, "fetch.wait.max.ms", conf_val, &dsize); if (res == RD_KAFKA_CONF_OK && dsize <= sizeof(conf_val)) { - // add 50ms so kafa triggers timout - ctx->poll_timeount_ms = atoi(conf_val) + 50; + /* add 50ms so kafa triggers timeout */ + ctx->poll_timeout_ms = atoi(conf_val) + 50; } } diff --git a/plugins/in_kafka/in_kafka.h b/plugins/in_kafka/in_kafka.h index 7a57e0ca68b..ccb48f48584 100644 --- a/plugins/in_kafka/in_kafka.h +++ b/plugins/in_kafka/in_kafka.h @@ -51,7 +51,7 @@ struct flb_in_kafka_config { size_t buffer_max_size; /* Maximum size of chunk allocation */ size_t polling_threshold; bool enable_auto_commit; - int poll_timeount_ms; + int poll_timeout_ms; }; #endif