Skip to content

Commit

Permalink
workaround kafka retry deadlock elastic#1432 (elastic#1543)
Browse files Browse the repository at this point in the history
* Lazily init kafka outputs

Initialize output modes with/without guaranteed retry policy.

* Add tryPushFailed to lb/context
  • Loading branch information
Steffen Siering authored and tsg committed May 2, 2016
1 parent 4a8f00d commit cfedc9d
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 47 deletions.
14 changes: 6 additions & 8 deletions libbeat/outputs/kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ type client struct {

type msgRef struct {
count int32
err atomic.Value
batch []common.MapStr
cb func([]common.MapStr, error)

err error
}

var (
Expand Down Expand Up @@ -161,9 +162,7 @@ func (r *msgRef) done() {
}

func (r *msgRef) fail(err error) {
debugf("Kafka publish failed with: %v", err)

r.err.Store(err)
r.err = err
r.dec()
}

Expand All @@ -175,11 +174,10 @@ func (r *msgRef) dec() {

debugf("finished kafka batch")

var err error
v := r.err.Load()
if v != nil {
err = v.(error)
err := r.err
if err != nil {
eventsNotAcked.Add(int64(len(r.batch)))
debugf("Kafka publish failed with: %v", err)
r.cb(r.batch, err)
} else {
ackedEvents.Add(int64(len(r.batch)))
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type kafkaConfig struct {
RequiredACKs *int `config:"required_acks" validate:"min=-1"`
BrokerTimeout time.Duration `config:"broker_timeout" validate:"min=1"`
Compression string `config:"compression"`
MaxRetries int `config:"max_retries"`
MaxRetries int `config:"max_retries" validate:"min=-1,nonzero"`
ClientID string `config:"client_id"`
ChanBufferSize int `config:"channel_buffer_size" validate:"min=1"`
}
Expand Down
114 changes: 84 additions & 30 deletions libbeat/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,20 @@ import (
)

type kafka struct {
mode mode.ConnectionMode
config kafkaConfig

modeRetry mode.ConnectionMode
modeGuaranteed mode.ConnectionMode
}

// Back-off defaults used when constructing the async connection modes.
const (
	// defaultWaitRetry is the initial delay before retrying a failed batch.
	defaultWaitRetry = 1 * time.Second

	// NOTE: maxWaitRetry has no effect on mode, as logstash client currently does
	// not return ErrTempBulkFailure
	// NOTE(review): the comment above refers to the logstash client although this
	// is the kafka output — presumably copied from the logstash output; confirm.
	defaultMaxWaitRetry = 60 * time.Second
)

func init() {
sarama.Logger = kafkaLogger{}
outputs.RegisterOutputPlugin("kafka", New)
Expand Down Expand Up @@ -55,80 +66,119 @@ func New(cfg *common.Config, topologyExpire int) (outputs.Outputer, error) {
func (k *kafka) init(cfg *common.Config) error {
debugf("initialize kafka output")

config := defaultConfig
if err := cfg.Unpack(&config); err != nil {
k.config = defaultConfig
if err := cfg.Unpack(&k.config); err != nil {
return err
}

libCfg, err := newKafkaConfig(&config)
_, err := newKafkaConfig(&k.config)
if err != nil {
return err
}

hosts := config.Hosts
if len(hosts) < 1 {
logp.Err("Kafka configuration failed with: %v", errNoHosts)
return errNoHosts
}
debugf("hosts: %v", hosts)
return nil
}

useType := config.UseType
func (k *kafka) initMode(guaranteed bool) (mode.ConnectionMode, error) {
libCfg, err := newKafkaConfig(&k.config)
if err != nil {
return nil, err
}

topic := config.Topic
if topic == "" && !useType {
logp.Err("Kafka configuration failed with: %v", errNoTopicSet)
return errNoTopicSet
if guaranteed {
libCfg.Producer.Retry.Max = 1000
}

var clients []mode.AsyncProtocolClient
worker := 1
if config.Worker > 1 {
worker = config.Worker
if k.config.Worker > 1 {
worker = k.config.Worker
}

var clients []mode.AsyncProtocolClient
hosts := k.config.Hosts
topic := k.config.Topic
useType := k.config.UseType
for i := 0; i < worker; i++ {
client, err := newKafkaClient(hosts, topic, useType, libCfg)
if err != nil {
logp.Err("Failed to create kafka client: %v", err)
return err
return nil, err
}

clients = append(clients, client)
}

maxAttempts := 1
if guaranteed {
maxAttempts = 0
}

mode, err := modeutil.NewAsyncConnectionMode(
clients,
false,
config.MaxRetries,
libCfg.Producer.Retry.Backoff,
maxAttempts,
defaultWaitRetry,
libCfg.Net.WriteTimeout,
10*time.Second)
defaultMaxWaitRetry)
if err != nil {
logp.Err("Failed to configure kafka connection: %v", err)
return err
return nil, err
}
return mode, nil
}

k.mode = mode
return nil
// getMode returns the connection mode matching the publish options:
// the guaranteed-delivery mode when the caller requests guaranteed
// sends or max_retries is configured as -1, otherwise the bounded-retry
// mode. Each mode is created lazily on first use and cached on k.
func (k *kafka) getMode(opts outputs.Options) (mode.ConnectionMode, error) {
	wantGuaranteed := opts.Guaranteed || k.config.MaxRetries == -1

	if !wantGuaranteed {
		if k.modeRetry == nil {
			// First retry-mode publish: build and cache the mode.
			m, err := k.initMode(false)
			k.modeRetry = m
			if err != nil {
				return k.modeRetry, err
			}
		}
		return k.modeRetry, nil
	}

	if k.modeGuaranteed == nil {
		// First guaranteed publish: build and cache the mode.
		m, err := k.initMode(true)
		k.modeGuaranteed = m
		if err != nil {
			return k.modeGuaranteed, err
		}
	}
	return k.modeGuaranteed, nil
}

func (k *kafka) Close() error {
return k.mode.Close()
var err error

if k.modeGuaranteed != nil {
err = k.modeGuaranteed.Close()
}
if k.modeRetry != nil {
tmp := k.modeRetry.Close()
if err == nil {
err = tmp
}
}
return err
}

func (k *kafka) PublishEvent(
signal op.Signaler,
opts outputs.Options,
event common.MapStr,
) error {
return k.mode.PublishEvent(signal, opts, event)
mode, err := k.getMode(opts)
if err != nil {
return err
}
return mode.PublishEvent(signal, opts, event)
}

func (k *kafka) BulkPublish(
signal op.Signaler,
opts outputs.Options,
event []common.MapStr,
) error {
return k.mode.PublishEvents(signal, opts, event)
mode, err := k.getMode(opts)
if err != nil {
return err
}
return mode.PublishEvents(signal, opts, event)
}

func newKafkaConfig(config *kafkaConfig) (*sarama.Config, error) {
Expand Down Expand Up @@ -170,7 +220,11 @@ func newKafkaConfig(config *kafkaConfig) (*sarama.Config, error) {
k.Producer.Return.Errors = true

// have retries being handled by libbeat, disable retries in sarama library
k.Producer.Retry.Max = 0
retryMax := config.MaxRetries
if retryMax < 0 {
retryMax = 1000
}
k.Producer.Retry.Max = retryMax

// configure per broker go channel buffering
k.ChannelBufferSize = config.ChanBufferSize
Expand Down
12 changes: 8 additions & 4 deletions libbeat/outputs/mode/lb/async_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,12 @@ func (w *asyncWorker) handleResults(msg eventsMessage) func([]common.MapStr, err
}

func (w *asyncWorker) onFail(msg eventsMessage, err error) {
go func() {
logp.Info("Error publishing events (retrying): %s", err)
w.ctx.pushFailed(msg)
}()
if !w.ctx.tryPushFailed(msg) {
// break possible deadlock by spawning go-routine returning failed messages
// into retries queue
go func() {
logp.Info("Error publishing events (retrying): %s", err)
w.ctx.pushFailed(msg)
}()
}
}
14 changes: 14 additions & 0 deletions libbeat/outputs/mode/lb/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,20 @@ func (ctx *context) pushFailed(msg eventsMessage) bool {
return ok
}

// tryPushFailed makes a non-blocking attempt to return a failed batch to
// the retries queue. A batch with no attempts left is dropped via
// dropping() and reported as handled (true). It returns false only when
// the retries channel is full, signaling the caller to push by other
// means (e.g. from a separate goroutine).
func (ctx *context) tryPushFailed(msg eventsMessage) bool {
	if msg.attemptsLeft != 0 {
		select {
		case ctx.retries <- msg:
			return true
		default:
			// Queue full — caller must fall back to a blocking push.
			return false
		}
	}

	// Retry budget exhausted: drop the batch and treat it as handled.
	dropping(msg)
	return true
}

func (ctx *context) forwardEvent(ch chan eventsMessage, msg eventsMessage) bool {
debugf("forwards msg with attempts=%v", msg.attemptsLeft)

Expand Down
10 changes: 6 additions & 4 deletions libbeat/outputs/mode/modetest/modetest.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,12 @@ func TestMode(

if collectedEvents != nil {
assert.Equal(t, len(expectedEvents), len(*collectedEvents))
for i := range *collectedEvents {
expected := expectedEvents[i]
actual := (*collectedEvents)[i]
assert.Equal(t, expected, actual)
if len(expectedEvents) == len(*collectedEvents) {
for i := range *collectedEvents {
expected := expectedEvents[i]
actual := (*collectedEvents)[i]
assert.Equal(t, expected, actual)
}
}
}
}

0 comments on commit cfedc9d

Please sign in to comment.