diff --git a/Makefile b/Makefile index 5767e60..a9507c7 100644 --- a/Makefile +++ b/Makefile @@ -19,7 +19,7 @@ dependency: ## Get dependencies lint: dependency ## Lint the files @echo "Running linter..." @if [ ! -f "${GOPATH}/bin/golangci-lint" ] && [ ! -f "$(GOROOT)/bin/golangci-lint" ]; then \ - ${GO_COMMAND} install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.60.2; \ + ${GO_COMMAND} install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.60.3; \ fi @golangci-lint run --timeout 30m -E contextcheck -D unused diff --git a/lib/airflow/workflow.go b/lib/airflow/workflow.go index a558a7b..b759aa1 100644 --- a/lib/airflow/workflow.go +++ b/lib/airflow/workflow.go @@ -1,13 +1,20 @@ package airflow import ( + "encoding/base64" + "encoding/json" "errors" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "sync" + "github.com/apache/airflow-client-go/airflow" "github.com/cloud-barista/cm-cicada/lib/config" "github.com/cloud-barista/cm-cicada/pkg/api/rest/model" "github.com/jollaman999/utils/fileutil" "github.com/jollaman999/utils/logger" - "sync" ) var dagRequests = make(map[string]*sync.Mutex) @@ -71,7 +78,6 @@ func (client *client) GetDAGs() (airflow.DAGCollection, error) { logger.Println(logger.ERROR, false, "AIRFLOW: Error occurred while getting DAGs. (Error: "+err.Error()+").") } - return resp, err } @@ -141,3 +147,196 @@ func (client *client) DeleteDAG(dagID string, deleteFolderOnly bool) error { return err } +func (client *client) GetDAGRuns(dagID string) (airflow.DAGRunCollection, error) { + deferFunc := callDagRequestLock(dagID) + defer func() { + deferFunc() + }() + ctx, cancel := Context() + defer cancel() + resp, _, err := client.api.DAGRunApi.GetDagRuns(ctx, dagID).Execute() + if err != nil { + logger.Println(logger.ERROR, false, + "AIRFLOW: Error occurred while getting DAGRuns. (Error: "+err.Error()+").") + } + return resp, err +} + +func (client *client) GetTaskInstances(dagID string, dagRunId string) (airflow.TaskInstanceCollection, error) { + deferFunc := callDagRequestLock(dagID) + defer func() { + deferFunc() + }() + ctx, cancel := Context() + defer cancel() + resp, http, err := client.api.TaskInstanceApi.GetTaskInstances(ctx, dagID, dagRunId).Execute() + fmt.Println("test : ", http) + if err != nil { + logger.Println(logger.ERROR, false, + "AIRFLOW: Error occurred while getting TaskInstances. (Error: "+err.Error()+").") + } + return resp, err +} + +func (client *client) GetTaskLogs(dagID, dagRunID, taskID string, taskTryNumber int) (airflow.InlineResponse200, error) { + deferFunc := callDagRequestLock(dagID) + defer func() { + deferFunc() + }() + ctx, cancel := Context() + defer cancel() + + // TaskInstanceApi 인스턴스를 사용하여 로그 요청 + logs, _, err := client.api.TaskInstanceApi.GetLog(ctx, dagID, dagRunID, taskID, int32(taskTryNumber)).FullContent(true).Execute() + logger.Println(logger.INFO, false,logs) + if err != nil { + logger.Println(logger.ERROR, false, + "AIRFLOW: Error occurred while getting TaskInstance logs. (Error: "+err.Error()+").") + } + + return logs, nil +} + +func (client *client) ClearTaskInstance(dagID string, dagRunID string, taskID string) (airflow.TaskInstanceReferenceCollection, error) { + deferFunc := callDagRequestLock(dagID) + defer func() { + deferFunc() + }() + ctx, cancel := Context() + defer cancel() + + dryRun := false + taskIds := []string{taskID} + includeDownstream := true + includeFuture := false + includeParentdag := false + includePast := false + includeSubdags := true + includeUpstream := false + onlyFailed := false + onlyRunning := false + resetDagRuns := true + + clearTask := airflow.ClearTaskInstances{ + DryRun: &dryRun, + TaskIds: &taskIds, + IncludeSubdags: &includeSubdags, + IncludeParentdag: &includeParentdag, + IncludeUpstream: &includeUpstream, + IncludeDownstream: &includeDownstream, + IncludeFuture: &includeFuture, + IncludePast: &includePast, + OnlyFailed: &onlyFailed, + OnlyRunning: &onlyRunning, + ResetDagRuns: &resetDagRuns, + DagRunId: *airflow.NewNullableString(&dagRunID), + } + + // 요청 생성 + request := client.api.DAGApi.PostClearTaskInstances(ctx, dagID) + + // ClearTaskInstances 데이터 설정 + request = request.ClearTaskInstances(clearTask) + + // 요청 실행 + logs, _, err := client.api.DAGApi.PostClearTaskInstancesExecute(request) + if err != nil { + logger.Println(logger.ERROR, false, + "AIRFLOW: Error occurred while clearing TaskInstance. (Error: " + err.Error() + ").") + return airflow.TaskInstanceReferenceCollection{}, err + } + + return logs, nil +} + +func (client *client) GetEventLogs(dagID string, dagRunId string, taskId string) (airflow.EventLogCollection, error) { + deferFunc := callDagRequestLock(dagID) + defer func() { + deferFunc() + }() + ctx, cancel := Context() + defer cancel() + + localBasePath, err:=client.api.GetConfig().ServerURLWithContext(ctx, "EventLogApiService.GetEventLog") + baseURL := "http://"+client.api.GetConfig().Host+localBasePath + "/eventLogs" + queryParams := map[string]string{ + "offset": "0", + "limit": "100", + "dag_id": dagID, + "run_id": dagRunId, + "task_id": taskId, + "order_by": "-when", + "excluded_events": "gantt,landing_times,tries,duration,calendar,graph,grid,tree,tree_data", + } + query := url.Values{} + for key, value := range queryParams { + query.Add(key, value) + } + queryString := query.Encode() + fullURL := fmt.Sprintf("%s?%s", baseURL, queryString) + httpclient := client.api.GetConfig().HTTPClient + // fmt.Println(&httpclient.) + + // 요청 생성 + req, err := http.NewRequest("GET", fullURL, nil) + if err != nil { + fmt.Println("Error creating request:", err) + } + cred := ctx.Value(airflow.ContextBasicAuth).(airflow.BasicAuth) + addBasicAuth(req, cred.UserName, cred.Password) + res, err := httpclient.Do(req) + if err != nil { + fmt.Println("Error sending request:", err) + } + defer res.Body.Close() + body, err := ioutil.ReadAll(res.Body) + if err != nil { + fmt.Println("Error reading response body:", err) + } + + var eventlogs airflow.EventLogCollection + err = json.Unmarshal(body, &eventlogs) + if err != nil { + fmt.Println("Error unmarshal response body:", err) + } + + + return eventlogs, err +} + +func (client *client) GetImportErrors() (airflow.ImportErrorCollection, error) { + ctx, cancel := Context() + defer cancel() + + // TaskInstanceApi 인스턴스를 사용하여 로그 요청 + logs,_,err := client.api.ImportErrorApi.GetImportErrors(ctx).Execute() + logger.Println(logger.INFO, false,logs) + if err != nil { + logger.Println(logger.ERROR, false, + "AIRFLOW: Error occurred while getting import dag errors. (Error: "+err.Error()+").") + } + + return logs, nil +} + + +func (client *client) PatchDag(dagID string, dagBody airflow.DAG) (airflow.DAG, error){ + ctx, cancel := Context() + defer cancel() + + // TaskInstanceApi 인스턴스를 사용하여 로그 요청 + logs,_,err := client.api.DAGApi.PatchDag(ctx, dagID).DAG(dagBody).Execute() + logger.Println(logger.INFO, false,logs) + if err != nil { + logger.Println(logger.ERROR, false, + "AIRFLOW: Error occurred while getting import dag errors. (Error: "+err.Error()+").") + } + + return logs, nil +} + +func addBasicAuth(req *http.Request, username, password string) { + auth := username + ":" + password + encodedAuth := base64.StdEncoding.EncodeToString([]byte(auth)) + req.Header.Add("Authorization", "Basic "+encodedAuth) +} \ No newline at end of file diff --git a/pkg/api/rest/common/url.go b/pkg/api/rest/common/url.go new file mode 100644 index 0000000..65f730b --- /dev/null +++ b/pkg/api/rest/common/url.go @@ -0,0 +1,22 @@ +package common + +import ( + "fmt" + "net/url" +) + +func UrlDecode(text string) (string) { + decodedStr, err := url.QueryUnescape(text) + if err != nil { + fmt.Println("Error decoding URL:", err) + return err.Error() + } else { + return decodedStr + } +} + +func UrlEncode(text string) (string) { + encodedStr := url.QueryEscape(text) + + return encodedStr +} \ No newline at end of file diff --git a/pkg/api/rest/controller/workflow.go b/pkg/api/rest/controller/workflow.go index b91bb6d..aeadc60 100644 --- a/pkg/api/rest/controller/workflow.go +++ b/pkg/api/rest/controller/workflow.go @@ -2,6 +2,12 @@ package controller import ( "errors" + "fmt" + "net/http" + "reflect" + "strconv" + "time" + "github.com/cloud-barista/cm-cicada/dao" "github.com/cloud-barista/cm-cicada/lib/airflow" "github.com/cloud-barista/cm-cicada/pkg/api/rest/common" @@ -10,9 +16,6 @@ import ( "github.com/jollaman999/utils/logger" "github.com/labstack/echo/v4" "github.com/mitchellh/mapstructure" - "net/http" - "reflect" - "time" ) func toTimeHookFunc() mapstructure.DecodeHookFunc { @@ -877,3 +880,312 @@ func GetTaskDirectly(c echo.Context) error { return common.ReturnErrorMsg(c, "task not found.") } + +// GetTaskLogs godoc +// +// @Summary Get Task Logs +// @Description Get the task Logs. +// @Tags [Workflow] +// @Accept json +// @Produce json +// @Param wfId path string true "ID of the workflow." +// @Param wfRunId path string true "ID of the workflowRunId." +// @Param taskId path string true "ID of the task." +// @Param taskTyNum path string true "ID of the taskTryNum." +// @Success 200 {object} airflow.InlineResponse200 "Successfully get the task Logs." +// @Failure 400 {object} common.ErrorResponse "Sent bad request." +// @Failure 500 {object} common.ErrorResponse "Failed to get the task Logs." +// @Router /workflow/{wfId}/workflowRun/{wfRunId}/task/{taskId}/taskTryNum/{taskTyNum}/logs [get] +func GetTaskLogs(c echo.Context) error { + wfId := c.Param("wfId") + if wfId == "" { + return common.ReturnErrorMsg(c, "Please provide the wfId.") + } + wfRunId := c.Param("wfRunId") + if wfRunId == "" { + return common.ReturnErrorMsg(c, "Please provide the wfRunId.") + } + + taskId := c.Param("taskId") + if taskId == "" { + return common.ReturnErrorMsg(c, "Please provide the taskId.") + } + taskInfo, err := dao.TaskGet(taskId) + if err != nil { + return common.ReturnErrorMsg(c, "Invalid get tasK from taskId.") + } + + taskTyNum := c.Param("taskTyNum") + if taskTyNum == "" { + return common.ReturnErrorMsg(c, "Please provide the taskTyNum.") + } + taskTyNumToInt, err := strconv.Atoi(taskTyNum) + if err != nil { + return common.ReturnErrorMsg(c, "Invalid taskTryNum format.") + } + logs, err := airflow.Client.GetTaskLogs(wfId, common.UrlDecode(wfRunId), taskInfo.Name, taskTyNumToInt) + if err != nil { + return common.ReturnErrorMsg(c, "Failed to get the workflow logs: " + err.Error()) + } + + taskLog := model.TaskLog { + Content: *logs.Content, + } + + return c.JSONPretty(http.StatusOK, taskLog, " ") +} + +// workflowRuns godoc +// +// @Summary Get workflowRuns +// @Description Get the task Logs. +// @Tags [Workflow] +// @Accept json +// @Produce json +// @Param wfId path string true "ID of the workflow." +// @Success 200 {object} []model.WorkflowRun "Successfully get the workflowRuns." +// @Failure 400 {object} common.ErrorResponse "Sent bad request." +// @Failure 500 {object} common.ErrorResponse "Failed to get the workflowRuns." +// @Router /workflow/{wfId}/runs [get] +func GetWorkflowRuns(c echo.Context) error { + wfId := c.Param("wfId") + if wfId == "" { + return common.ReturnErrorMsg(c, "Please provide the wfId.") + } + + runList, err := airflow.Client.GetDAGRuns(wfId) + if err != nil { + return common.ReturnErrorMsg(c, "Failed to get the workflow runs: " + err.Error()) + } + + var transformedRuns []model.WorkflowRun; + + for _, dagRun := range *runList.DagRuns { + transformedRun := model.WorkflowRun { + WorkflowID: dagRun.DagId, + WorkflowRunID: dagRun.GetDagRunId(), + DataIntervalStart: dagRun.GetDataIntervalStart(), + DataIntervalEnd: dagRun.GetDataIntervalEnd(), + State: string(dagRun.GetState()), + ExecutionDate: dagRun.GetExecutionDate(), + StartDate: dagRun.GetStartDate(), + EndDate: dagRun.GetEndDate(), + RunType: dagRun.GetRunType(), + LastSchedulingDecision: dagRun.GetLastSchedulingDecision(), + DurationDate: (dagRun.GetEndDate().Sub(dagRun.GetStartDate()).Seconds()), + } + transformedRuns = append(transformedRuns,transformedRun ) +} + + return c.JSONPretty(http.StatusOK, transformedRuns, " ") +} +// taskInstances godoc +// +// @Summary Get taskInstances +// @Description Get the task Logs. +// @Tags [Workflow] +// @Accept json +// @Produce json +// @Param wfId path string true "ID of the workflow." +// @Param wfRunId path string true "ID of the workflow." +// @Success 200 {object} model.TaskInstance "Successfully get the taskInstances." +// @Failure 400 {object} common.ErrorResponse "Sent bad request." +// @Failure 500 {object} common.ErrorResponse "Failed to get the taskInstances." +// @Router /workflow/{wfId}/workflowRun/{wfRunId}/taskInstances [get] +func GetTaskInstances(c echo.Context) error { + wfId := c.Param("wfId") + if wfId == "" { + return common.ReturnErrorMsg(c, "Please provide the wfId.") + } + wfRunId := c.Param("wfRunId") + if wfRunId == "" { + return common.ReturnErrorMsg(c, "Please provide the wfRunId.") + } + runList, err := airflow.Client.GetTaskInstances(common.UrlDecode(wfId),common.UrlDecode(wfRunId) ) + if err != nil { + return common.ReturnErrorMsg(c, "Failed to get the taskInstances: " + err.Error()) + } + var taskInstances []model.TaskInstance; + layout := time.RFC3339Nano + + for _, taskInstance := range *runList.TaskInstances { + taskDBInfo, err := dao.TaskGetByWorkflowIDAndName(taskInstance.GetDagId(),taskInstance.GetTaskId()) + if err != nil { + return common.ReturnErrorMsg(c, "Failed to get the taskInstances: " + err.Error()) + } + taskId := &taskDBInfo.ID + executionDate, err:= time.Parse(layout, taskInstance.GetExecutionDate()) + if err != nil { + fmt.Println("Error parsing execution date:", err) + continue + } + startDate, err:= time.Parse(layout, taskInstance.GetExecutionDate()) + if err != nil { + fmt.Println("Error parsing execution date:", err) + continue + } + endDate, err:= time.Parse(layout, taskInstance.GetExecutionDate()) + if err != nil { + fmt.Println("Error parsing execution date:", err) + continue + } + taskInfo := model.TaskInstance { + WorkflowID: taskInstance.DagId, + WorkflowRunID: taskInstance.GetDagRunId(), + TaskID: *taskId, + TaskName: taskInstance.GetTaskId(), + State: string(taskInstance.GetState()), + ExecutionDate: executionDate, + StartDate: startDate, + EndDate: endDate, + DurationDate: float64(taskInstance.GetDuration()), + TryNumber: int(taskInstance.GetTryNumber()), + } + taskInstances = append(taskInstances,taskInfo ) +} + return c.JSONPretty(http.StatusOK, taskInstances, " ") +} + +// taskInstances godoc +// +// @Summary Clear taskInstances +// @Description Clear the task Instance. +// @Tags [Workflow] +// @Accept json +// @Produce json +// @Param wfId path string true "ID of the workflow." +// @Param wfRunId path string true "ID of the wfRunId." +// @Param taskId path string true "ID of the taskId." +// @Success 200 {object} model.TaskInstanceReference "Successfully clear the taskInstances." +// @Failure 400 {object} common.ErrorResponse "Sent bad request." +// @Failure 500 {object} common.ErrorResponse "Failed to clear the taskInstances." +// @Router /workflow/{wfId}/workflowRun/{wfRunId}/task/{taskId}/clear [post] +func ClearTaskInstances(c echo.Context) error { + wfId := c.Param("wfId") + if wfId == "" { + return common.ReturnErrorMsg(c, "Please provide the wfId.") + } + wfRunId := c.Param("wfRunId") + if wfRunId == "" { + return common.ReturnErrorMsg(c, "Please provide the wfRunId.") + } + taskId := c.Param("taskId") + if taskId == "" { + return common.ReturnErrorMsg(c, "Please provide the taskId.") + } else { + taskDBInfo, err := dao.TaskGet(taskId) + if err != nil { + return common.ReturnErrorMsg(c, "Failed to get the taskInstances: " + err.Error()) + } + taskId = taskDBInfo.Name + } + var TaskInstanceReferences []model.TaskInstanceReference; + clearList, err := airflow.Client.ClearTaskInstance(wfId, common.UrlDecode(wfRunId),taskId ) + if err != nil { + return common.ReturnErrorMsg(c, "Failed to get the taskInstances: " + err.Error()) + } + + for _, taskInstance := range *clearList.TaskInstances { + taskDBInfo, err := dao.TaskGetByWorkflowIDAndName(taskInstance.GetDagId(),taskInstance.GetTaskId()) + if err != nil { + return common.ReturnErrorMsg(c, "Failed to get the taskInstances: " + err.Error()) + } + taskId := &taskDBInfo.ID + taskInfo := model.TaskInstanceReference { + WorkflowID: taskInstance.DagId, + WorkflowRunID: taskInstance.DagRunId, + TaskId: taskId, + TaskName: taskInstance.GetTaskId(), + ExecutionDate: taskInstance.ExecutionDate, + } + TaskInstanceReferences = append(TaskInstanceReferences,taskInfo ) + } + + + return c.JSONPretty(http.StatusOK, TaskInstanceReferences, " ") +} + +// Eventlogs godoc +// +// @ID get-Eventlog +// @Summary Get Eventlog +// @Description Get Eventlog. +// @Tags [Workflow] +// @Accept json +// @Produce json +// @Param wfId query string true "ID of the workflow." +// @Param wfRunId query string false "ID of the workflow run." +// @Param taskId query string false "ID of the task." +// @Success 200 {object} []model.EventLog "Successfully get the workflow." +// @Failure 400 {object} common.ErrorResponse "Sent bad request." +// @Failure 500 {object} common.ErrorResponse "Failed to get the workflow." +// @Router /eventlogs [get] +func GetEventLogs(c echo.Context) error { + wfId := c.QueryParam("dag_id") + if wfId == "" { + return common.ReturnErrorMsg(c, "Please provide the dagId.") + } + + var wfRunId,taskId, taskName string + + if c.QueryParam("wfRunId") != "" { + wfRunId = c.QueryParam("wfRunId") + } + if c.QueryParam("taskId") != "" { + taskId = c.QueryParam("taskId") + taskDBInfo, err := dao.TaskGet(taskId) + if err != nil { + return common.ReturnErrorMsg(c, "Failed to get the taskInstances: " + err.Error()) + } + taskName = taskDBInfo.Name + } + var eventLogs []model.EventLog; + logs, err := airflow.Client.GetEventLogs(wfId,wfRunId,taskName) + if err != nil { + return common.ReturnErrorMsg(c, "Failed to get the taskInstances: " + err.Error()) + } + + for _, log := range *logs.EventLogs { + taskDBInfo, err := dao.TaskGetByWorkflowIDAndName(wfId,log.GetTaskId()) + if err != nil { + return common.ReturnErrorMsg(c, "Failed to get the taskInstances: " + err.Error()) + } + taskId := &taskDBInfo.ID + eventlog := model.EventLog { + WorkflowID: log.GetDagId(), + WorkflowRunID: wfRunId, + TaskID: *taskId, + TaskName: log.GetTaskId(), + Extra: log.GetExtra(), + Event: log.GetEvent(), + When: log.GetWhen(), + } + eventLogs = append(eventLogs,eventlog) + } + // logs, err := airflow.Client.GetEventLogs(wfId) + // if err != nil { + // return common.ReturnErrorMsg(c, "Failed to get the taskInstances: " + err.Error()) + // } + + return c.JSONPretty(http.StatusOK, eventLogs, " ") +} + +// ImportErrors godoc +// +// @Summary Get importErrors +// @Description Get the importErrors. +// @Tags [Workflow] +// @Accept json +// @Produce json +// @Success 200 {object} airflow.ImportErrorCollection "Successfully get the importErrors." +// @Failure 400 {object} common.ErrorResponse "Sent bad request." +// @Failure 500 {object} common.ErrorResponse "Failed to get the importErrors." +// @Router /importErrors [get] +func GetImportErrors(c echo.Context) error { + logs, err := airflow.Client.GetImportErrors() + if err != nil { + return common.ReturnErrorMsg(c, "Failed to get the taskInstances: " + err.Error()) + } + + return c.JSONPretty(http.StatusOK, logs, " ") +} \ No newline at end of file diff --git a/pkg/api/rest/docs/docs.go b/pkg/api/rest/docs/docs.go index 1559e10..f5ea375 100644 --- a/pkg/api/rest/docs/docs.go +++ b/pkg/api/rest/docs/docs.go @@ -19,6 +19,101 @@ const docTemplate = `{ "host": "{{.Host}}", "basePath": "{{.BasePath}}", "paths": { + "/eventlogs": { + "get": { + "description": "Get Eventlog.", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "[Workflow]" + ], + "summary": "Get Eventlog", + "operationId": "get-Eventlog", + "parameters": [ + { + "type": "string", + "description": "ID of the workflow.", + "name": "wfId", + "in": "query", + "required": true + }, + { + "type": "string", + "description": "ID of the workflow run.", + "name": "wfRunId", + "in": "query" + }, + { + "type": "string", + "description": "ID of the task.", + "name": "taskId", + "in": "query" + } + ], + "responses": { + "200": { + "description": "Successfully get the workflow.", + "schema": { + "type": "array", + "items": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_model.EventLog" + } + } + }, + "400": { + "description": "Sent bad request.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse" + } + }, + "500": { + "description": "Failed to get the workflow.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse" + } + } + } + } + }, + "/importErrors": { + "get": { + "description": "Get the importErrors.", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "[Workflow]" + ], + "summary": "Get importErrors", + "responses": { + "200": { + "description": "Successfully get the importErrors.", + "schema": { + "$ref": "#/definitions/airflow.ImportErrorCollection" + } + }, + "400": { + "description": "Sent bad request.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse" + } + }, + "500": { + "description": "Failed to get the importErrors.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse" + } + } + } + } + }, "/readyz": { "get": { "description": "Check Cicada is ready", @@ -756,6 +851,53 @@ const docTemplate = `{ } } }, + "/workflow/{wfId}/runs": { + "get": { + "description": "Get the task Logs.", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "[Workflow]" + ], + "summary": "Get workflowRuns", + "parameters": [ + { + "type": "string", + "description": "ID of the workflow.", + "name": "wfId", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "Successfully get the workflowRuns.", + "schema": { + "type": "array", + "items": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_model.WorkflowRun" + } + } + }, + "400": { + "description": "Sent bad request.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse" + } + }, + "500": { + "description": "Failed to get the workflowRuns.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse" + } + } + } + } + }, "/workflow/{wfId}/task": { "get": { "description": "Get a task list of the workflow.", @@ -1070,6 +1212,180 @@ const docTemplate = `{ } } }, + "/workflow/{wfId}/workflowRun/{wfRunId}/task/{taskId}/clear": { + "post": { + "description": "Clear the task Instance.", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "[Workflow]" + ], + "summary": "Clear taskInstances", + "parameters": [ + { + "type": "string", + "description": "ID of the workflow.", + "name": "wfId", + "in": "path", + "required": true + }, + { + "type": "string", + "description": "ID of the wfRunId.", + "name": "wfRunId", + "in": "path", + "required": true + }, + { + "type": "string", + "description": "ID of the taskId.", + "name": "taskId", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "Successfully clear the taskInstances.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_model.TaskInstanceReference" + } + }, + "400": { + "description": "Sent bad request.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse" + } + }, + "500": { + "description": "Failed to clear the taskInstances.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse" + } + } + } + } + }, + "/workflow/{wfId}/workflowRun/{wfRunId}/task/{taskId}/taskTryNum/{taskTyNum}/logs": { + "get": { + "description": "Get the task Logs.", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "[Workflow]" + ], + "summary": "Get Task Logs", + "parameters": [ + { + "type": "string", + "description": "ID of the workflow.", + "name": "wfId", + "in": "path", + "required": true + }, + { + "type": "string", + "description": "ID of the workflowRunId.", + "name": "wfRunId", + "in": "path", + "required": true + }, + { + "type": "string", + "description": "ID of the task.", + "name": "taskId", + "in": "path", + "required": true + }, + { + "type": "string", + "description": "ID of the taskTryNum.", + "name": "taskTyNum", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "Successfully get the task Logs.", + "schema": { + "$ref": "#/definitions/airflow.InlineResponse200" + } + }, + "400": { + "description": "Sent bad request.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse" + } + }, + "500": { + "description": "Failed to get the task Logs.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse" + } + } + } + } + }, + "/workflow/{wfId}/workflowRun/{wfRunId}/taskInstances": { + "get": { + "description": "Get the task Logs.", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "[Workflow]" + ], + "summary": "Get taskInstances", + "parameters": [ + { + "type": "string", + "description": "ID of the workflow.", + "name": "wfId", + "in": "path", + "required": true + }, + { + "type": "string", + "description": "ID of the workflow.", + "name": "wfRunId", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "Successfully get the taskInstances.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_model.TaskInstance" + } + }, + "400": { + "description": "Sent bad request.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse" + } + }, + "500": { + "description": "Failed to get the taskInstances.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse" + } + } + } + } + }, "/workflow_template": { "get": { "description": "Get a list of workflow template.", @@ -1221,6 +1537,53 @@ const docTemplate = `{ } }, "definitions": { + "airflow.ImportError": { + "type": "object", + "properties": { + "filename": { + "description": "The filename", + "type": "string" + }, + "import_error_id": { + "description": "The import error ID.", + "type": "integer" + }, + "stack_trace": { + "description": "The full stackstrace..", + "type": "string" + }, + "timestamp": { + "description": "The time when this error was created.", + "type": "string" + } + } + }, + "airflow.ImportErrorCollection": { + "type": "object", + "properties": { + "import_errors": { + "type": "array", + "items": { + "$ref": "#/definitions/airflow.ImportError" + } + }, + "total_entries": { + "description": "Count of objects in the current result set.", + "type": "integer" + } + } + }, + "airflow.InlineResponse200": { + "type": "object", + "properties": { + "content": { + "type": "string" + }, + "continuation_token": { + "type": "string" + } + } + }, "github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse": { "type": "object", "properties": { @@ -1345,6 +1708,32 @@ const docTemplate = `{ } } }, + "github_com_cloud-barista_cm-cicada_pkg_api_rest_model.EventLog": { + "type": "object", + "properties": { + "event": { + "type": "string" + }, + "extra": { + "type": "string" + }, + "start_date": { + "type": "string" + }, + "task_id": { + "type": "string" + }, + "task_name": { + "type": "string" + }, + "workflow_id": { + "type": "string" + }, + "workflow_run_id": { + "type": "string" + } + } + }, "github_com_cloud-barista_cm-cicada_pkg_api_rest_model.GetWorkflowTemplate": { "type": "object", "required": [ @@ -1564,6 +1953,64 @@ const docTemplate = `{ } } }, + "github_com_cloud-barista_cm-cicada_pkg_api_rest_model.TaskInstance": { + "type": "object", + "properties": { + "duration_date": { + "type": "number" + }, + "end_date": { + "type": "string" + }, + "execution_date": { + "type": "string" + }, + "start_date": { + "type": "string" + }, + "state": { + "type": "string" + }, + "task_id": { + "type": "string" + }, + "task_name": { + "type": "string" + }, + "try_number": { + "type": "integer" + }, + "workflow_id": { + "type": "string" + }, + "workflow_run_id": { + "type": "string" + } + } + }, + "github_com_cloud-barista_cm-cicada_pkg_api_rest_model.TaskInstanceReference": { + "type": "object", + "properties": { + "execution_date": { + "type": "string" + }, + "task_id": { + "description": "The task ID.", + "type": "string" + }, + "task_name": { + "type": "string" + }, + "workflow_id": { + "description": "The DAG ID.", + "type": "string" + }, + "workflow_run_id": { + "description": "The DAG run ID.", + "type": "string" + } + } + }, "github_com_cloud-barista_cm-cicada_pkg_api_rest_model.Workflow": { "type": "object", "required": [ @@ -1589,6 +2036,57 @@ const docTemplate = `{ } } }, + "github_com_cloud-barista_cm-cicada_pkg_api_rest_model.WorkflowRun": { + "type": "object", + "properties": { + "conf": { + "type": "object", + "additionalProperties": true + }, + "data_interval_end": { + "type": "string" + }, + "data_interval_start": { + "type": "string" + }, + "duration_date": { + "type": "number" + }, + "end_date": { + "type": "string" + }, + "execution_date": { + "type": "string" + }, + "external_trigger": { + "type": "boolean" + }, + "last_scheduling_decision": { + "type": "string" + }, + "logical_date": { + "type": "string" + }, + "note": { + "type": "string" + }, + "run_type": { + "type": "string" + }, + "start_date": { + "type": "string" + }, + "state": { + "type": "string" + }, + "workflow_id": { + "type": "string" + }, + "workflow_run_id": { + "type": "string" + } + } + }, "github_com_cloud-barista_cm-cicada_pkg_api_rest_model.WorkflowTemplate": { "type": "object", "required": [ diff --git a/pkg/api/rest/docs/swagger.json b/pkg/api/rest/docs/swagger.json index 25543f8..1be5554 100644 --- a/pkg/api/rest/docs/swagger.json +++ b/pkg/api/rest/docs/swagger.json @@ -12,6 +12,101 @@ }, "basePath": "/cicada", "paths": { + "/eventlogs": { + "get": { + "description": "Get Eventlog.", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "[Workflow]" + ], + "summary": "Get Eventlog", + "operationId": "get-Eventlog", + "parameters": [ + { + "type": "string", + "description": "ID of the workflow.", + "name": "wfId", + "in": "query", + "required": true + }, + { + "type": "string", + "description": "ID of the workflow run.", + "name": "wfRunId", + "in": "query" + }, + { + "type": "string", + "description": "ID of the task.", + "name": "taskId", + "in": "query" + } + ], + "responses": { + "200": { + "description": "Successfully get the workflow.", + "schema": { + "type": "array", + "items": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_model.EventLog" + } + } + }, + "400": { + "description": "Sent bad request.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse" + } + }, + "500": { + "description": "Failed to get the workflow.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse" + } + } + } + } + }, + "/importErrors": { + "get": { + "description": "Get the importErrors.", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "[Workflow]" + ], + "summary": "Get importErrors", + "responses": { + "200": { + "description": "Successfully get the importErrors.", + "schema": { + "$ref": "#/definitions/airflow.ImportErrorCollection" + } + }, + "400": { + "description": "Sent bad request.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse" + } + }, + "500": { + "description": "Failed to get the importErrors.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse" + } + } + } + } + }, "/readyz": { "get": { "description": "Check Cicada is ready", @@ -749,6 +844,53 @@ } } }, + "/workflow/{wfId}/runs": { + "get": { + "description": "Get the task Logs.", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "[Workflow]" + ], + "summary": "Get workflowRuns", + "parameters": [ + { + "type": "string", + "description": "ID of the workflow.", + "name": "wfId", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "Successfully get the workflowRuns.", + "schema": { + "type": "array", + "items": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_model.WorkflowRun" + } + } + }, + "400": { + "description": "Sent bad request.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse" + } + }, + "500": { + "description": "Failed to get the workflowRuns.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse" + } + } + } + } + }, "/workflow/{wfId}/task": { "get": { "description": "Get a task list of the workflow.", @@ -1063,6 +1205,180 @@ } } }, + "/workflow/{wfId}/workflowRun/{wfRunId}/task/{taskId}/clear": { + "post": { + "description": "Clear the task Instance.", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "[Workflow]" + ], + "summary": "Clear taskInstances", + "parameters": [ + { + "type": "string", + "description": "ID of the workflow.", + "name": "wfId", + "in": "path", + "required": true + }, + { + "type": "string", + "description": "ID of the wfRunId.", + "name": "wfRunId", + "in": "path", + "required": true + }, + { + "type": "string", + "description": "ID of the taskId.", + "name": "taskId", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "Successfully clear the taskInstances.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_model.TaskInstanceReference" + } + }, + "400": { + "description": "Sent bad request.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse" + } + }, + "500": { + "description": "Failed to clear the taskInstances.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse" + } + } + } + } + }, + "/workflow/{wfId}/workflowRun/{wfRunId}/task/{taskId}/taskTryNum/{taskTyNum}/logs": { + "get": { + "description": "Get the task Logs.", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "[Workflow]" + ], + "summary": "Get Task Logs", + "parameters": [ + { + "type": "string", + "description": "ID of the workflow.", + "name": "wfId", + "in": "path", + "required": true + }, + { + "type": "string", + "description": "ID of the workflowRunId.", + "name": "wfRunId", + "in": "path", + "required": true + }, + { + "type": "string", + "description": "ID of the task.", + "name": "taskId", + "in": "path", + "required": true + }, + { + "type": "string", + "description": "ID of the taskTryNum.", + "name": "taskTyNum", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "Successfully get the task Logs.", + "schema": { + "$ref": "#/definitions/airflow.InlineResponse200" + } + }, + "400": { + "description": "Sent bad request.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse" + } + }, + "500": { + "description": "Failed to get the task Logs.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse" + } + } + } + } + }, + "/workflow/{wfId}/workflowRun/{wfRunId}/taskInstances": { + "get": { + "description": "Get the task Logs.", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "[Workflow]" + ], + "summary": "Get taskInstances", + "parameters": [ + { + "type": "string", + "description": "ID of the workflow.", + "name": "wfId", + "in": "path", + "required": true + }, + { + "type": "string", + "description": "ID of the workflow.", + "name": "wfRunId", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "Successfully get the taskInstances.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_model.TaskInstance" + } + }, + "400": { + "description": "Sent bad request.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse" + } + }, + "500": { + "description": "Failed to get the taskInstances.", + "schema": { + "$ref": "#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse" + } + } + } + } + }, "/workflow_template": { "get": { "description": "Get a list of workflow template.", @@ -1214,6 +1530,53 @@ } }, "definitions": { + "airflow.ImportError": { + "type": "object", + "properties": { + "filename": { + "description": "The filename", + "type": "string" + }, + "import_error_id": { + "description": "The import error ID.", + "type": "integer" + }, + "stack_trace": { + "description": "The full stackstrace..", + "type": "string" + }, + "timestamp": { + "description": "The time when this error was created.", + "type": "string" + } + } + }, + "airflow.ImportErrorCollection": { + "type": "object", + "properties": { + "import_errors": { + "type": "array", + "items": { + "$ref": "#/definitions/airflow.ImportError" + } + }, + "total_entries": { + "description": "Count of objects in the current result set.", + "type": "integer" + } + } + }, + "airflow.InlineResponse200": { + "type": "object", + "properties": { + "content": { + "type": "string" + }, + "continuation_token": { + "type": "string" + } + } + }, "github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse": { "type": "object", "properties": { @@ -1338,6 +1701,32 @@ } } }, + "github_com_cloud-barista_cm-cicada_pkg_api_rest_model.EventLog": { + "type": "object", + "properties": { + "event": { + "type": "string" + }, + "extra": { + "type": "string" + }, + "start_date": { + "type": "string" + }, + "task_id": { + "type": "string" + }, + "task_name": { + "type": "string" + }, + "workflow_id": { + "type": "string" + }, + "workflow_run_id": { + "type": "string" + } + } + }, "github_com_cloud-barista_cm-cicada_pkg_api_rest_model.GetWorkflowTemplate": { "type": "object", "required": [ @@ -1557,6 +1946,64 @@ } } }, + "github_com_cloud-barista_cm-cicada_pkg_api_rest_model.TaskInstance": { + "type": "object", + "properties": { + "duration_date": { + "type": "number" + }, + "end_date": { + "type": "string" + }, + "execution_date": { + "type": "string" + }, + "start_date": { + "type": "string" + }, + "state": { + "type": "string" + }, + "task_id": { + "type": "string" + }, + "task_name": { + "type": "string" + }, + "try_number": { + "type": "integer" + }, + "workflow_id": { + "type": "string" + }, + "workflow_run_id": { + "type": "string" + } + } + }, + "github_com_cloud-barista_cm-cicada_pkg_api_rest_model.TaskInstanceReference": { + "type": "object", + "properties": { + "execution_date": { + "type": "string" + }, + "task_id": { + "description": "The task ID.", + "type": "string" + }, + "task_name": { + "type": "string" + }, + "workflow_id": { + "description": "The DAG ID.", + "type": "string" + }, + "workflow_run_id": { + "description": "The DAG run ID.", + "type": "string" + } + } + }, "github_com_cloud-barista_cm-cicada_pkg_api_rest_model.Workflow": { "type": "object", "required": [ @@ -1582,6 +2029,57 @@ } } }, + "github_com_cloud-barista_cm-cicada_pkg_api_rest_model.WorkflowRun": { + "type": "object", + "properties": { + "conf": { + "type": "object", + "additionalProperties": true + }, + "data_interval_end": { + "type": "string" + }, + "data_interval_start": { + "type": "string" + }, + "duration_date": { + "type": "number" + }, + "end_date": { + "type": "string" + }, + "execution_date": { + "type": "string" + }, + "external_trigger": { + "type": "boolean" + }, + "last_scheduling_decision": { + "type": "string" + }, + "logical_date": { + "type": "string" + }, + "note": { + "type": "string" + }, + "run_type": { + "type": "string" + }, + "start_date": { + "type": "string" + }, + "state": { + "type": "string" + }, + "workflow_id": { + "type": "string" + }, + "workflow_run_id": { + "type": "string" + } + } + }, "github_com_cloud-barista_cm-cicada_pkg_api_rest_model.WorkflowTemplate": { "type": "object", "required": [ diff --git a/pkg/api/rest/docs/swagger.yaml b/pkg/api/rest/docs/swagger.yaml index b920447..17658a1 100644 --- a/pkg/api/rest/docs/swagger.yaml +++ b/pkg/api/rest/docs/swagger.yaml @@ -1,5 +1,37 @@ basePath: /cicada definitions: + airflow.ImportError: + properties: + filename: + description: The filename + type: string + import_error_id: + description: The import error ID. + type: integer + stack_trace: + description: The full stackstrace.. + type: string + timestamp: + description: The time when this error was created. + type: string + type: object + airflow.ImportErrorCollection: + properties: + import_errors: + items: + $ref: '#/definitions/airflow.ImportError' + type: array + total_entries: + description: Count of objects in the current result set. + type: integer + type: object + airflow.InlineResponse200: + properties: + content: + type: string + continuation_token: + type: string + type: object github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse: properties: error: @@ -82,6 +114,23 @@ definitions: required: - task_groups type: object + github_com_cloud-barista_cm-cicada_pkg_api_rest_model.EventLog: + properties: + event: + type: string + extra: + type: string + start_date: + type: string + task_id: + type: string + task_name: + type: string + workflow_id: + type: string + workflow_run_id: + type: string + type: object github_com_cloud-barista_cm-cicada_pkg_api_rest_model.GetWorkflowTemplate: properties: data: @@ -231,6 +280,45 @@ definitions: - name - tasks type: object + github_com_cloud-barista_cm-cicada_pkg_api_rest_model.TaskInstance: + properties: + duration_date: + type: number + end_date: + type: string + execution_date: + type: string + start_date: + type: string + state: + type: string + task_id: + type: string + task_name: + type: string + try_number: + type: integer + workflow_id: + type: string + workflow_run_id: + type: string + type: object + github_com_cloud-barista_cm-cicada_pkg_api_rest_model.TaskInstanceReference: + properties: + execution_date: + type: string + task_id: + description: The task ID. + type: string + task_name: + type: string + workflow_id: + description: The DAG ID. + type: string + workflow_run_id: + description: The DAG run ID. + type: string + type: object github_com_cloud-barista_cm-cicada_pkg_api_rest_model.Workflow: properties: created_at: @@ -248,6 +336,40 @@ definitions: - id - name type: object + github_com_cloud-barista_cm-cicada_pkg_api_rest_model.WorkflowRun: + properties: + conf: + additionalProperties: true + type: object + data_interval_end: + type: string + data_interval_start: + type: string + duration_date: + type: number + end_date: + type: string + execution_date: + type: string + external_trigger: + type: boolean + last_scheduling_decision: + type: string + logical_date: + type: string + note: + type: string + run_type: + type: string + start_date: + type: string + state: + type: string + workflow_id: + type: string + workflow_run_id: + type: string + type: object github_com_cloud-barista_cm-cicada_pkg_api_rest_model.WorkflowTemplate: properties: data: @@ -270,6 +392,69 @@ info: title: CM-Cicada REST API version: latest paths: + /eventlogs: + get: + consumes: + - application/json + description: Get Eventlog. + operationId: get-Eventlog + parameters: + - description: ID of the workflow. + in: query + name: wfId + required: true + type: string + - description: ID of the workflow run. + in: query + name: wfRunId + type: string + - description: ID of the task. + in: query + name: taskId + type: string + produces: + - application/json + responses: + "200": + description: Successfully get the workflow. + schema: + items: + $ref: '#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_model.EventLog' + type: array + "400": + description: Sent bad request. + schema: + $ref: '#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse' + "500": + description: Failed to get the workflow. + schema: + $ref: '#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse' + summary: Get Eventlog + tags: + - '[Workflow]' + /importErrors: + get: + consumes: + - application/json + description: Get the importErrors. + produces: + - application/json + responses: + "200": + description: Successfully get the importErrors. + schema: + $ref: '#/definitions/airflow.ImportErrorCollection' + "400": + description: Sent bad request. + schema: + $ref: '#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse' + "500": + description: Failed to get the importErrors. + schema: + $ref: '#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse' + summary: Get importErrors + tags: + - '[Workflow]' /readyz: get: consumes: @@ -732,6 +917,37 @@ paths: summary: Run Workflow tags: - '[Workflow]' + /workflow/{wfId}/runs: + get: + consumes: + - application/json + description: Get the task Logs. + parameters: + - description: ID of the workflow. + in: path + name: wfId + required: true + type: string + produces: + - application/json + responses: + "200": + description: Successfully get the workflowRuns. + schema: + items: + $ref: '#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_model.WorkflowRun' + type: array + "400": + description: Sent bad request. + schema: + $ref: '#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse' + "500": + description: Failed to get the workflowRuns. + schema: + $ref: '#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse' + summary: Get workflowRuns + tags: + - '[Workflow]' /workflow/{wfId}/task: get: consumes: @@ -943,6 +1159,123 @@ paths: summary: Get Task from Task Group tags: - '[Workflow]' + /workflow/{wfId}/workflowRun/{wfRunId}/task/{taskId}/clear: + post: + consumes: + - application/json + description: Clear the task Instance. + parameters: + - description: ID of the workflow. + in: path + name: wfId + required: true + type: string + - description: ID of the wfRunId. + in: path + name: wfRunId + required: true + type: string + - description: ID of the taskId. + in: path + name: taskId + required: true + type: string + produces: + - application/json + responses: + "200": + description: Successfully clear the taskInstances. + schema: + $ref: '#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_model.TaskInstanceReference' + "400": + description: Sent bad request. + schema: + $ref: '#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse' + "500": + description: Failed to clear the taskInstances. + schema: + $ref: '#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse' + summary: Clear taskInstances + tags: + - '[Workflow]' + /workflow/{wfId}/workflowRun/{wfRunId}/task/{taskId}/taskTryNum/{taskTyNum}/logs: + get: + consumes: + - application/json + description: Get the task Logs. + parameters: + - description: ID of the workflow. + in: path + name: wfId + required: true + type: string + - description: ID of the workflowRunId. + in: path + name: wfRunId + required: true + type: string + - description: ID of the task. + in: path + name: taskId + required: true + type: string + - description: ID of the taskTryNum. + in: path + name: taskTyNum + required: true + type: string + produces: + - application/json + responses: + "200": + description: Successfully get the task Logs. + schema: + $ref: '#/definitions/airflow.InlineResponse200' + "400": + description: Sent bad request. + schema: + $ref: '#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse' + "500": + description: Failed to get the task Logs. + schema: + $ref: '#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse' + summary: Get Task Logs + tags: + - '[Workflow]' + /workflow/{wfId}/workflowRun/{wfRunId}/taskInstances: + get: + consumes: + - application/json + description: Get the task Logs. + parameters: + - description: ID of the workflow. + in: path + name: wfId + required: true + type: string + - description: ID of the workflow. + in: path + name: wfRunId + required: true + type: string + produces: + - application/json + responses: + "200": + description: Successfully get the taskInstances. + schema: + $ref: '#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_model.TaskInstance' + "400": + description: Sent bad request. + schema: + $ref: '#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse' + "500": + description: Failed to get the taskInstances. + schema: + $ref: '#/definitions/github_com_cloud-barista_cm-cicada_pkg_api_rest_common.ErrorResponse' + summary: Get taskInstances + tags: + - '[Workflow]' /workflow/name/{wfName}: get: consumes: diff --git a/pkg/api/rest/model/workflow.go b/pkg/api/rest/model/workflow.go index 281ba72..b7ae2f0 100644 --- a/pkg/api/rest/model/workflow.go +++ b/pkg/api/rest/model/workflow.go @@ -92,6 +92,73 @@ type CreateWorkflowReq struct { Data CreateDataReq `gorm:"column:data" json:"data" mapstructure:"data" validate:"required"` } +type Monit struct { + WorkflowID string + WorkflowVersion string + Status string + startTime time.Time + endTime time.Time + Duration time.Time + WorkflowInput string + WorkflowResult string +} + +type WorkflowRun struct { + WorkflowRunID string `json:"workflow_run_id,omitempty"` + WorkflowID *string `json:"workflow_id,omitempty"` + LogicalDate string `json:"logical_date,omitempty"` + ExecutionDate time.Time `json:"execution_date,omitempty"` + StartDate time.Time `json:"start_date,omitempty"` + EndDate time.Time `json:"end_date,omitempty"` + DurationDate float64 `json:"duration_date,omitempty"` + DataIntervalStart time.Time `json:"data_interval_start,omitempty"` + DataIntervalEnd time.Time `json:"data_interval_end,omitempty"` + LastSchedulingDecision time.Time `json:"last_scheduling_decision,omitempty"` + RunType string `json:"run_type,omitempty"` + State string `json:"state,omitempty"` + ExternalTrigger *bool `json:"external_trigger,omitempty"` + Conf map[string]interface{} `json:"conf,omitempty"` + Note string `json:"note,omitempty"` +} + +type TaskInstance struct { + WorkflowRunID string `json:"workflow_run_id,omitempty"` + WorkflowID *string `json:"workflow_id,omitempty"` + TaskID string `json:"task_id,omitempty"` + TaskName string `json:"task_name,omitempty"` + State string `json:"state,omitempty"` + StartDate time.Time `json:"start_date,omitempty"` + EndDate time.Time `json:"end_date,omitempty"` + DurationDate float64 `json:"duration_date"` + ExecutionDate time.Time `json:"execution_date,omitempty"` + TryNumber int `json:"try_number"` +} + +type TaskInstanceReference struct { + // The task ID. + TaskId *string `json:"task_id,omitempty"` + TaskName string `json:"task_name,omitempty"` + // The DAG ID. + WorkflowID *string `json:"workflow_id,omitempty"` + // The DAG run ID. + WorkflowRunID *string `json:"workflow_run_id,omitempty"` + ExecutionDate *string `json:"execution_date,omitempty"` +} + +type TaskLog struct { + Content string `json:"content,omitempty"` +} + +type EventLog struct { + WorkflowRunID string `json:"workflow_run_id,omitempty"` + WorkflowID string `json:"workflow_id"` + TaskID string `json:"task_id"` + TaskName string `json:"task_name"` + Event string `json:"event,omitempty"` + When time.Time `json:"start_date,omitempty"` + Extra string `json:"extra,omitempty"` +} + func (d Data) Value() (driver.Value, error) { return json.Marshal(d) } @@ -120,4 +187,4 @@ func (d *CreateDataReq) Scan(value interface{}) error { return errors.New("Invalid type for CreateDataReq") } return json.Unmarshal(bytes, d) -} +} \ No newline at end of file diff --git a/pkg/api/rest/route/workflow.go b/pkg/api/rest/route/workflow.go index 46a728f..dffbcc4 100644 --- a/pkg/api/rest/route/workflow.go +++ b/pkg/api/rest/route/workflow.go @@ -1,10 +1,11 @@ package route import ( + "strings" + "github.com/cloud-barista/cm-cicada/common" "github.com/cloud-barista/cm-cicada/pkg/api/rest/controller" "github.com/labstack/echo/v4" - "strings" ) func Workflow(e *echo.Echo) { @@ -27,5 +28,10 @@ func Workflow(e *echo.Echo) { e.GET("/"+strings.ToLower(common.ShortModuleName)+"/task_group/:tgId", controller.GetTaskGroupDirectly) e.GET("/"+strings.ToLower(common.ShortModuleName)+"/task/:taskId", controller.GetTaskDirectly) - + e.GET("/"+strings.ToLower(common.ShortModuleName)+"/workflow/:wfId/workflowRun/:wfRunId/task/:taskId/taskTryNum/:taskTyNum/logs", controller.GetTaskLogs) + e.GET("/"+strings.ToLower(common.ShortModuleName)+"/workflow/:wfId/runs", controller.GetWorkflowRuns) + e.GET("/"+strings.ToLower(common.ShortModuleName)+"/workflow/:wfId/workflowRun/:wfRunId/taskInstances", controller.GetTaskInstances) + e.POST("/"+strings.ToLower(common.ShortModuleName)+"/workflow/:wfId/workflowRun/:wfRunId/task/:taskId/clear", controller.ClearTaskInstances) + e.GET("/"+strings.ToLower(common.ShortModuleName)+"/eventlogs", controller.GetEventLogs) + e.GET("/"+strings.ToLower(common.ShortModuleName)+"/importErrors", controller.GetImportErrors) }