Skip to content

Commit

Permalink
feat/ffmpeg execute ffmpeg through temporal
Browse files Browse the repository at this point in the history
  • Loading branch information
fredrikvedvik committed Sep 11, 2023
1 parent 733d72c commit 20b5880
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 0 deletions.
20 changes: 20 additions & 0 deletions activities/transcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/bcc-code/bccm-flows/common"
"github.com/bcc-code/bccm-flows/services/ffmpeg"
"github.com/bcc-code/bccm-flows/services/transcode"
"go.temporal.io/sdk/activity"
)
Expand Down Expand Up @@ -182,3 +183,22 @@ func TranscodeMux(ctx context.Context, input common.MuxInput) (*common.MuxResult
}
return result, nil
}

type ExecuteFFmpegInput struct {
Arguments []string
}

func ExecuteFFmpeg(ctx context.Context, input ExecuteFFmpegInput) error {
log := activity.GetLogger(ctx)
activity.RecordHeartbeat(ctx, "ExecuteFFmpeg")
log.Info("Starting ExecuteFFmpeg")

stopChan, progressCallback := registerProgressCallback(ctx)
defer close(stopChan)

_, err := ffmpeg.Do(input.Arguments, ffmpeg.StreamInfo{}, progressCallback)
if err != nil {
return err
}
return nil
}
11 changes: 11 additions & 0 deletions cmd/httpin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,17 @@ func triggerHandler(ctx *gin.Context) {
VXID: vxID,
WithFiles: getParamFromCtx(ctx, "withFiles") == "true",
})
case "ExecuteFFmpeg":
var input struct {
Arguments []string `json:"arguments"`
}
if err = ctx.BindJSON(&input); err != nil {
ctx.Status(http.StatusBadRequest)
return
}
res, err = wfClient.ExecuteWorkflow(ctx, workflowOptions, workflows.ExecuteFFmpeg, workflows.ExecuteFFmpegInput{
Arguments: input.Arguments,
})
}

if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions cmd/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ var transcodeActivities = []any{
activities.TranscodeToVideoH264,
activities.TranscodeToAudioAac,
activities.TranscodeMux,
activities.ExecuteFFmpeg,
}

var workerWorkflows = []any{
Expand All @@ -53,6 +54,7 @@ var workerWorkflows = []any{
workflows.MergeExportData,
workflows.MuxFiles,
workflows.PrepareFiles,
workflows.ExecuteFFmpeg,
}

func main() {
Expand Down
45 changes: 45 additions & 0 deletions workflows/ffmpeg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package workflows

import (
"github.com/bcc-code/bccm-flows/activities"
"github.com/bcc-code/bccm-flows/utils"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
"time"
)

type ExecuteFFmpegInput struct {
Arguments []string
}

func ExecuteFFmpeg(
ctx workflow.Context,
params ExecuteFFmpegInput,
) error {
logger := workflow.GetLogger(ctx)
options := workflow.ActivityOptions{
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Minute * 1,
MaximumAttempts: 10,
MaximumInterval: time.Hour * 1,
},
StartToCloseTimeout: time.Hour * 4,
ScheduleToCloseTimeout: time.Hour * 48,
HeartbeatTimeout: time.Minute * 1,
TaskQueue: utils.GetTranscodeQueue(),
}

ctx = workflow.WithActivityOptions(ctx, options)

logger.Info("Starting ExecuteFFmpeg")

err := workflow.ExecuteActivity(ctx, activities.ExecuteFFmpeg, activities.ExecuteFFmpegInput{
Arguments: params.Arguments,
}).Get(ctx, nil)

if err != nil {
return err
}

return err
}

0 comments on commit 20b5880

Please sign in to comment.