Skip to content

Commit

Permalink
chore(kafka): batch processor config (#429)
Browse files Browse the repository at this point in the history
  • Loading branch information
kjubybot authored Feb 9, 2025
1 parent 95101a0 commit ceb2a9e
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 12 deletions.
27 changes: 15 additions & 12 deletions pkg/output/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,21 @@ import (
)

type Config struct {
Brokers string `yaml:"brokers"`
Topic string `yaml:"topic"`
TLS bool `yaml:"tls" default:"false"`
TLSClientConfig TLSClientConfig `yaml:"tlsClientConfig"`
MaxQueueSize int `yaml:"maxQueueSize" default:"51200"`
FlushFrequency time.Duration `yaml:"flushFrequency" default:"10s"`
FlushMessages int `yaml:"flushMessages" default:"500"`
FlushBytes int `yaml:"flushBytes" default:"1000000"`
MaxRetries int `yaml:"maxRetries" default:"3"`
Compression CompressionStrategy `yaml:"compression" default:"none"`
RequiredAcks RequiredAcks `yaml:"requiredAcks" default:"leader"`
Partitioning PartitionStrategy `yaml:"partitioning" default:"none"`
Brokers string `yaml:"brokers"`
Topic string `yaml:"topic"`
TLS bool `yaml:"tls" default:"false"`
TLSClientConfig TLSClientConfig `yaml:"tlsClientConfig"`
MaxQueueSize int `yaml:"maxQueueSize" default:"51200"`
BatchTimeout time.Duration `yaml:"batchTimeout" default:"5s"`
MaxExportBatchSize int `yaml:"maxExportBatchSize" default:"512"`
Workers int `yaml:"workers" default:"5"`
FlushFrequency time.Duration `yaml:"flushFrequency" default:"10s"`
FlushMessages int `yaml:"flushMessages" default:"500"`
FlushBytes int `yaml:"flushBytes" default:"1000000"`
MaxRetries int `yaml:"maxRetries" default:"3"`
Compression CompressionStrategy `yaml:"compression" default:"none"`
RequiredAcks RequiredAcks `yaml:"requiredAcks" default:"leader"`
Partitioning PartitionStrategy `yaml:"partitioning" default:"none"`
}

type TLSClientConfig struct {
Expand Down
4 changes: 4 additions & 0 deletions pkg/output/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ func New(name string, config *Config, log logrus.FieldLogger, filterConfig *xatu
exporter,
xatu.ImplementationLower()+"_output_"+SinkType+"_"+name,
log,
processor.WithMaxQueueSize(config.MaxQueueSize),
processor.WithBatchTimeout(config.BatchTimeout),
processor.WithMaxExportBatchSize(config.MaxExportBatchSize),
processor.WithShippingMethod(shippingMethod),
processor.WithWorkers(config.Workers),
)
if err != nil {
return nil, err
Expand Down

0 comments on commit ceb2a9e

Please sign in to comment.