From a692a76d008231732d08ab09ddbd8649f2ca5f7f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Nov=C3=A1k?= Date: Mon, 6 Jan 2025 16:16:32 +0100 Subject: [PATCH 1/2] Add configuration parameters for rdkafka config MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Tomáš Novák --- lib/fluent/plugin/in_rdkafka_group.rb | 87 +++++++++++++++++++++++++-- 1 file changed, 83 insertions(+), 4 deletions(-) diff --git a/lib/fluent/plugin/in_rdkafka_group.rb b/lib/fluent/plugin/in_rdkafka_group.rb index 5141a91..7c906c8 100644 --- a/lib/fluent/plugin/in_rdkafka_group.rb +++ b/lib/fluent/plugin/in_rdkafka_group.rb @@ -9,6 +9,16 @@ class Fluent::Plugin::RdKafkaGroupInput < Fluent::Plugin::Input helpers :thread, :parser, :compat_parameters + config_param :brokers, :string, :default => 'localhost:9092', + :desc => <<-DESC +Set brokers directly: +:,:,.. +Brokers: you can choose to use either brokers or zookeeper. +DESC + + config_param :group_id, :string, :default => 'fluentd', + :desc => "A group id for the consumer." + config_param :topics, :string, :desc => "Listening topics(separate with comma',')." @@ -47,8 +57,27 @@ class Fluent::Plugin::RdKafkaGroupInput < Fluent::Plugin::Input config_param :max_batch_size, :integer, :default => 10000, :desc => "Maximum number of log lines emitted in a single batch." - config_param :kafka_configs, :hash, :default => {}, - :desc => "Kafka configuration properties as desribed in https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md" + config_param :idempotent, :bool, :default => false, :desc => 'Enable idempotent producer' + + config_param :max_send_retries, :integer, :default => 2, + :desc => "Number of times to retry sending of messages to a leader. Used for message.send.max.retries" + config_param :required_acks, :integer, :default => -1, + :desc => "The number of acks required per request. Used for request.required.acks" + config_param :ack_timeout, :time, :default => nil, + :desc => "How long the producer waits for acks. Used for request.timeout.ms" + config_param :compression_codec, :string, :default => nil, + :desc => <<-DESC +The codec the producer uses to compress messages. Used for compression.codec +Supported codecs: (gzip|snappy) +DESC + + config_param :rdkafka_buffering_max_ms, :integer, :default => nil, :desc => 'Used for queue.buffering.max.ms' + config_param :rdkafka_buffering_max_messages, :integer, :default => nil, :desc => 'Used for queue.buffering.max.messages' + config_param :rdkafka_message_max_bytes, :integer, :default => nil, :desc => 'Used for message.max.bytes' + config_param :rdkafka_message_max_num, :integer, :default => nil, :desc => 'Used for batch.num.messages' + + config_param :rdkafka_options, :hash, :default => {}, + :desc => "Set any rdkafka configuration. See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md" config_section :parse do config_set_default :@type, 'json' @@ -91,7 +120,7 @@ def configure(conf) log.warn "The in_rdkafka_group consumer was not yet tested under heavy production load. Use it at your own risk!" log.info "Will watch for topics #{@topics} at brokers " \ - "#{@kafka_configs["bootstrap.servers"]} and '#{@kafka_configs["group.id"]}' group" + "#{@brokers} and '#{@group_id}' group" @topics = _config_to_array(@topics) @@ -141,6 +170,55 @@ def setup_parser(parser_conf) end end + def build_config + config = {:"bootstrap.servers" => @brokers} + + if @ssl_ca_cert && @ssl_ca_cert[0] + ssl = true + config[:"ssl.ca.location"] = @ssl_ca_cert[0] + config[:"ssl.certificate.location"] = @ssl_client_cert if @ssl_client_cert + config[:"ssl.key.location"] = @ssl_client_cert_key if @ssl_client_cert_key + config[:"ssl.key.password"] = @ssl_client_cert_key_password if @ssl_client_cert_key_password + end + + if @principal + sasl = true + config[:"sasl.mechanisms"] = "GSSAPI" + config[:"sasl.kerberos.principal"] = @principal + config[:"sasl.kerberos.service.name"] = @service_name if @service_name + config[:"sasl.kerberos.keytab"] = @keytab if @keytab + end + + if ssl && sasl + security_protocol = "SASL_SSL" + elsif ssl && !sasl + security_protocol = "SSL" + elsif !ssl && sasl + security_protocol = "SASL_PLAINTEXT" + else + security_protocol = "PLAINTEXT" + end + config[:"security.protocol"] = security_protocol + + config[:"compression.codec"] = @compression_codec if @compression_codec + config[:"message.send.max.retries"] = @max_send_retries if @max_send_retries + config[:"request.required.acks"] = @required_acks if @required_acks + config[:"request.timeout.ms"] = @ack_timeout * 1000 if @ack_timeout + config[:"queue.buffering.max.ms"] = @rdkafka_buffering_max_ms if @rdkafka_buffering_max_ms + config[:"queue.buffering.max.messages"] = @rdkafka_buffering_max_messages if @rdkafka_buffering_max_messages + config[:"message.max.bytes"] = @rdkafka_message_max_bytes if @rdkafka_message_max_bytes + config[:"batch.num.messages"] = @rdkafka_message_max_num if @rdkafka_message_max_num + config[:"sasl.username"] = @username if @username + config[:"sasl.password"] = @password if @password + config[:"enable.idempotence"] = @idempotent if @idempotent + + @rdkafka_options.each { |k, v| + config[k.to_sym] = v + } + + config + end + def start super @@ -161,7 +239,8 @@ def shutdown end def setup_consumer - consumer = Rdkafka::Config.new(@kafka_configs).consumer + @config = build_config + consumer = Rdkafka::Config.new(config).consumer consumer.subscribe(*@topics) consumer end From acb6b9a391e81d05f4f910115d5772f0c7278e8f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Nov=C3=A1k?= Date: Mon, 6 Jan 2025 16:25:59 +0100 Subject: [PATCH 2/2] Fix consumer configuration setup in rdkafka plugin MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Tomáš Novák --- lib/fluent/plugin/in_rdkafka_group.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/fluent/plugin/in_rdkafka_group.rb b/lib/fluent/plugin/in_rdkafka_group.rb index 7c906c8..6967c45 100644 --- a/lib/fluent/plugin/in_rdkafka_group.rb +++ b/lib/fluent/plugin/in_rdkafka_group.rb @@ -240,7 +240,7 @@ def shutdown def setup_consumer @config = build_config - consumer = Rdkafka::Config.new(config).consumer + consumer = Rdkafka::Config.new(@config).consumer consumer.subscribe(*@topics) consumer end