From ffa7ecc5bd7f7d40cf5b5125634bdecb1b336891 Mon Sep 17 00:00:00 2001 From: rmil Date: Mon, 9 Aug 2021 23:27:46 +0100 Subject: [PATCH] Working VOD --- controllers/v1/encoder/encoder.go | 20 +++++++++++++++++++- main.go | 4 +++- routes/router.go | 10 ++++++---- services/encoder/encode.go | 28 ++++++++++++++++------------ services/encoder/manager.go | 5 ++++- services/encoder/worker.go | 22 ++++++++++++++++++++++ 6 files changed, 70 insertions(+), 19 deletions(-) create mode 100644 services/encoder/worker.go diff --git a/controllers/v1/encoder/encoder.go b/controllers/v1/encoder/encoder.go index c7cc657..6b65a11 100644 --- a/controllers/v1/encoder/encoder.go +++ b/controllers/v1/encoder/encoder.go @@ -5,9 +5,14 @@ import ( "net/http" "github.com/labstack/echo/v4" + "github.com/ystv/web-api/services/encoder" "github.com/ystv/web-api/utils" ) +type EncoderController struct { + enc *encoder.Encoder +} + type ( // These structs are for binding to tusd's request @@ -45,7 +50,7 @@ type ( // Connects with tusd through web-hooks, so tusd POSTs here. // tusd's requests here does contain a lot of useful information. // but for this endpoint, we are just checking for the JWT. -func VideoNew(c echo.Context) error { +func (e *EncoderController) VideoNew(c echo.Context) error { r := Request{} c.Bind(&r) if r.HTTPRequest.Method != "POST" { @@ -60,3 +65,16 @@ func VideoNew(c echo.Context) error { return c.NoContent(http.StatusOK) } + +func (e *EncoderController) TranscodeFinished(c echo.Context) error { + err := e.enc.TranscodeFinished(c.Request().Context(), c.Param("taskid")) + if err != nil { + err = fmt.Errorf("transcode finished failed: %w", err) + return echo.NewHTTPError(http.StatusInternalServerError, err) + } + return c.NoContent(http.StatusOK) +} + +func NewEncoderController(enc *encoder.Encoder) *EncoderController { + return &EncoderController{enc: enc} +} diff --git a/main.go b/main.go index 92c3608..268c3c5 100644 --- a/main.go +++ b/main.go @@ -8,11 +8,13 @@ import ( "github.com/joho/godotenv" "github.com/ystv/web-api/controllers/v1/clapper" "github.com/ystv/web-api/controllers/v1/creator" + encoderPackage "github.com/ystv/web-api/controllers/v1/encoder" "github.com/ystv/web-api/controllers/v1/misc" "github.com/ystv/web-api/controllers/v1/people" "github.com/ystv/web-api/controllers/v1/public" "github.com/ystv/web-api/routes" "github.com/ystv/web-api/services/encoder" + "github.com/ystv/web-api/utils" ) @@ -130,7 +132,7 @@ func main() { JWTSigningKey: os.Getenv("WAPI_SIGNING_KEY"), Clapper: clapper.NewRepos(db), Creator: creator.NewRepos(db, cdn, enc, creatorConfig), - Encoder: enc, + Encoder: encoderPackage.NewEncoderController(enc), Misc: misc.NewRepos(db), People: people.NewRepo(db), Public: public.NewRepos(db), diff --git a/routes/router.go b/routes/router.go index a49ce58..3492adf 100644 --- a/routes/router.go +++ b/routes/router.go @@ -9,14 +9,13 @@ import ( echoMw "github.com/labstack/echo/v4/middleware" clapperPackage "github.com/ystv/web-api/controllers/v1/clapper" creatorPackage "github.com/ystv/web-api/controllers/v1/creator" - encoderV1 "github.com/ystv/web-api/controllers/v1/encoder" + encoderPackage "github.com/ystv/web-api/controllers/v1/encoder" miscPackage "github.com/ystv/web-api/controllers/v1/misc" peoplePackage "github.com/ystv/web-api/controllers/v1/people" publicPackage "github.com/ystv/web-api/controllers/v1/public" streamV1 "github.com/ystv/web-api/controllers/v1/stream" _ "github.com/ystv/web-api/docs" // docs is generated by Swag CLI, you have to import it. "github.com/ystv/web-api/middleware" - "github.com/ystv/web-api/services/encoder" "github.com/ystv/web-api/utils" echoSwagger "github.com/swaggo/echo-swagger" @@ -32,6 +31,7 @@ type Router struct { jwtConfig *echoMw.JWTConfig clapper *clapperPackage.Repos creator *creatorPackage.Repos + encoder *encoderPackage.EncoderController misc *miscPackage.Repos people *peoplePackage.Repo public *publicPackage.Repos @@ -46,7 +46,7 @@ type NewRouter struct { Debug bool Clapper *clapperPackage.Repos Creator *creatorPackage.Repos - Encoder *encoder.Encoder + Encoder *encoderPackage.EncoderController Misc *miscPackage.Repos People *peoplePackage.Repo Public *publicPackage.Repos @@ -65,6 +65,7 @@ func New(conf *NewRouter) *Router { }, clapper: conf.Clapper, creator: conf.Creator, + encoder: conf.Encoder, misc: conf.Misc, people: conf.People, public: conf.Public, @@ -113,7 +114,8 @@ func (r *Router) loadRoutes() { // Service web endpoints encoder := internal.Group("/encoder") { - encoder.POST("/upload_request", encoderV1.VideoNew) + encoder.POST("/upload_request", r.encoder.VideoNew) + encoder.POST("/transcode_finished/:taskid", r.encoder.TranscodeFinished) } stream := internal.Group("/stream") { diff --git a/services/encoder/encode.go b/services/encoder/encode.go index 1cae55f..e0d109a 100644 --- a/services/encoder/encode.go +++ b/services/encoder/encode.go @@ -5,7 +5,6 @@ import ( "context" "encoding/json" "fmt" - "log" "net/http" "path/filepath" "strings" @@ -36,8 +35,13 @@ func (e *Encoder) getVideoFilesAndPreset(ctx context.Context, videoID int) (Vide return v, nil } +type EncodeResult struct { + URI string + JobID string +} + // CreateEncode creates an encode item in the message queue. -func (e *Encoder) CreateEncode(ctx context.Context, file VideoFile, formatID int) error { +func (e *Encoder) CreateEncode(ctx context.Context, file VideoFile, formatID int) (EncodeResult, error) { // Check video exists // Validate encode format // Send the job to VT @@ -53,7 +57,7 @@ func (e *Encoder) CreateEncode(ctx context.Context, file VideoFile, formatID int }) if err != nil { - return fmt.Errorf("failed to get object: %w", err) + return EncodeResult{}, fmt.Errorf("failed to get object: %w", err) } format := EncodeFormat{} @@ -62,7 +66,7 @@ func (e *Encoder) CreateEncode(ctx context.Context, file VideoFile, formatID int FROM video.encode_formats WHERE format_id = $1`, formatID) if format.Arguments == "" { - return ErrNoArgs + return EncodeResult{}, ErrNoArgs } if format.FileSuffix == "" { format.FileSuffix = fmt.Sprint(formatID) @@ -75,7 +79,8 @@ func (e *Encoder) CreateEncode(ctx context.Context, file VideoFile, formatID int extension := filepath.Ext(key) keyWithoutExtension := strings.TrimSuffix(key, extension) - dstURL := e.conf.ServeBucket + keyWithoutExtension + "_" + format.FileSuffix + extension + // Setting the name of the transcoded file + dstURL := fmt.Sprintf("%s/%s_%s%s", e.conf.ServeBucket, keyWithoutExtension, format.FileSuffix, extension) taskVOD := struct { SrcURL string `json:"srcURL"` @@ -87,28 +92,27 @@ func (e *Encoder) CreateEncode(ctx context.Context, file VideoFile, formatID int reqJSON, err := json.Marshal(taskVOD) if err != nil { - return fmt.Errorf("failed to marshal json: %w", err) + return EncodeResult{}, fmt.Errorf("failed to marshal json: %w", err) } res, err := e.c.Post(e.conf.VTEndpoint+"/task/video/vod", "application/json", bytes.NewReader(reqJSON)) if err != nil { - return fmt.Errorf("failed to post to vt: %w", err) + return EncodeResult{}, fmt.Errorf("failed to post to vt: %w", err) } defer res.Body.Close() switch status := res.StatusCode; { case status == http.StatusCreated: case status == http.StatusUnauthorized: - return ErrVTFailedToAuthenticate + return EncodeResult{}, ErrVTFailedToAuthenticate default: - return ErrVTUnknownResponse + return EncodeResult{}, ErrVTUnknownResponse } dec := json.NewDecoder(res.Body) task := TaskIdentification{} err = dec.Decode(&task) if err != nil { - return fmt.Errorf("failed to decode vt task response: %w", err) + return EncodeResult{}, fmt.Errorf("failed to decode vt task response: %w", err) } - log.Printf("%+v", task) - return nil + return EncodeResult{URI: dstURL, JobID: task.TaskID}, nil } diff --git a/services/encoder/manager.go b/services/encoder/manager.go index ef3b4c5..f299785 100644 --- a/services/encoder/manager.go +++ b/services/encoder/manager.go @@ -52,10 +52,13 @@ func (e *Encoder) RefreshVideo(ctx context.Context, videoID int) error { return ErrNoFormats } for _, format := range p.Formats { - err = e.CreateEncode(ctx, v.Files[srcFileIdx], format.FormatID) + res, err := e.CreateEncode(ctx, v.Files[srcFileIdx], format.FormatID) if err != nil { return fmt.Errorf("failed to create encode fileID=%d format=%d : %w", v.Files[srcFileIdx].FileID, format.FormatID, err) } + e.db.ExecContext(ctx, ` + INSERT INTO video.files(video_id, format_id, uri, status) + VALUES ($1, $2, $3, $4);`, videoID, format.FormatID, res.URI, "processing/"+res.JobID) } return nil } diff --git a/services/encoder/worker.go b/services/encoder/worker.go new file mode 100644 index 0000000..ea448a3 --- /dev/null +++ b/services/encoder/worker.go @@ -0,0 +1,22 @@ +package encoder + +import ( + "context" + "fmt" +) + +func (e *Encoder) TranscodeFinished(ctx context.Context, taskID string) error { + fileID := 0 + err := e.db.GetContext(ctx, &fileID, ` + SELECT file_id + FROM video.files + WHERE status = $1;`, fmt.Sprintf("processing/%s", taskID)) + if err != nil { + return fmt.Errorf("failed to get video files: %w", err) + } + _, err = e.db.ExecContext(ctx, `UPDATE video.files SET status = 'public' WHERE file_id = $1;`, fileID) + if err != nil { + return fmt.Errorf("failed to update video file") + } + return nil +}