diff --git a/pkg/spark/spark.go b/pkg/spark/spark.go index ba9b16d..0a2dff1 100644 --- a/pkg/spark/spark.go +++ b/pkg/spark/spark.go @@ -120,6 +120,16 @@ func (s *Spark) submitArgs(presetName string) ([]string, error) { return args, nil } +var submitCounter = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "spark_exec_total", + Help: "The total number of spark-submit runs", +}, []string{"preset", "status"}) + +var retryCounter = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "retry_total", + Help: "The total number of retries", +}, []string{"preset"}) + func (s *Spark) Submit(presetName string) error { args, err := s.submitArgs(presetName) if err != nil { @@ -127,7 +137,26 @@ func (s *Spark) Submit(presetName string) error { } zap.L().Info("submit with args", zap.Any("args", args)) - go s.exec("submit", args) + cmd := exec.Command(s.binaryPath, args...) + zap.L().Info("spark-submit", zap.Strings("args", args)) + if s.debug { + writer := &zapio.Writer{Log: zap.L(), Level: zap.DebugLevel} + cmd.Stderr = writer + cmd.Stdout = writer + defer writer.Close() + } + + go func() { + if err := retry(10, 1*time.Second, 2, 5*time.Minute, func() error { + retryCounter.WithLabelValues(presetName).Inc() + return cmd.Run() + }); err != nil { + zap.L().Error("spark submit failed with retries", zap.Error(err)) + submitCounter.WithLabelValues(presetName, "failure").Inc() + } + submitCounter.WithLabelValues(presetName, "success").Inc() + }() + return nil } @@ -139,8 +168,19 @@ func (s *Spark) buildArgs(kind string, namespace, name string) []string { } func (s *Spark) Kill(namespace, name string) { - kind := "kill" - s.exec(kind, s.buildArgs(kind, namespace, name)) + args := s.buildArgs("kill", namespace, name) + cmd := exec.Command(s.binaryPath, args...) + zap.L().Info("spark-submit", zap.Strings("args", args)) + if s.debug { + writer := &zapio.Writer{Log: zap.L(), Level: zap.DebugLevel} + cmd.Stderr = writer + cmd.Stdout = writer + defer writer.Close() + } + + if err := cmd.Run(); err != nil { + zap.L().Error("killing spark app failed", zap.Error(err)) + } } func (s *Spark) Status(namespace, name string) string { @@ -157,35 +197,6 @@ func (s *Spark) Status(namespace, name string) string { return buffer.String() } -var execMetrics = promauto.NewCounterVec(prometheus.CounterOpts{ - Name: "spark_exec_total", - Help: "The total number of spark-submit runs", -}, []string{"kind", "status"}) - -func (s *Spark) exec(kind string, args []string) { - cmd := exec.Command(s.binaryPath, args...) - zap.L().Info("spark-submit", zap.Strings("args", args)) - if s.debug { - writer := &zapio.Writer{Log: zap.L(), Level: zap.DebugLevel} - cmd.Stderr = writer - cmd.Stdout = writer - defer writer.Close() - } - - if err := retry(10, 1*time.Second, 2, 5*time.Minute, func() error { - return cmd.Run() - }); err != nil { - zap.L().Error("spark submit failed with retries", zap.Error(err)) - execMetrics.WithLabelValues("failure").Inc() - } - execMetrics.WithLabelValues("success").Inc() -} - -var retryMetrics = promauto.NewCounter(prometheus.CounterOpts{ - Name: "retry_total", - Help: "The total number of retries", -}) - func retry(retries int, initialDelay time.Duration, mult int, maxWait time.Duration, fn func() error) error { delay := initialDelay for try := 0; try < retries; try++ { @@ -197,7 +208,6 @@ func retry(retries int, initialDelay time.Duration, mult int, maxWait time.Durat zap.Int("try", try), zap.String("waitDuration", delay.String()), ) - retryMetrics.Inc() } time.Sleep(delay) delay = delay * time.Duration(mult)