Skip to content

Commit

Permalink
Add stop command (#126)
Browse files Browse the repository at this point in the history
* Refactor run retrieval to use util.GetRuns and remove deprecated GetRuns function

* Move GetRunByID to util

* Refactor sub-job labeling logic into util.LabelSubJobs

* Move workflow version and sub-job retrieval to util

* Add stop command

* Add support for stopping a child subjob

* Update child flag usage info

* Update README and stop command flag usage

* Fix command description
  • Loading branch information
mhmdiaa authored Jan 20, 2025
1 parent 1d4e9d5 commit 0b61013
Show file tree
Hide file tree
Showing 8 changed files with 477 additions and 230 deletions.
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,25 @@ Example GitHub action usage
output: "report"
```

## Stop

Use the **stop** command to stop a running workflow or node.

```
trickest stop --workflow <workflow_name> --space <space_name> [--run <run_id>] [--all] [--nodes <node_name_or_id>] [--child <child_task_index>]
```

| Flag | Type | Default | Description |
| ---------- | ------- | ------- | ------------------------------------------------------------------------------------------------------------------------------- |
| --url | string | / | URL copied from the Trickest platform, referencing a workflow and, optionally, a run/node |
| --workflow | string | / | The name of the workflow. |
| --project | string | / | The name of the project to which the workflow belongs |
| --space | string | / | The name of the space to which the workflow belongs |
| --run | string | / | Stop a specific run |
| --all | bool | false | Stop all runs |
| --nodes | string | / | A comma-separated list of nodes to stop. If none specified, the entire run will be stopped. If a node is a task group, the `--child` flag must be used |
| --child | string | / | A comma-separated list or range of child tasks to stop. Example: `--child 1,2,3` or `--child 1-3` will stop the first three tasks in the specified node's taskgroup |

## Output
Use the **output** command to download the outputs of your particular workflow execution(s) to your local environment.

Expand Down
5 changes: 2 additions & 3 deletions cmd/execute/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/trickest/trickest-cli/client/request"
"github.com/trickest/trickest-cli/cmd/delete"
"github.com/trickest/trickest-cli/cmd/list"
"github.com/trickest/trickest-cli/cmd/output"
"github.com/trickest/trickest-cli/types"
"github.com/trickest/trickest-cli/util"

Expand Down Expand Up @@ -183,7 +182,7 @@ func createNewVersion(version *types.WorkflowVersionDetailed) *types.WorkflowVer
}

fleet := util.GetFleetInfo(fleetName)
newVersion := output.GetWorkflowVersionByID(newVersionInfo.ID, fleet.ID)
newVersion := util.GetWorkflowVersionByID(newVersionInfo.ID, fleet.ID)
return newVersion
}

Expand Down Expand Up @@ -267,7 +266,7 @@ func GetLatestWorkflowVersion(workflowID uuid.UUID, fleetID uuid.UUID) *types.Wo
}

if fleetID != uuid.Nil {
maxMachines, err := output.GetWorkflowVersionMaxMachines(latestVersion.ID.String(), fleetID)
maxMachines, err := util.GetWorkflowVersionMaxMachines(latestVersion.ID.String(), fleetID)
if err != nil {
fmt.Printf("Error getting maximum machines: %v", err)
return nil
Expand Down
2 changes: 1 addition & 1 deletion cmd/execute/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func WatchRun(runID uuid.UUID, downloadPath string, nodesToDownload []string, fi
mutex.Unlock()
break
}
version := output.GetWorkflowVersionByID(*run.WorkflowVersionInfo, uuid.Nil)
version := util.GetWorkflowVersionByID(*run.WorkflowVersionInfo, uuid.Nil)
allNodes, roots := CreateTrees(version, false)

out := ""
Expand Down
3 changes: 1 addition & 2 deletions cmd/get/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/trickest/trickest-cli/client/request"
"github.com/trickest/trickest-cli/cmd/execute"
"github.com/trickest/trickest-cli/cmd/output"
"github.com/trickest/trickest-cli/types"
"github.com/trickest/trickest-cli/util"

Expand Down Expand Up @@ -48,7 +47,7 @@ var GetCmd = &cobra.Command{
}
}
if runID == "" {
runs = output.GetRuns(version.WorkflowInfo, 1)
runs = util.GetRuns(version.WorkflowInfo, 1, "")
} else {
runUUID, err := uuid.Parse(runID)
if err != nil {
Expand Down
235 changes: 11 additions & 224 deletions cmd/output/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,45 +140,10 @@ func DownloadRunOutput(run *types.Run, nodes []string, files []string, destinati
return
}

version := GetWorkflowVersionByID(*run.WorkflowVersionInfo, uuid.Nil)

subJobs := getSubJobs(*run.ID)
labels := make(map[string]bool)

for i := range subJobs {
subJobs[i].Label = version.Data.Nodes[subJobs[i].Name].Meta.Label
subJobs[i].Label = strings.ReplaceAll(subJobs[i].Label, "/", "-")
if labels[subJobs[i].Label] {
existingLabel := subJobs[i].Label
subJobs[i].Label = subJobs[i].Name
if labels[subJobs[i].Label] {
subJobs[i].Label += "-1"
for c := 1; c >= 1; c++ {
if labels[subJobs[i].Label] {
subJobs[i].Label = strings.TrimSuffix(subJobs[i].Label, "-"+strconv.Itoa(c))
subJobs[i].Label += "-" + strconv.Itoa(c+1)
} else {
labels[subJobs[i].Label] = true
break
}
}
} else {
for s := 0; s < i; s++ {
if subJobs[s].Label == existingLabel {
subJobs[s].Label = subJobs[s].Name
if subJobs[s].Children != nil {
for j := range subJobs[s].Children {
subJobs[s].Children[j].Label = strconv.Itoa(subJobs[s].Children[j].TaskIndex) + "-" + subJobs[s].Name
}
}
}
}
labels[subJobs[i].Label] = true
}
} else {
labels[subJobs[i].Label] = true
}
}
version := util.GetWorkflowVersionByID(*run.WorkflowVersionInfo, uuid.Nil)

subJobs := util.GetSubJobs(*run.ID)
subJobs = util.LabelSubJobs(subJobs, *version)

const layout = "2006-01-02T150405Z"
runDir := "run-" + run.StartedDate.Format(layout)
Expand Down Expand Up @@ -244,26 +209,26 @@ func DownloadRunOutput(run *types.Run, nodes []string, files []string, destinati
func getRelevantRuns(workflow types.Workflow, allRuns bool, runID string, numberOfRuns int, workflowURL string) ([]types.Run, error) {
switch {
case allRuns:
return GetRuns(workflow.ID, math.MaxInt), nil
return util.GetRuns(workflow.ID, math.MaxInt, ""), nil

case runID != "":
runUUID, err := uuid.Parse(runID)
if err != nil {
return nil, fmt.Errorf("invalid run ID: %s", runID)
}
run := GetRunByID(runUUID)
run := util.GetRunByID(runUUID)
return []types.Run{*run}, nil

case numberOfRuns > 1:
return GetRuns(workflow.ID, numberOfRuns), nil
return util.GetRuns(workflow.ID, numberOfRuns, ""), nil

default:
workflowURLRunID, _ := util.GetRunIDFromWorkflowURL(workflowURL)
if runUUID, err := uuid.Parse(workflowURLRunID); err == nil {
run := GetRunByID(runUUID)
run := util.GetRunByID(runUUID)
return []types.Run{*run}, nil
}
return GetRuns(workflow.ID, 1), nil
return util.GetRuns(workflow.ID, 1, ""), nil
}
}

Expand Down Expand Up @@ -345,7 +310,7 @@ func downloadSubJobOutput(savePath string, subJob *types.SubJob, files []string,

savePath = path.Join(savePath, subJob.Label)
if subJob.TaskGroup {
subJobCount, err := getChildrenSubJobsCount(*subJob)
subJobCount, err := util.GetChildrenSubJobsCount(*subJob)
if err != nil {
return fmt.Errorf("couldn't get children sub-jobs count for sub-job %s", subJob.Label)
}
Expand All @@ -364,7 +329,7 @@ func downloadSubJobOutput(savePath string, subJob *types.SubJob, files []string,
sem <- struct{}{}
defer func() { <-sem }()

child, err := getChildSubJob(subJob.ID, i)
child, err := util.GetChildSubJob(subJob.ID, i)
if err != nil {
mu.Lock()
errs = append(errs, fmt.Errorf("couldn't get child %d sub-jobs for sub-job %s: %v", i, subJob.Label, err))
Expand Down Expand Up @@ -471,181 +436,3 @@ func filterSubJobOutputsByFileNames(outputs []types.SubJobOutput, fileNames []st

return matchingOutputs
}

// GetRunByID fetches a single run from the "execution/<id>/" endpoint and
// decodes the JSON response body into a types.Run.
//
// If the request produces no response, an error is printed and the process
// exits with a non-zero status. A response whose status is not 200 OK is
// handed to request.ProcessUnexpectedResponse. If the body cannot be
// unmarshalled, an error is printed and nil is returned, so callers must check
// the result before dereferencing it.
func GetRunByID(id uuid.UUID) *types.Run {
	resp := request.Trickest.Get().DoF("execution/%s/", id)
	if resp == nil {
		fmt.Println("Error: Couldn't get run!")
		// Report failure to the shell; exiting with status 0 would make
		// scripts and CI jobs treat a failed lookup as a success.
		os.Exit(1)
	}

	if resp.Status() != http.StatusOK {
		request.ProcessUnexpectedResponse(resp)
	}

	var run types.Run
	if err := json.Unmarshal(resp.Body(), &run); err != nil {
		fmt.Println("Error unmarshalling run response!")
		return nil
	}

	return &run
}

// getSubJobs returns the sub-jobs that belong to the run identified by runID.
//
// It returns nil (after printing a message) when runID is the nil UUID, when
// the request produces no response, or when the response body cannot be
// unmarshalled. A response whose status is not 200 OK is handed to
// request.ProcessUnexpectedResponse before decoding is attempted.
func getSubJobs(runID uuid.UUID) []types.SubJob {
	if runID == uuid.Nil {
		fmt.Println("Couldn't list sub-jobs, no run ID parameter specified!")
		return nil
	}

	// The page size is set to the largest int, presumably so that all
	// sub-jobs arrive in a single page (TODO confirm against the API).
	endpoint := "subjob/?execution=" + runID.String() +
		"&page_size=" + strconv.Itoa(math.MaxInt)

	resp := request.Trickest.Get().DoF(endpoint)
	if resp == nil {
		fmt.Println("Error: Couldn't get sub-jobs!")
		return nil
	}

	if status := resp.Status(); status != http.StatusOK {
		request.ProcessUnexpectedResponse(resp)
	}

	var page types.SubJobs
	if decodeErr := json.Unmarshal(resp.Body(), &page); decodeErr != nil {
		fmt.Println("Error unmarshalling sub-jobs response!")
		return nil
	}

	return page.Results
}

// GetRuns lists "Editor" runs in the current vault (util.GetVault).
//
// When workflowID is not the nil UUID the listing is restricted to that
// workflow. pageSize is sent as the page_size query parameter; a pageSize of 0
// is replaced by math.MaxInt.
//
// It returns nil (after printing a message) when the request produces no
// response or the body cannot be unmarshalled. A response whose status is not
// 200 OK is handed to request.ProcessUnexpectedResponse.
func GetRuns(workflowID uuid.UUID, pageSize int) []types.Run {
	// Zero means "no explicit limit": substitute the largest possible page.
	limit := pageSize
	if limit == 0 {
		limit = math.MaxInt
	}

	endpoint := "execution/?type=Editor&vault=" + util.GetVault().String()
	if workflowID != uuid.Nil {
		endpoint += "&workflow=" + workflowID.String()
	}
	endpoint += "&page_size=" + strconv.Itoa(limit)

	resp := request.Trickest.Get().DoF(endpoint)
	if resp == nil {
		fmt.Println("Error: Couldn't get runs!")
		return nil
	}

	if status := resp.Status(); status != http.StatusOK {
		request.ProcessUnexpectedResponse(resp)
	}

	var page types.Runs
	if decodeErr := json.Unmarshal(resp.Body(), &page); decodeErr != nil {
		fmt.Println("Error unmarshalling runs response!")
		return nil
	}

	return page.Results
}

// GetWorkflowVersionByID fetches the workflow version identified by versionID
// from the "workflow-version/<id>/" endpoint and decodes it into a
// types.WorkflowVersionDetailed.
//
// When fleetID is not the nil UUID, the version's MaxMachines field is filled
// in from GetWorkflowVersionMaxMachines for that fleet.
//
// It returns nil (after printing a message) when the request produces no
// response, when the body cannot be unmarshalled, or when the maximum machines
// lookup fails. A response whose status is not 200 OK is handed to
// request.ProcessUnexpectedResponse.
func GetWorkflowVersionByID(versionID, fleetID uuid.UUID) *types.WorkflowVersionDetailed {
	resp := request.Trickest.Get().DoF("workflow-version/%s/", versionID)
	if resp == nil {
		fmt.Println("Error: Couldn't get workflow version!")
		return nil
	}

	if resp.Status() != http.StatusOK {
		request.ProcessUnexpectedResponse(resp)
	}

	var workflowVersion types.WorkflowVersionDetailed
	if err := json.Unmarshal(resp.Body(), &workflowVersion); err != nil {
		fmt.Println("Error unmarshalling workflow version response!")
		return nil
	}

	// No fleet was requested, so there is no machine limit to look up.
	if fleetID == uuid.Nil {
		return &workflowVersion
	}

	maxMachines, err := GetWorkflowVersionMaxMachines(versionID.String(), fleetID)
	if err != nil {
		// Terminate the line so the message does not run into later output,
		// matching the Println-based messages above.
		fmt.Printf("Error getting maximum machines: %v\n", err)
		return nil
	}
	workflowVersion.MaxMachines = maxMachines

	return &workflowVersion
}

// GetWorkflowVersionMaxMachines queries
// "workflow-version/<version>/max-machines/?fleet=<fleet>" and decodes the
// response body into a types.Machines value.
//
// It returns a zero types.Machines and a non-nil error when the request
// produces no response, when the response status is not 200 OK, or when the
// body cannot be unmarshalled. The unmarshalling error is wrapped, so callers
// can inspect it with errors.Is / errors.As.
func GetWorkflowVersionMaxMachines(version string, fleet uuid.UUID) (types.Machines, error) {
	resp := request.Trickest.Get().DoF("workflow-version/%s/max-machines/?fleet=%s", version, fleet)
	if resp == nil {
		return types.Machines{}, fmt.Errorf("couldn't get workflow version's maximum machines")
	}

	if resp.Status() != http.StatusOK {
		return types.Machines{}, fmt.Errorf("unexpected response status code for workflow version's maximum machines: %d", resp.Status())
	}

	var machines types.Machines
	if err := json.Unmarshal(resp.Body(), &machines); err != nil {
		return types.Machines{}, fmt.Errorf("couldn't unmarshal workflow version's maximum machines: %w", err)
	}

	return machines, nil
}

// getChildrenSubJobsCount reports the Count field of the
// "subjob/children/?parent=<id>" listing for subJob.
//
// It returns -1 and an error when the request produces no response or the
// body cannot be unmarshalled. A response whose status is not 200 OK is handed
// to request.ProcessUnexpectedResponse before decoding is attempted.
func getChildrenSubJobsCount(subJob types.SubJob) (int, error) {
	endpoint := "subjob/children/?parent=" + subJob.ID.String() +
		"&page_size=" + strconv.Itoa(math.MaxInt)

	resp := request.Trickest.Get().DoF(endpoint)
	if resp == nil {
		return -1, fmt.Errorf("couldn't get children sub-jobs count for sub-job %s", subJob.Label)
	}

	if status := resp.Status(); status != http.StatusOK {
		request.ProcessUnexpectedResponse(resp)
	}

	var children types.SubJobs
	if decodeErr := json.Unmarshal(resp.Body(), &children); decodeErr != nil {
		return -1, fmt.Errorf("couldn't unmarshal sub-job children response for sub-job %s: %v", subJob.Label, decodeErr)
	}

	return children.Count, nil
}

// getChildSubJob looks up the child of the sub-job subJobID whose task index
// is taskIndex, using the "subjob/children/" listing filtered by parent and
// task_index.
//
// It returns a zero types.SubJob and an error when the request produces no
// response, when the body cannot be unmarshalled, or when the listing does not
// contain exactly one result. A response whose status is not 200 OK is handed
// to request.ProcessUnexpectedResponse before decoding is attempted.
func getChildSubJob(subJobID uuid.UUID, taskIndex int) (types.SubJob, error) {
	var none types.SubJob

	endpoint := "subjob/children/?parent=" + subJobID.String() +
		"&task_index=" + strconv.Itoa(taskIndex)

	resp := request.Trickest.Get().DoF(endpoint)
	if resp == nil {
		return none, fmt.Errorf("couldn't get child sub-job: %d", taskIndex)
	}
	if status := resp.Status(); status != http.StatusOK {
		request.ProcessUnexpectedResponse(resp)
	}

	var matches types.SubJobs
	if decodeErr := json.Unmarshal(resp.Body(), &matches); decodeErr != nil {
		return none, fmt.Errorf("couldn't unmarshal child sub-job response: %v", decodeErr)
	}

	// The parent/task_index filter is expected to match exactly one child.
	if found := len(matches.Results); found != 1 {
		return none, fmt.Errorf("unexpected number of child sub-jobs: %d", found)
	}
	return matches.Results[0], nil
}
2 changes: 2 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/trickest/trickest-cli/cmd/list"
"github.com/trickest/trickest-cli/cmd/output"
"github.com/trickest/trickest-cli/cmd/scripts"
"github.com/trickest/trickest-cli/cmd/stop"
"github.com/trickest/trickest-cli/cmd/tools"
"github.com/trickest/trickest-cli/util"

Expand Down Expand Up @@ -57,6 +58,7 @@ func init() {
RootCmd.AddCommand(files.FilesCmd)
RootCmd.AddCommand(tools.ToolsCmd)
RootCmd.AddCommand(scripts.ScriptsCmd)
RootCmd.AddCommand(stop.StopCmd)
// RootCmd.AddCommand(export.ExportCmd)
}

Expand Down
Loading

0 comments on commit 0b61013

Please sign in to comment.