From f2b32ede1d09933a4d3159b966ba2d1e9fe7f9b7 Mon Sep 17 00:00:00 2001 From: Ravi Suhag Date: Thu, 31 Oct 2024 15:07:28 +0530 Subject: [PATCH] feat: added sink batch size config for sink concurrency --- agent/agent.go | 6 +++--- agent/agent_test.go | 1 + agent/config.go | 1 + cmd/run.go | 1 + config/config.go | 1 + config/config_test.go | 2 ++ config/meteor.yaml.sample | 3 ++- 7 files changed, 11 insertions(+), 4 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 3f3752a93..a89490716 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -17,8 +17,6 @@ import ( "github.com/raystack/salt/log" ) -const defaultBatchSize = 1 - // TimerFn of function type type TimerFn func() func() int @@ -32,6 +30,7 @@ type Agent struct { retrier *retrier stopOnSinkError bool timerFn TimerFn + sinkBatchSize int } // NewAgent returns an Agent with plugin factories. @@ -53,6 +52,7 @@ func NewAgent(config Config) *Agent { logger: config.Logger, retrier: retrier, timerFn: timerFn, + sinkBatchSize: config.SinkBatchSize, } } @@ -313,7 +313,7 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *s r.logger.Info("Successfully published record", "sink", sr.Name, "recipe", recipeName) return nil - }, defaultBatchSize) + }, r.sinkBatchSize) stream.onClose(func() { if err := sink.Close(); err != nil { diff --git a/agent/agent_test.go b/agent/agent_test.go index 3f30a6833..f77821b90 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -772,6 +772,7 @@ func TestAgentRun(t *testing.T) { Monitor: monitor, MaxRetries: 2, // need to retry "at least" 2 times since Extractor returns RetryError twice RetryInitialInterval: 1 * time.Millisecond, // this is to override default retry interval to reduce test time + SinkBatchSize: 1, }) run := r.Run(ctx, validRecipe) assert.NoError(t, run.Error) diff --git a/agent/config.go b/agent/config.go index 879b64e0e..787c582d1 100644 --- a/agent/config.go +++ b/agent/config.go @@ -17,4 +17,5 @@ type Config struct { RetryInitialInterval time.Duration StopOnSinkError bool TimerFn TimerFn + SinkBatchSize int } diff --git a/cmd/run.go b/cmd/run.go index b61087249..6378cee95 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -91,6 +91,7 @@ func RunCmd() *cobra.Command { MaxRetries: cfg.MaxRetries, RetryInitialInterval: time.Duration(cfg.RetryInitialIntervalSeconds) * time.Second, StopOnSinkError: cfg.StopOnSinkError, + SinkBatchSize: cfg.SinkBatchSize, }) recipes, err := recipe.NewReader(lg, pathToConfig).Read(args[0]) diff --git a/config/config.go b/config/config.go index cc2009e70..b9814239e 100644 --- a/config/config.go +++ b/config/config.go @@ -17,6 +17,7 @@ type Config struct { OtelEnabled bool `mapstructure:"OTEL_ENABLED" default:"false"` OtelCollectorAddr string `mapstructure:"OTEL_COLLECTOR_ADDR" default:"localhost:4317"` OtelTraceSampleProbability float64 `mapstructure:"OTEL_TRACE_SAMPLE_PROBABILITY" default:"1"` + SinkBatchSize int `mapstructure:"SINK_BATCH_SIZE" default:"1"` } func Load(configFile string) (Config, error) { diff --git a/config/config_test.go b/config/config_test.go index d28afe9cc..90b6a0c8f 100755 --- a/config/config_test.go +++ b/config/config_test.go @@ -31,6 +31,7 @@ func TestLoad(t *testing.T) { MaxRetries: 5, RetryInitialIntervalSeconds: 5, StopOnSinkError: false, + SinkBatchSize: 1, }, }, { @@ -46,6 +47,7 @@ func TestLoad(t *testing.T) { OtelTraceSampleProbability: 1, MaxRetries: 5, RetryInitialIntervalSeconds: 5, + SinkBatchSize: 1, }, expectedErr: "", }, diff --git a/config/meteor.yaml.sample b/config/meteor.yaml.sample index f660860ba..f811df6f4 100644 --- a/config/meteor.yaml.sample +++ b/config/meteor.yaml.sample @@ -5,4 +5,5 @@ STOP_ON_SINK_ERROR: false APP_NAME: meteor OTEL_ENABLED: false OTEL_COLLECTOR_ADDR: "localhost:4317" -OTEL_TRACE_SAMPLE_PROBABILITY: 1 \ No newline at end of file +OTEL_TRACE_SAMPLE_PROBABILITY: 1 +SINK_BATCH_SIZE: 10 \ No newline at end of file