Skip to content

Commit

Permalink
make sure every activity is queued in the same order
Browse files Browse the repository at this point in the history
  • Loading branch information
fredrikvedvik committed Sep 26, 2023
1 parent 4c55d82 commit 2f74b6e
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 31 deletions.
112 changes: 81 additions & 31 deletions workflows/asset_export-vx.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,15 +323,29 @@ func MergeExportData(ctx workflow.Context, params MergeExportDataParams) (*Merge
videoTask := workflow.ExecuteActivity(ctx, activities.TranscodeMergeVideo, mergeInput)

var audioTasks = map[string]workflow.Future{}
for lang, mi := range audioMergeInputs {
audioTasks[lang] = workflow.ExecuteActivity(ctx, activities.TranscodeMergeAudio, *mi)
{
keys, err := getMapKeysSafely(ctx, audioMergeInputs)
if err != nil {
return nil, err
}
for _, lang := range keys {
mi := audioMergeInputs[lang]
audioTasks[lang] = workflow.ExecuteActivity(ctx, activities.TranscodeMergeAudio, *mi)
}
}

var subtitleTasks = map[string]workflow.Future{}
for lang, mi := range subtitleMergeInputs {
subtitleTasks[lang] = workflow.ExecuteActivity(ctx, activities.TranscodeMergeSubtitles, *mi)
}
{
keys, err := getMapKeysSafely(ctx, subtitleMergeInputs)
if err != nil {
return nil, err
}
for _, lang := range keys {
mi := subtitleMergeInputs[lang]
subtitleTasks[lang] = workflow.ExecuteActivity(ctx, activities.TranscodeMergeSubtitles, *mi)
}

}
var videoFile string
{
var result common.MergeResult
Expand All @@ -343,23 +357,37 @@ func MergeExportData(ctx workflow.Context, params MergeExportDataParams) (*Merge
}

var audioFiles = map[string]string{}
for lang, task := range audioTasks {
var result common.MergeResult
err := task.Get(ctx, &result)
{
keys, err := getMapKeysSafely(ctx, audioTasks)
if err != nil {
return nil, err
}
audioFiles[lang] = result.Path
for _, lang := range keys {
task := audioTasks[lang]
var result common.MergeResult
err = task.Get(ctx, &result)
if err != nil {
return nil, err
}
audioFiles[lang] = result.Path
}
}

var subtitleFiles = map[string]string{}
for lang, task := range subtitleTasks {
var result common.MergeResult
err := task.Get(ctx, &result)
{
keys, err := getMapKeysSafely(ctx, subtitleTasks)
if err != nil {
return nil, err
}
subtitleFiles[lang] = result.Path
for _, lang := range keys {
task := subtitleTasks[lang]
var result common.MergeResult
err = task.Get(ctx, &result)
if err != nil {
return nil, err
}
subtitleFiles[lang] = result.Path
}
}

return &MergeExportDataResult{
Expand Down Expand Up @@ -447,42 +475,64 @@ func PrepareFiles(ctx workflow.Context, params PrepareFilesParams) (*PrepareFile
},
}

for key := range qualities {
keys, err := getMapKeysSafely(ctx, qualities)
if err != nil {
return nil, err
}
for _, key := range keys {
input := qualities[key]
videoTasks[key] = workflow.ExecuteActivity(ctx, activities.TranscodeToVideoH264, input)
}
}

var audioTasks = map[string]workflow.Future{}
for lang := range params.AudioFiles {
path := params.AudioFiles[lang]
audioTasks[lang] = workflow.ExecuteActivity(ctx, activities.TranscodeToAudioAac, common.AudioInput{
Path: path,
Bitrate: "190k",
DestinationPath: tempFolder,
})
{
keys, err := getMapKeysSafely(ctx, params.AudioFiles)
if err != nil {
return nil, err
}
for _, lang := range keys {
path := params.AudioFiles[lang]
audioTasks[lang] = workflow.ExecuteActivity(ctx, activities.TranscodeToAudioAac, common.AudioInput{
Path: path,
Bitrate: "190k",
DestinationPath: tempFolder,
})
}
}

var audioFiles = map[string]string{}
for lang := range audioTasks {
task := audioTasks[lang]
var result common.AudioResult
err := task.Get(ctx, &result)
{
keys, err := getMapKeysSafely(ctx, audioTasks)
if err != nil {
return nil, err
}
audioFiles[lang] = result.OutputPath
for _, lang := range keys {
task := audioTasks[lang]
var result common.AudioResult
err = task.Get(ctx, &result)
if err != nil {
return nil, err
}
audioFiles[lang] = result.OutputPath
}
}

var videoFiles = map[string]string{}
for key := range videoTasks {
task := videoTasks[key]
var result common.VideoResult
err := task.Get(ctx, &result)
{
keys, err := getMapKeysSafely(ctx, videoTasks)
if err != nil {
return nil, err
}
videoFiles[key] = result.OutputPath
for _, key := range keys {
task := videoTasks[key]
var result common.VideoResult
err = task.Get(ctx, &result)
if err != nil {
return nil, err
}
videoFiles[key] = result.OutputPath
}
}

return &PrepareFilesResult{
Expand Down
10 changes: 10 additions & 0 deletions workflows/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package workflows
import (
"github.com/bcc-code/bccm-flows/activities"
"github.com/bcc-code/bccm-flows/utils"
"github.com/samber/lo"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
"path/filepath"
Expand Down Expand Up @@ -83,3 +84,12 @@ func getWorkflowTempFolder(ctx workflow.Context) (string, error) {
path := utils.GetWorkflowTempFolder(ctx)
return path, createFolder(ctx, path)
}

// getMapKeysSafely makes sure that the order of the keys returned are identical to other workflow executions.
func getMapKeysSafely[T any](ctx workflow.Context, m map[string]T) ([]string, error) {
var keys []string
err := workflow.SideEffect(ctx, func(ctx workflow.Context) any {
return lo.Keys(m)
}).Get(&keys)
return keys, err
}

0 comments on commit 2f74b6e

Please sign in to comment.