Skip to content

Commit

Permalink
Working VOD
Browse files Browse the repository at this point in the history
  • Loading branch information
rmil committed Aug 9, 2021
1 parent 6a4c3b7 commit ffa7ecc
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 19 deletions.
20 changes: 19 additions & 1 deletion controllers/v1/encoder/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@ import (
"net/http"

"github.com/labstack/echo/v4"
"github.com/ystv/web-api/services/encoder"
"github.com/ystv/web-api/utils"
)

type EncoderController struct {
enc *encoder.Encoder
}

type (
// These structs are for binding to tusd's request

Expand Down Expand Up @@ -45,7 +50,7 @@ type (
// Connects with tusd through web-hooks, so tusd POSTs here.
// tusd's requests here does contain a lot of useful information.
// but for this endpoint, we are just checking for the JWT.
func VideoNew(c echo.Context) error {
func (e *EncoderController) VideoNew(c echo.Context) error {
r := Request{}
c.Bind(&r)
if r.HTTPRequest.Method != "POST" {
Expand All @@ -60,3 +65,16 @@ func VideoNew(c echo.Context) error {

return c.NoContent(http.StatusOK)
}

func (e *EncoderController) TranscodeFinished(c echo.Context) error {
err := e.enc.TranscodeFinished(c.Request().Context(), c.Param("taskid"))
if err != nil {
err = fmt.Errorf("transcode finished failed: %w", err)
return echo.NewHTTPError(http.StatusInternalServerError, err)
}
return c.NoContent(http.StatusOK)
}

func NewEncoderController(enc *encoder.Encoder) *EncoderController {
return &EncoderController{enc: enc}
}
4 changes: 3 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ import (
"github.com/joho/godotenv"
"github.com/ystv/web-api/controllers/v1/clapper"
"github.com/ystv/web-api/controllers/v1/creator"
encoderPackage "github.com/ystv/web-api/controllers/v1/encoder"
"github.com/ystv/web-api/controllers/v1/misc"
"github.com/ystv/web-api/controllers/v1/people"
"github.com/ystv/web-api/controllers/v1/public"
"github.com/ystv/web-api/routes"
"github.com/ystv/web-api/services/encoder"

"github.com/ystv/web-api/utils"
)

Expand Down Expand Up @@ -130,7 +132,7 @@ func main() {
JWTSigningKey: os.Getenv("WAPI_SIGNING_KEY"),
Clapper: clapper.NewRepos(db),
Creator: creator.NewRepos(db, cdn, enc, creatorConfig),
Encoder: enc,
Encoder: encoderPackage.NewEncoderController(enc),
Misc: misc.NewRepos(db),
People: people.NewRepo(db),
Public: public.NewRepos(db),
Expand Down
10 changes: 6 additions & 4 deletions routes/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@ import (
echoMw "github.com/labstack/echo/v4/middleware"
clapperPackage "github.com/ystv/web-api/controllers/v1/clapper"
creatorPackage "github.com/ystv/web-api/controllers/v1/creator"
encoderV1 "github.com/ystv/web-api/controllers/v1/encoder"
encoderPackage "github.com/ystv/web-api/controllers/v1/encoder"
miscPackage "github.com/ystv/web-api/controllers/v1/misc"
peoplePackage "github.com/ystv/web-api/controllers/v1/people"
publicPackage "github.com/ystv/web-api/controllers/v1/public"
streamV1 "github.com/ystv/web-api/controllers/v1/stream"
_ "github.com/ystv/web-api/docs" // docs is generated by Swag CLI, you have to import it.
"github.com/ystv/web-api/middleware"
"github.com/ystv/web-api/services/encoder"
"github.com/ystv/web-api/utils"

echoSwagger "github.com/swaggo/echo-swagger"
Expand All @@ -32,6 +31,7 @@ type Router struct {
jwtConfig *echoMw.JWTConfig
clapper *clapperPackage.Repos
creator *creatorPackage.Repos
encoder *encoderPackage.EncoderController
misc *miscPackage.Repos
people *peoplePackage.Repo
public *publicPackage.Repos
Expand All @@ -46,7 +46,7 @@ type NewRouter struct {
Debug bool
Clapper *clapperPackage.Repos
Creator *creatorPackage.Repos
Encoder *encoder.Encoder
Encoder *encoderPackage.EncoderController
Misc *miscPackage.Repos
People *peoplePackage.Repo
Public *publicPackage.Repos
Expand All @@ -65,6 +65,7 @@ func New(conf *NewRouter) *Router {
},
clapper: conf.Clapper,
creator: conf.Creator,
encoder: conf.Encoder,
misc: conf.Misc,
people: conf.People,
public: conf.Public,
Expand Down Expand Up @@ -113,7 +114,8 @@ func (r *Router) loadRoutes() {
// Service web endpoints
encoder := internal.Group("/encoder")
{
encoder.POST("/upload_request", encoderV1.VideoNew)
encoder.POST("/upload_request", r.encoder.VideoNew)
encoder.POST("/transcode_finished/:taskid", r.encoder.TranscodeFinished)
}
stream := internal.Group("/stream")
{
Expand Down
28 changes: 16 additions & 12 deletions services/encoder/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"path/filepath"
"strings"
Expand Down Expand Up @@ -36,8 +35,13 @@ func (e *Encoder) getVideoFilesAndPreset(ctx context.Context, videoID int) (Vide
return v, nil
}

type EncodeResult struct {
URI string
JobID string
}

// CreateEncode creates an encode item in the message queue.
func (e *Encoder) CreateEncode(ctx context.Context, file VideoFile, formatID int) error {
func (e *Encoder) CreateEncode(ctx context.Context, file VideoFile, formatID int) (EncodeResult, error) {
// Check video exists
// Validate encode format
// Send the job to VT
Expand All @@ -53,7 +57,7 @@ func (e *Encoder) CreateEncode(ctx context.Context, file VideoFile, formatID int
})

if err != nil {
return fmt.Errorf("failed to get object: %w", err)
return EncodeResult{}, fmt.Errorf("failed to get object: %w", err)
}

format := EncodeFormat{}
Expand All @@ -62,7 +66,7 @@ func (e *Encoder) CreateEncode(ctx context.Context, file VideoFile, formatID int
FROM video.encode_formats
WHERE format_id = $1`, formatID)
if format.Arguments == "" {
return ErrNoArgs
return EncodeResult{}, ErrNoArgs
}
if format.FileSuffix == "" {
format.FileSuffix = fmt.Sprint(formatID)
Expand All @@ -75,7 +79,8 @@ func (e *Encoder) CreateEncode(ctx context.Context, file VideoFile, formatID int
extension := filepath.Ext(key)
keyWithoutExtension := strings.TrimSuffix(key, extension)

dstURL := e.conf.ServeBucket + keyWithoutExtension + "_" + format.FileSuffix + extension
// Setting the name of the transcoded file
dstURL := fmt.Sprintf("%s/%s_%s%s", e.conf.ServeBucket, keyWithoutExtension, format.FileSuffix, extension)

taskVOD := struct {
SrcURL string `json:"srcURL"`
Expand All @@ -87,28 +92,27 @@ func (e *Encoder) CreateEncode(ctx context.Context, file VideoFile, formatID int

reqJSON, err := json.Marshal(taskVOD)
if err != nil {
return fmt.Errorf("failed to marshal json: %w", err)
return EncodeResult{}, fmt.Errorf("failed to marshal json: %w", err)
}

res, err := e.c.Post(e.conf.VTEndpoint+"/task/video/vod", "application/json", bytes.NewReader(reqJSON))
if err != nil {
return fmt.Errorf("failed to post to vt: %w", err)
return EncodeResult{}, fmt.Errorf("failed to post to vt: %w", err)
}

defer res.Body.Close()
switch status := res.StatusCode; {
case status == http.StatusCreated:
case status == http.StatusUnauthorized:
return ErrVTFailedToAuthenticate
return EncodeResult{}, ErrVTFailedToAuthenticate
default:
return ErrVTUnknownResponse
return EncodeResult{}, ErrVTUnknownResponse
}
dec := json.NewDecoder(res.Body)
task := TaskIdentification{}
err = dec.Decode(&task)
if err != nil {
return fmt.Errorf("failed to decode vt task response: %w", err)
return EncodeResult{}, fmt.Errorf("failed to decode vt task response: %w", err)
}
log.Printf("%+v", task)
return nil
return EncodeResult{URI: dstURL, JobID: task.TaskID}, nil
}
5 changes: 4 additions & 1 deletion services/encoder/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,13 @@ func (e *Encoder) RefreshVideo(ctx context.Context, videoID int) error {
return ErrNoFormats
}
for _, format := range p.Formats {
err = e.CreateEncode(ctx, v.Files[srcFileIdx], format.FormatID)
res, err := e.CreateEncode(ctx, v.Files[srcFileIdx], format.FormatID)
if err != nil {
return fmt.Errorf("failed to create encode fileID=%d format=%d : %w", v.Files[srcFileIdx].FileID, format.FormatID, err)
}
e.db.ExecContext(ctx, `
INSERT INTO video.files(video_id, format_id, uri, status)
VALUES ($1, $2, $3, $4);`, videoID, format.FormatID, res.URI, "processing/"+res.JobID)
}
return nil
}
Expand Down
22 changes: 22 additions & 0 deletions services/encoder/worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package encoder

import (
"context"
"fmt"
)

func (e *Encoder) TranscodeFinished(ctx context.Context, taskID string) error {
fileID := 0
err := e.db.GetContext(ctx, &fileID, `
SELECT file_id
FROM video.files
WHERE status = $1;`, fmt.Sprintf("processing/%s", taskID))
if err != nil {
return fmt.Errorf("failed to get video files: %w", err)
}
_, err = e.db.ExecContext(ctx, `UPDATE video.files SET status = 'public' WHERE file_id = $1;`, fileID)
if err != nil {
return fmt.Errorf("failed to update video file")
}
return nil
}

0 comments on commit ffa7ecc

Please sign in to comment.