Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TT-9360] Changing Timeout from time.Duration to int #696

Merged
merged 8 commits into from
Aug 1, 2023
33 changes: 28 additions & 5 deletions pumps/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"crypto/tls"
"encoding/json"
"os"
"strconv"
"time"

"github.com/TykTechnologies/tyk-pump/analytics"
Expand Down Expand Up @@ -39,8 +41,8 @@ type KafkaConf struct {
ClientId string `json:"client_id" mapstructure:"client_id"`
// The topic that the writer will produce messages to.
Topic string `json:"topic" mapstructure:"topic"`
// Timeout is the maximum amount of time will wait for a connect or write to complete.
Timeout time.Duration `json:"timeout" mapstructure:"timeout"`
// Timeout is the maximum amount of seconds to wait for a connect or write to complete.
Timeout interface{} `json:"timeout" mapstructure:"timeout"`
// Enable "github.com/golang/snappy" codec to be used to compress Kafka messages. By default
// is `false`.
Compressed bool `json:"compressed" mapstructure:"compressed"`
Expand Down Expand Up @@ -90,6 +92,10 @@ func (k *KafkaPump) Init(config interface{}) error {
}

processPumpEnvVars(k, k.log, k.kafkaConf, kafkaDefaultENV)
// This interface field is not reached by envconfig library, that's why we manually check it
if os.Getenv("TYK_PMP_PUMPS_KAFKA_META_TIMEOUT") != "" {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not too fond of this way of getting the env var, but envconfig library isn't getting the env var if the data type is an interface

k.kafkaConf.Timeout = os.Getenv("TYK_PMP_PUMPS_KAFKA_META_TIMEOUT")
}

var tlsConfig *tls.Config
if k.kafkaConf.UseSSL {
Expand Down Expand Up @@ -137,9 +143,26 @@ func (k *KafkaPump) Init(config interface{}) error {
k.log.WithField("SASL-Mechanism", k.kafkaConf.SASLMechanism).Warn("Tyk pump doesn't support this SASL mechanism.")
}

// Timeout is an interface type to allow both time.Duration and float values
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we add a default value here? there wasn't a default one before this change

var timeout time.Duration
switch v := k.kafkaConf.Timeout.(type) {
case string:
timeout, err = time.ParseDuration(v) // i.e: when timeout is '1s'
if err != nil {
floatValue, floatErr := strconv.ParseFloat(v, 64) // i.e: when timeout is '1'
if floatErr != nil {
k.log.Fatal("Failed to parse timeout: ", floatErr)
} else {
timeout = time.Duration(floatValue * float64(time.Second))
}
}
case float64:
timeout = time.Duration(v) * time.Second // i.e: when timeout is 1
}

//Kafka writer connection config
dialer := &kafka.Dialer{
Timeout: k.kafkaConf.Timeout * time.Second,
Timeout: timeout,
ClientID: k.kafkaConf.ClientId,
TLS: tlsConfig,
SASLMechanism: mechanism,
Expand All @@ -149,8 +172,8 @@ func (k *KafkaPump) Init(config interface{}) error {
k.writerConfig.Topic = k.kafkaConf.Topic
k.writerConfig.Balancer = &kafka.LeastBytes{}
k.writerConfig.Dialer = dialer
k.writerConfig.WriteTimeout = k.kafkaConf.Timeout * time.Second
k.writerConfig.ReadTimeout = k.kafkaConf.Timeout * time.Second
k.writerConfig.WriteTimeout = timeout
k.writerConfig.ReadTimeout = timeout
if k.kafkaConf.Compressed {
k.writerConfig.CompressionCodec = snappy.NewCompressionCodec()
}
Expand Down
Loading