From 2f74b6ecca6458519240bf74dd7d8b1e0c56089b Mon Sep 17 00:00:00 2001 From: Fredrik Vedvik Date: Tue, 26 Sep 2023 12:29:06 +0200 Subject: [PATCH] make sure every activity is queued in the same order --- workflows/asset_export-vx.go | 112 +++++++++++++++++++++++++---------- workflows/common.go | 10 ++++ 2 files changed, 91 insertions(+), 31 deletions(-) diff --git a/workflows/asset_export-vx.go b/workflows/asset_export-vx.go index 3e335a03..a27fc63f 100644 --- a/workflows/asset_export-vx.go +++ b/workflows/asset_export-vx.go @@ -323,15 +323,29 @@ func MergeExportData(ctx workflow.Context, params MergeExportDataParams) (*Merge videoTask := workflow.ExecuteActivity(ctx, activities.TranscodeMergeVideo, mergeInput) var audioTasks = map[string]workflow.Future{} - for lang, mi := range audioMergeInputs { - audioTasks[lang] = workflow.ExecuteActivity(ctx, activities.TranscodeMergeAudio, *mi) + { + keys, err := getMapKeysSafely(ctx, audioMergeInputs) + if err != nil { + return nil, err + } + for _, lang := range keys { + mi := audioMergeInputs[lang] + audioTasks[lang] = workflow.ExecuteActivity(ctx, activities.TranscodeMergeAudio, *mi) + } } var subtitleTasks = map[string]workflow.Future{} - for lang, mi := range subtitleMergeInputs { - subtitleTasks[lang] = workflow.ExecuteActivity(ctx, activities.TranscodeMergeSubtitles, *mi) - } + { + keys, err := getMapKeysSafely(ctx, subtitleMergeInputs) + if err != nil { + return nil, err + } + for _, lang := range keys { + mi := subtitleMergeInputs[lang] + subtitleTasks[lang] = workflow.ExecuteActivity(ctx, activities.TranscodeMergeSubtitles, *mi) + } + } var videoFile string { var result common.MergeResult @@ -343,23 +357,37 @@ func MergeExportData(ctx workflow.Context, params MergeExportDataParams) (*Merge } var audioFiles = map[string]string{} - for lang, task := range audioTasks { - var result common.MergeResult - err := task.Get(ctx, &result) + { + keys, err := getMapKeysSafely(ctx, audioTasks) if err != nil { return nil, err } - audioFiles[lang] = result.Path + for _, lang := range keys { + task := audioTasks[lang] + var result common.MergeResult + err = task.Get(ctx, &result) + if err != nil { + return nil, err + } + audioFiles[lang] = result.Path + } } var subtitleFiles = map[string]string{} - for lang, task := range subtitleTasks { - var result common.MergeResult - err := task.Get(ctx, &result) + { + keys, err := getMapKeysSafely(ctx, subtitleTasks) if err != nil { return nil, err } - subtitleFiles[lang] = result.Path + for _, lang := range keys { + task := subtitleTasks[lang] + var result common.MergeResult + err = task.Get(ctx, &result) + if err != nil { + return nil, err + } + subtitleFiles[lang] = result.Path + } } return &MergeExportDataResult{ @@ -447,42 +475,64 @@ func PrepareFiles(ctx workflow.Context, params PrepareFilesParams) (*PrepareFile }, } - for key := range qualities { + keys, err := getMapKeysSafely(ctx, qualities) + if err != nil { + return nil, err + } + for _, key := range keys { input := qualities[key] videoTasks[key] = workflow.ExecuteActivity(ctx, activities.TranscodeToVideoH264, input) } } var audioTasks = map[string]workflow.Future{} - for lang := range params.AudioFiles { - path := params.AudioFiles[lang] - audioTasks[lang] = workflow.ExecuteActivity(ctx, activities.TranscodeToAudioAac, common.AudioInput{ - Path: path, - Bitrate: "190k", - DestinationPath: tempFolder, - }) + { + keys, err := getMapKeysSafely(ctx, params.AudioFiles) + if err != nil { + return nil, err + } + for _, lang := range keys { + path := params.AudioFiles[lang] + audioTasks[lang] = workflow.ExecuteActivity(ctx, activities.TranscodeToAudioAac, common.AudioInput{ + Path: path, + Bitrate: "190k", + DestinationPath: tempFolder, + }) + } } var audioFiles = map[string]string{} - for lang := range audioTasks { - task := audioTasks[lang] - var result common.AudioResult - err := task.Get(ctx, &result) + { + keys, err := getMapKeysSafely(ctx, audioTasks) if err != nil { return nil, err } - audioFiles[lang] = result.OutputPath + for _, lang := range keys { + task := audioTasks[lang] + var result common.AudioResult + err = task.Get(ctx, &result) + if err != nil { + return nil, err + } + audioFiles[lang] = result.OutputPath + } } var videoFiles = map[string]string{} - for key := range videoTasks { - task := videoTasks[key] - var result common.VideoResult - err := task.Get(ctx, &result) + { + keys, err := getMapKeysSafely(ctx, videoTasks) if err != nil { return nil, err } - videoFiles[key] = result.OutputPath + for _, key := range keys { + task := videoTasks[key] + var result common.VideoResult + err = task.Get(ctx, &result) + if err != nil { + return nil, err + } + videoFiles[key] = result.OutputPath + } } return &PrepareFilesResult{ diff --git a/workflows/common.go b/workflows/common.go index 3996e316..5694b6a2 100644 --- a/workflows/common.go +++ b/workflows/common.go @@ -3,6 +3,7 @@ package workflows import ( "github.com/bcc-code/bccm-flows/activities" "github.com/bcc-code/bccm-flows/utils" + "github.com/samber/lo" "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/workflow" "path/filepath" @@ -83,3 +84,12 @@ func getWorkflowTempFolder(ctx workflow.Context) (string, error) { path := utils.GetWorkflowTempFolder(ctx) return path, createFolder(ctx, path) } + +// getMapKeysSafely makes sure that the order of the keys returned are identical to other workflow executions. +func getMapKeysSafely[T any](ctx workflow.Context, m map[string]T) ([]string, error) { + var keys []string + err := workflow.SideEffect(ctx, func(ctx workflow.Context) any { + return lo.Keys(m) + }).Get(&keys) + return keys, err +}