Skip to content

Commit

Permalink
Merge pull request #140 from bcc-code/feat/queue-based-retry-policy
Browse files Browse the repository at this point in the history
feat: queue based retry policies.
  • Loading branch information
fredrikvedvik authored Jan 11, 2024
2 parents 7c35c1d + 9d69400 commit dd61c14
Show file tree
Hide file tree
Showing 28 changed files with 117 additions and 264 deletions.
13 changes: 0 additions & 13 deletions utils/workflows/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/bcc-code/bcc-media-flows/environment"

"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
)

Expand All @@ -16,21 +15,9 @@ type ResultOrError[T any] struct {

func GetDefaultActivityOptions() workflow.ActivityOptions {
return workflow.ActivityOptions{
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Minute * 1,
MaximumAttempts: 10,
MaximumInterval: time.Hour * 1,
},
StartToCloseTimeout: time.Hour * 4,
ScheduleToCloseTimeout: time.Hour * 48,
HeartbeatTimeout: time.Minute * 1,
TaskQueue: environment.GetWorkerQueue(),
}
}

func GetDefaultWorkflowOptions() workflow.ChildWorkflowOptions {
return workflow.ChildWorkflowOptions{
TaskQueue: environment.GetWorkerQueue(),
}
}

Expand Down
2 changes: 1 addition & 1 deletion utils/workflows/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ func PublishEvent[T any](ctx workflow.Context, eventName string, data T) error {
return err
}

return workflow.ExecuteActivity(ctx, activities.PubsubPublish, event).Get(ctx, nil)
return ExecuteWithQueue(ctx, activities.PubsubPublish, event).Get(ctx, nil)
}
25 changes: 25 additions & 0 deletions utils/workflows/execute.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,39 @@
package wfutils

import (
"time"

"github.com/bcc-code/bcc-media-flows/activities"
"github.com/bcc-code/bcc-media-flows/environment"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
)

// ExecuteWithQueue executes the specified activity with the correct task queue
func ExecuteWithQueue(ctx workflow.Context, activity any, params ...any) workflow.Future {
options := workflow.GetActivityOptions(ctx)
options.TaskQueue = activities.GetQueueForActivity(activity)

switch options.TaskQueue {
case environment.GetWorkerQueue():
if options.RetryPolicy == nil {
options.RetryPolicy = &temporal.RetryPolicy{
MaximumAttempts: 10,
InitialInterval: 30 * time.Second,
MaximumInterval: 60 * time.Minute,
}
}
// usual reason for this failing is invalid files or tweaks to ffmpeg commands
case environment.GetTranscodeQueue(), environment.GetAudioQueue():
if options.RetryPolicy == nil {
options.RetryPolicy = &temporal.RetryPolicy{
MaximumAttempts: 5,
InitialInterval: 30 * time.Second,
MaximumInterval: 30 * time.Second,
}
}
}

ctx = workflow.WithActivityOptions(ctx, options)
return workflow.ExecuteActivity(ctx, activity, params...)
}
18 changes: 9 additions & 9 deletions utils/workflows/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,28 @@ import (
)

