Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add configuration parameters for rdkafka config #524

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 83 additions & 4 deletions lib/fluent/plugin/in_rdkafka_group.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,16 @@ class Fluent::Plugin::RdKafkaGroupInput < Fluent::Plugin::Input

helpers :thread, :parser, :compat_parameters

config_param :brokers, :string, :default => 'localhost:9092',
:desc => <<-DESC
Set brokers directly:
<broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,..
Brokers: you can choose to use either brokers or zookeeper.
DESC

config_param :group_id, :string, :default => 'fluentd',
:desc => "A group id for the consumer."

config_param :topics, :string,
:desc => "Listening topics(separate with comma',')."

Expand Down Expand Up @@ -47,8 +57,27 @@ class Fluent::Plugin::RdKafkaGroupInput < Fluent::Plugin::Input
config_param :max_batch_size, :integer, :default => 10000,
:desc => "Maximum number of log lines emitted in a single batch."

config_param :kafka_configs, :hash, :default => {},
:desc => "Kafka configuration properties as desribed in https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md"
config_param :idempotent, :bool, :default => false, :desc => 'Enable idempotent producer'

config_param :max_send_retries, :integer, :default => 2,
:desc => "Number of times to retry sending of messages to a leader. Used for message.send.max.retries"
config_param :required_acks, :integer, :default => -1,
:desc => "The number of acks required per request. Used for request.required.acks"
config_param :ack_timeout, :time, :default => nil,
:desc => "How long the producer waits for acks. Used for request.timeout.ms"
config_param :compression_codec, :string, :default => nil,
:desc => <<-DESC
The codec the producer uses to compress messages. Used for compression.codec
Supported codecs: (gzip|snappy)
DESC

config_param :rdkafka_buffering_max_ms, :integer, :default => nil, :desc => 'Used for queue.buffering.max.ms'
config_param :rdkafka_buffering_max_messages, :integer, :default => nil, :desc => 'Used for queue.buffering.max.messages'
config_param :rdkafka_message_max_bytes, :integer, :default => nil, :desc => 'Used for message.max.bytes'
config_param :rdkafka_message_max_num, :integer, :default => nil, :desc => 'Used for batch.num.messages'

config_param :rdkafka_options, :hash, :default => {},
:desc => "Set any rdkafka configuration. See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md"

config_section :parse do
config_set_default :@type, 'json'
Expand Down Expand Up @@ -91,7 +120,7 @@ def configure(conf)
log.warn "The in_rdkafka_group consumer was not yet tested under heavy production load. Use it at your own risk!"

log.info "Will watch for topics #{@topics} at brokers " \
"#{@kafka_configs["bootstrap.servers"]} and '#{@kafka_configs["group.id"]}' group"
"#{@brokers} and '#{@group_id}' group"

@topics = _config_to_array(@topics)

Expand Down Expand Up @@ -141,6 +170,55 @@ def setup_parser(parser_conf)
end
end

def build_config
config = {:"bootstrap.servers" => @brokers}

if @ssl_ca_cert && @ssl_ca_cert[0]
ssl = true
config[:"ssl.ca.location"] = @ssl_ca_cert[0]
config[:"ssl.certificate.location"] = @ssl_client_cert if @ssl_client_cert
config[:"ssl.key.location"] = @ssl_client_cert_key if @ssl_client_cert_key
config[:"ssl.key.password"] = @ssl_client_cert_key_password if @ssl_client_cert_key_password
end

if @principal
sasl = true
config[:"sasl.mechanisms"] = "GSSAPI"
config[:"sasl.kerberos.principal"] = @principal
config[:"sasl.kerberos.service.name"] = @service_name if @service_name
config[:"sasl.kerberos.keytab"] = @keytab if @keytab
end

if ssl && sasl
security_protocol = "SASL_SSL"
elsif ssl && !sasl
security_protocol = "SSL"
elsif !ssl && sasl
security_protocol = "SASL_PLAINTEXT"
else
security_protocol = "PLAINTEXT"
end
config[:"security.protocol"] = security_protocol

config[:"compression.codec"] = @compression_codec if @compression_codec
config[:"message.send.max.retries"] = @max_send_retries if @max_send_retries
config[:"request.required.acks"] = @required_acks if @required_acks
config[:"request.timeout.ms"] = @ack_timeout * 1000 if @ack_timeout
config[:"queue.buffering.max.ms"] = @rdkafka_buffering_max_ms if @rdkafka_buffering_max_ms
config[:"queue.buffering.max.messages"] = @rdkafka_buffering_max_messages if @rdkafka_buffering_max_messages
config[:"message.max.bytes"] = @rdkafka_message_max_bytes if @rdkafka_message_max_bytes
config[:"batch.num.messages"] = @rdkafka_message_max_num if @rdkafka_message_max_num
config[:"sasl.username"] = @username if @username
config[:"sasl.password"] = @password if @password
config[:"enable.idempotence"] = @idempotent if @idempotent

@rdkafka_options.each { |k, v|
config[k.to_sym] = v
}

config
end

def start
super

Expand All @@ -161,7 +239,8 @@ def shutdown
end

def setup_consumer
consumer = Rdkafka::Config.new(@kafka_configs).consumer
@config = build_config
consumer = Rdkafka::Config.new(@config).consumer
consumer.subscribe(*@topics)
consumer
end
Expand Down