Skip to content

Commit

Permalink
Add smoke test endpoint (#3368)
Browse files Browse the repository at this point in the history
* Add smoke test endpoint

* Make stream url and duration request params

* retries
  • Loading branch information
mjh1 authored Jan 28, 2025
1 parent 3f3aef3 commit 51e6cbe
Showing 1 changed file with 86 additions and 0 deletions.
86 changes: 86 additions & 0 deletions server/ai_mediaserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ import (
"log/slog"
"net/http"
"net/url"
"os/exec"
"strings"
"time"

"github.com/livepeer/go-livepeer/monitor"

"github.com/cenkalti/backoff"
"github.com/getkin/kin-openapi/openapi3filter"
"github.com/livepeer/ai-worker/worker"
"github.com/livepeer/go-livepeer/clog"
Expand Down Expand Up @@ -86,6 +88,7 @@ func startAIMediaServer(ctx context.Context, ls *LivepeerServer) error {
ls.HTTPMux.Handle("/live/video-to-video/{stream}/start", ls.StartLiveVideo())
ls.HTTPMux.Handle("/live/video-to-video/{prefix}/{stream}/start", ls.StartLiveVideo())
ls.HTTPMux.Handle("/live/video-to-video/{stream}/update", ls.UpdateLiveVideo())
ls.HTTPMux.Handle("/live/video-to-video/smoketest", ls.SmokeTestLiveVideo())

// Stream status
ls.HTTPMux.Handle("/live/video-to-video/{streamId}/status", ls.GetLiveVideoToVideoStatus())
Expand Down Expand Up @@ -652,3 +655,86 @@ func (ls *LivepeerServer) cleanupLive(stream string) {
pub.StopControl()
}
}

const defaultSmokeTestDuration = 5 * time.Minute
const maxSmokeTestDuration = 60 * time.Minute

type smokeTestRequest struct {
StreamURL string `json:"stream_url"`
DurationSecs int `json:"duration_secs"`
}

func (ls *LivepeerServer) SmokeTestLiveVideo() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPut {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}

var req smokeTestRequest
defer r.Body.Close()
d := json.NewDecoder(r.Body)
err := d.Decode(&req)
if err != nil {
http.Error(w, "Failed to parse request body", http.StatusBadRequest)
return
}
if req.StreamURL == "" {
http.Error(w, "Missing stream url", http.StatusBadRequest)
return
}

ingestURL := req.StreamURL
duration := defaultSmokeTestDuration
if req.DurationSecs != 0 {
if float64(req.DurationSecs) > maxSmokeTestDuration.Seconds() {
http.Error(w, "Request exceeds max duration "+maxSmokeTestDuration.String(), http.StatusBadRequest)
return
}
duration = time.Duration(req.DurationSecs) * time.Second
}
// Use an FFMPEG test card
var params = []string{
"-re",
"-f", "lavfi",
"-i", "testsrc=size=1920x1080:rate=30,format=yuv420p",
"-f", "lavfi",
"-i", "sine",
"-c:v", "libx264",
"-b:v", "1000k",
"-x264-params", "keyint=60",
"-c:a", "aac",
"-to", fmt.Sprintf("%f", duration.Seconds()),
"-f", "flv",
ingestURL,
}

ctx, cancel := context.WithTimeout(context.Background(), duration+time.Minute)
cmd := exec.CommandContext(ctx, "ffmpeg", params...)
var outputBuf bytes.Buffer
cmd.Stdout = &outputBuf
cmd.Stderr = &outputBuf

clog.Infof(ctx, "Starting smoke test for %s duration %s", ingestURL, duration)

if err := cmd.Start(); err != nil {
cancel()
clog.Errorf(ctx, "failed to start ffmpeg. Error: %s\nCommand: ffmpeg %s", err, strings.Join(params, " "))
http.Error(w, "Failed to start stream", http.StatusInternalServerError)
return
}

go func() {
defer cancel()
_ = backoff.Retry(func() error {
if state, err := cmd.Process.Wait(); err != nil || state.ExitCode() != 0 {
clog.Errorf(ctx, "failed to run ffmpeg. Exit Code: %d, Error: %s\nCommand: ffmpeg %s\n", state.ExitCode(), err, strings.Join(params, " "))
clog.Errorf(ctx, "ffmpeg output:\n%s\n", outputBuf.String())
return fmt.Errorf("ffmpeg failed")
}
clog.Infof(ctx, "Smoke test finished successfully for %s", ingestURL)
return nil
}, backoff.WithMaxRetries(backoff.NewConstantBackOff(30*time.Second), 3))
}()
})
}

0 comments on commit 51e6cbe

Please sign in to comment.