func CreateFolder(ctx workflow.Context, destination paths.Path) error {
return workflow.ExecuteActivity(ctx, activities.CreateFolder, activities.CreateFolderInput{
return ExecuteWithQueue(ctx, activities.CreateFolder, activities.CreateFolderInput{
Destination: destination,
}).Get(ctx, nil)
}

func StandardizeFileName(ctx workflow.Context, file paths.Path) (paths.Path, error) {
var result activities.FileResult
err := workflow.ExecuteActivity(ctx, activities.StandardizeFileName, activities.FileInput{
err := ExecuteWithQueue(ctx, activities.StandardizeFileName, activities.FileInput{
Path: file,
}).Get(ctx, &result)
return result.Path, err
}

func MoveFile(ctx workflow.Context, source, destination paths.Path) error {
return workflow.ExecuteActivity(ctx, activities.MoveFile, activities.MoveFileInput{
return ExecuteWithQueue(ctx, activities.MoveFile, activities.MoveFileInput{
Source: source,
Destination: destination,
}).Get(ctx, nil)
}

func CopyFile(ctx workflow.Context, source, destination paths.Path) error {
return workflow.ExecuteActivity(ctx, activities.CopyFile, activities.MoveFileInput{
return ExecuteWithQueue(ctx, activities.CopyFile, activities.MoveFileInput{
Source: source,
Destination: destination,
}).Get(ctx, nil)
Expand All @@ -48,23 +48,23 @@ func MoveToFolder(ctx workflow.Context, file, folder paths.Path) (paths.Path, er
}

func WriteFile(ctx workflow.Context, file paths.Path, data []byte) error {
return workflow.ExecuteActivity(ctx, activities.WriteFile, activities.WriteFileInput{
return ExecuteWithQueue(ctx, activities.WriteFile, activities.WriteFileInput{
Path: file,
Data: data,
}).Get(ctx, nil)
}

func ReadFile(ctx workflow.Context, file paths.Path) ([]byte, error) {
var res []byte
err := workflow.ExecuteActivity(ctx, activities.ReadFile, activities.FileInput{
err := ExecuteWithQueue(ctx, activities.ReadFile, activities.FileInput{
Path: file,
}).Get(ctx, &res)
return res, err
}

func ListFiles(ctx workflow.Context, path paths.Path) (paths.Files, error) {
var res []paths.Path
err := workflow.ExecuteActivity(ctx, activities.ListFiles, activities.FileInput{
err := ExecuteWithQueue(ctx, activities.ListFiles, activities.FileInput{
Path: path,
}).Get(ctx, &res)
return res, err
Expand All @@ -81,13 +81,13 @@ func UnmarshalXMLFile[T any](ctx workflow.Context, file paths.Path) (*T, error)
}

func DeletePath(ctx workflow.Context, path paths.Path) error {
return workflow.ExecuteActivity(ctx, activities.DeletePath, activities.DeletePathInput{
return ExecuteWithQueue(ctx, activities.DeletePath, activities.DeletePathInput{
Path: path,
}).Get(ctx, nil)
}

func DeletePathRecursively(ctx workflow.Context, path paths.Path) error {
return workflow.ExecuteActivity(ctx, activities.DeletePath, activities.DeletePathInput{
return ExecuteWithQueue(ctx, activities.DeletePath, activities.DeletePathInput{
RemoveAll: true,
Path: path,
}).Get(ctx, nil)
Expand Down
6 changes: 3 additions & 3 deletions utils/workflows/vidispine.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,21 @@ func WaitForVidispineJob(ctx workflow.Context, jobID string) error {
NonRetryableErrorTypes: []string{"JOB_FAILED"},
}
ctx = workflow.WithActivityOptions(ctx, options)
return workflow.ExecuteActivity(ctx, vsactivity.JobCompleteOrErr, vsactivity.WaitForJobCompletionParams{
return ExecuteWithQueue(ctx, vsactivity.JobCompleteOrErr, vsactivity.WaitForJobCompletionParams{
JobID: jobID,
}).Get(ctx, nil)
}

func SetVidispineMeta(ctx workflow.Context, assetID, key, value string) error {
return workflow.ExecuteActivity(ctx, vsactivity.SetVXMetadataFieldActivity, vsactivity.SetVXMetadataFieldParams{
return ExecuteWithQueue(ctx, vsactivity.SetVXMetadataFieldActivity, vsactivity.SetVXMetadataFieldParams{
VXID: assetID,
Key: key,
Value: value,
}).Get(ctx, nil)
}

func AddVidispineMetaValue(ctx workflow.Context, assetID, key, value string) error {
return workflow.ExecuteActivity(ctx, vsactivity.AddVXMetadataFieldValueActivity, vsactivity.SetVXMetadataFieldParams{
return ExecuteWithQueue(ctx, vsactivity.AddVXMetadataFieldValueActivity, vsactivity.SetVXMetadataFieldParams{
VXID: assetID,
Key: key,
Value: value,
Expand Down
5 changes: 1 addition & 4 deletions workflows/export/merge_export_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package export
import (
"github.com/bcc-code/bcc-media-flows/activities"
"github.com/bcc-code/bcc-media-flows/common"
"github.com/bcc-code/bcc-media-flows/environment"
"github.com/bcc-code/bcc-media-flows/paths"
"github.com/bcc-code/bcc-media-flows/services/vidispine"
wfutils "github.com/bcc-code/bcc-media-flows/utils/workflows"
Expand Down Expand Up @@ -37,9 +36,7 @@ func MergeExportData(ctx workflow.Context, params MergeExportDataParams) (*Merge

mergeInput, audioMergeInputs, subtitleMergeInputs, jsonTranscriptFile := exportDataToMergeInputs(data, params.TempDir, params.SubtitlesDir)

options := wfutils.GetDefaultActivityOptions()
options.TaskQueue = environment.GetTranscodeQueue()
ctx = workflow.WithActivityOptions(ctx, options)
ctx = workflow.WithActivityOptions(ctx, wfutils.GetDefaultActivityOptions())

var transcriptTask workflow.Future
if params.MakeTranscript && jsonTranscriptFile != nil {
Expand Down
5 changes: 2 additions & 3 deletions workflows/export/vx_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ func VXExport(ctx workflow.Context, params VXExportParams) ([]wfutils.ResultOrEr
logger := workflow.GetLogger(ctx)
logger.Info("Starting VXExport")

options := wfutils.GetDefaultActivityOptions()
ctx = workflow.WithActivityOptions(ctx, options)
ctx = workflow.WithActivityOptions(ctx, wfutils.GetDefaultActivityOptions())

var destinations []*AssetExportDestination
for _, dest := range params.Destinations {
Expand All @@ -90,7 +89,7 @@ func VXExport(ctx workflow.Context, params VXExportParams) ([]wfutils.ResultOrEr

var errs []error
var data *vidispine.ExportData
err := workflow.ExecuteActivity(ctx, avidispine.GetExportDataActivity, avidispine.GetExportDataParams{
err := wfutils.ExecuteWithQueue(ctx, avidispine.GetExportDataActivity, avidispine.GetExportDataParams{
VXID: params.VXID,
Languages: params.Languages,
AudioSource: params.AudioSource,
Expand Down
5 changes: 2 additions & 3 deletions workflows/export/vx_export_bmm.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ func VXExportToBMM(ctx workflow.Context, params VXExportChildWorkflowParams) (*V
logger := workflow.GetLogger(ctx)
logger.Info("Starting ExportToBMM")

options := wfutils.GetDefaultActivityOptions()
ctx = workflow.WithActivityOptions(ctx, options)
ctx = workflow.WithActivityOptions(ctx, wfutils.GetDefaultActivityOptions())

normalizedFutures := map[string]workflow.Future{}

Expand Down Expand Up @@ -168,7 +167,7 @@ func VXExportToBMM(ctx workflow.Context, params VXExportChildWorkflowParams) (*V
}

ingestFolder := params.ExportData.SafeTitle + "_" + workflow.GetInfo(ctx).OriginalRunID
err = workflow.ExecuteActivity(ctx, activities.RcloneCopyDir, activities.RcloneCopyDirInput{
err = wfutils.ExecuteWithQueue(ctx, activities.RcloneCopyDir, activities.RcloneCopyDirInput{
Source: params.OutputDir.Rclone(),
Destination: fmt.Sprintf("bmms3:/prod-bmm-mediabanken/" + ingestFolder),
}).Get(ctx, nil)
Expand Down
18 changes: 5 additions & 13 deletions workflows/export/vx_export_playout.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/bcc-code/bcc-media-flows/activities"
"github.com/bcc-code/bcc-media-flows/common"
"github.com/bcc-code/bcc-media-flows/environment"
wfutils "github.com/bcc-code/bcc-media-flows/utils/workflows"
"go.temporal.io/sdk/workflow"
)
Expand All @@ -14,21 +13,17 @@ func VXExportToPlayout(ctx workflow.Context, params VXExportChildWorkflowParams)
logger := workflow.GetLogger(ctx)
logger.Info("Starting ExportToPlayout")

options := wfutils.GetDefaultActivityOptions()
ctx = workflow.WithActivityOptions(ctx, options)
ctx = workflow.WithActivityOptions(ctx, wfutils.GetDefaultActivityOptions())

xdcamOutputDir := params.TempDir.Append("xdcam_output")
err := wfutils.CreateFolder(ctx, xdcamOutputDir)
if err != nil {
return nil, err
}

options.TaskQueue = environment.GetTranscodeQueue()
ctx = workflow.WithActivityOptions(ctx, options)

// Transcode video using playout encoding
var videoResult common.VideoResult
err = workflow.ExecuteActivity(ctx, activities.TranscodeToXDCAMActivity, activities.EncodeParams{
err = wfutils.ExecuteWithQueue(ctx, activities.TranscodeToXDCAMActivity, activities.EncodeParams{
Bitrate: "50M",
FilePath: *params.MergeResult.VideoFile,
OutputDir: xdcamOutputDir,
Expand All @@ -42,7 +37,7 @@ func VXExportToPlayout(ctx workflow.Context, params VXExportChildWorkflowParams)

// Mux into MXF file with 16 audio channels
var muxResult *common.PlayoutMuxResult
err = workflow.ExecuteActivity(ctx, activities.TranscodePlayoutMux, common.PlayoutMuxInput{
err = wfutils.ExecuteWithQueue(ctx, activities.TranscodePlayoutMux, common.PlayoutMuxInput{
VideoFilePath: videoResult.OutputPath,
AudioFilePaths: params.MergeResult.AudioFiles,
SubtitleFilePaths: params.MergeResult.SubtitleFiles,
Expand All @@ -53,20 +48,17 @@ func VXExportToPlayout(ctx workflow.Context, params VXExportChildWorkflowParams)
return nil, err
}

options.TaskQueue = environment.GetWorkerQueue()
ctx = workflow.WithActivityOptions(ctx, options)

// Rclone to playout
destination := "playout:/tmp"
err = workflow.ExecuteActivity(ctx, activities.RcloneCopyDir, activities.RcloneCopyDirInput{
err = wfutils.ExecuteWithQueue(ctx, activities.RcloneCopyDir, activities.RcloneCopyDirInput{
Source: params.OutputDir.Rclone(),
Destination: destination,
}).Get(ctx, nil)
if err != nil {
return nil, err
}

err = workflow.ExecuteActivity(ctx, activities.FtpPlayoutRename, activities.FtpPlayoutRenameParams{
err = wfutils.ExecuteWithQueue(ctx, activities.FtpPlayoutRename, activities.FtpPlayoutRenameParams{
From: filepath.Join("/tmp/", muxResult.Path.Base()),
To: filepath.Join("/dropbox/", muxResult.Path.Base()),
}).Get(ctx, nil)
Expand Down
11 changes: 3 additions & 8 deletions workflows/export/vx_export_vod.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,19 @@ import (
"github.com/bcc-code/bcc-media-platform/backend/asset"
"github.com/bcc-code/bcc-media-platform/backend/events"
"github.com/samber/lo"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
)

func VXExportToVOD(ctx workflow.Context, params VXExportChildWorkflowParams) (*VXExportResult, error) {
logger := workflow.GetLogger(ctx)
logger.Info("Starting ExportToVOD")

options := wfutils.GetDefaultActivityOptions()
options.RetryPolicy = &temporal.RetryPolicy{
MaximumAttempts: 1,
}
ctx = workflow.WithActivityOptions(ctx, options)
ctx = workflow.WithActivityOptions(ctx, wfutils.GetDefaultActivityOptions())

// We start chapter export and pick the results up later when needed
var chapterDataWF workflow.Future
if params.ParentParams.WithChapters {
chapterDataWF = workflow.ExecuteActivity(ctx, vsactivity.GetChapterDataActivity, vsactivity.GetChapterDataParams{
chapterDataWF = wfutils.ExecuteWithQueue(ctx, vsactivity.GetChapterDataActivity, vsactivity.GetChapterDataParams{
ExportData: &params.ExportData,
})
}
Expand Down Expand Up @@ -259,7 +254,7 @@ func (v *vxExportVodService) setMetadataAndPublishToVOD(

if v.params.Upload {
// Copies created files and any remaining files needed.
err = workflow.ExecuteActivity(ctx, activities.RcloneCopyDir, activities.RcloneCopyDirInput{
err = wfutils.ExecuteWithQueue(ctx, activities.RcloneCopyDir, activities.RcloneCopyDirInput{
Source: outputDir.Rclone(),
Destination: fmt.Sprintf("s3prod:vod-asset-ingest-prod/" + v.ingestFolder),
}).Get(ctx, nil)
Expand Down
23 changes: 4 additions & 19 deletions workflows/ffmpeg.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
package workflows

import (
"time"

"github.com/bcc-code/bcc-media-flows/activities"
"github.com/bcc-code/bcc-media-flows/environment"
"go.temporal.io/sdk/temporal"
wfutils "github.com/bcc-code/bcc-media-flows/utils/workflows"
"go.temporal.io/sdk/workflow"
)

Expand All @@ -18,23 +15,11 @@ func ExecuteFFmpeg(
params ExecuteFFmpegInput,
) error {
logger := workflow.GetLogger(ctx)
options := workflow.ActivityOptions{
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Minute * 1,
MaximumAttempts: 10,
MaximumInterval: time.Hour * 1,
},
StartToCloseTimeout: time.Hour * 4,
ScheduleToCloseTimeout: time.Hour * 48,
HeartbeatTimeout: time.Minute * 1,
TaskQueue: environment.GetTranscodeQueue(),
}

ctx = workflow.WithActivityOptions(ctx, options)

logger.Info("Starting ExecuteFFmpeg")

err := workflow.ExecuteActivity(ctx, activities.ExecuteFFmpeg, activities.ExecuteFFmpegInput{
ctx = workflow.WithActivityOptions(ctx, wfutils.GetDefaultActivityOptions())

err := wfutils.ExecuteWithQueue(ctx, activities.ExecuteFFmpeg, activities.ExecuteFFmpegInput{
Arguments: params.Arguments,
}).Get(ctx, nil)

Expand Down
Loading

0 comments on commit dd61c14

Please sign in to comment.