diff --git a/go/tasks/plugins/k8s/kfoperators/common/common_operator.go b/go/tasks/plugins/k8s/kfoperators/common/common_operator.go index 19e8a9724..0782e410c 100644 --- a/go/tasks/plugins/k8s/kfoperators/common/common_operator.go +++ b/go/tasks/plugins/k8s/kfoperators/common/common_operator.go @@ -15,6 +15,7 @@ import ( pluginsCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" commonOp "github.com/kubeflow/common/pkg/apis/common/v1" v1 "k8s.io/api/core/v1" + meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) const ( @@ -104,8 +105,11 @@ func GetMPIPhaseInfo(currentCondition commonOp.JobCondition, occurredAt time.Tim } // GetLogs will return the logs for kubeflow job -func GetLogs(taskType string, name string, namespace string, hasMaster bool, +func GetLogs(taskType string, objectMeta meta_v1.ObjectMeta, hasMaster bool, workersCount int32, psReplicasCount int32, chiefReplicasCount int32) ([]*core.TaskLog, error) { + name := objectMeta.Name + namespace := objectMeta.Namespace + taskLogs := make([]*core.TaskLog, 0, 10) logPlugin, err := logs.InitializeLogPlugins(logs.GetLogConfig()) @@ -118,12 +122,23 @@ func GetLogs(taskType string, name string, namespace string, hasMaster bool, return nil, nil } + // We use the creation timestamp of the Kubeflow Job as a proxy for the start time of the pods + startTime := objectMeta.CreationTimestamp.Time.Unix() + // Don't have a good mechanism for this yet, but approximating with time.Now for now + finishTime := time.Now().Unix() + RFC3999StartTime := time.Unix(startTime, 0).Format(time.RFC3339) + RFC3999FinishTime := time.Unix(finishTime, 0).Format(time.RFC3339) + if taskType == PytorchTaskType && hasMaster { masterTaskLog, masterErr := logPlugin.GetTaskLogs( tasklog.Input{ - PodName: name + "-master-0", - Namespace: namespace, - LogName: "master", + PodName: name + "-master-0", + Namespace: namespace, + LogName: "master", + PodRFC3339StartTime: RFC3999StartTime, + PodRFC3339FinishTime: RFC3999FinishTime, + PodUnixStartTime: startTime, + PodUnixFinishTime: finishTime, }, ) if masterErr != nil { @@ -135,8 +150,12 @@ func GetLogs(taskType string, name string, namespace string, hasMaster bool, // get all workers log for workerIndex := int32(0); workerIndex < workersCount; workerIndex++ { workerLog, err := logPlugin.GetTaskLogs(tasklog.Input{ - PodName: name + fmt.Sprintf("-worker-%d", workerIndex), - Namespace: namespace, + PodName: name + fmt.Sprintf("-worker-%d", workerIndex), + Namespace: namespace, + PodRFC3339StartTime: RFC3999StartTime, + PodRFC3339FinishTime: RFC3999FinishTime, + PodUnixStartTime: startTime, + PodUnixFinishTime: finishTime, }) if err != nil { return nil, err diff --git a/go/tasks/plugins/k8s/kfoperators/common/common_operator_test.go b/go/tasks/plugins/k8s/kfoperators/common/common_operator_test.go index 96c4bcd87..75f3f35a5 100644 --- a/go/tasks/plugins/k8s/kfoperators/common/common_operator_test.go +++ b/go/tasks/plugins/k8s/kfoperators/common/common_operator_test.go @@ -14,6 +14,7 @@ import ( corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func TestExtractMPICurrentCondition(t *testing.T) { @@ -167,18 +168,31 @@ func TestGetLogs(t *testing.T) { workers := int32(1) launcher := int32(1) - jobLogs, err := GetLogs(MPITaskType, "test", "mpi-namespace", false, workers, launcher, 0) + mpiJobObjectMeta := meta_v1.ObjectMeta{ + Name: "test", + Namespace: "mpi-namespace", + } + jobLogs, err := GetLogs(MPITaskType, mpiJobObjectMeta, false, workers, launcher, 0) assert.NoError(t, err) assert.Equal(t, 1, len(jobLogs)) assert.Equal(t, fmt.Sprintf("k8s.com/#!/log/%s/%s-worker-0/pod?namespace=mpi-namespace", "mpi-namespace", "test"), jobLogs[0].Uri) - jobLogs, err = GetLogs(PytorchTaskType, "test", "pytorch-namespace", true, workers, launcher, 0) + pytorchJobObjectMeta := meta_v1.ObjectMeta{ + Name: "test", + Namespace: "pytorch-namespace", + } + jobLogs, err = GetLogs(PytorchTaskType, pytorchJobObjectMeta, true, workers, launcher, 0) assert.NoError(t, err) assert.Equal(t, 2, len(jobLogs)) assert.Equal(t, fmt.Sprintf("k8s.com/#!/log/%s/%s-master-0/pod?namespace=pytorch-namespace", "pytorch-namespace", "test"), jobLogs[0].Uri) assert.Equal(t, fmt.Sprintf("k8s.com/#!/log/%s/%s-worker-0/pod?namespace=pytorch-namespace", "pytorch-namespace", "test"), jobLogs[1].Uri) - jobLogs, err = GetLogs(TensorflowTaskType, "test", "tensorflow-namespace", false, workers, launcher, 1) + tensorflowJobObjectMeta := meta_v1.ObjectMeta{ + Name: "test", + Namespace: "tensorflow-namespace", + } + + jobLogs, err = GetLogs(TensorflowTaskType, tensorflowJobObjectMeta, false, workers, launcher, 1) assert.NoError(t, err) assert.Equal(t, 3, len(jobLogs)) assert.Equal(t, fmt.Sprintf("k8s.com/#!/log/%s/%s-worker-0/pod?namespace=tensorflow-namespace", "tensorflow-namespace", "test"), jobLogs[0].Uri) @@ -187,6 +201,26 @@ func TestGetLogs(t *testing.T) { } +func TestGetLogsTemplateUri(t *testing.T) { + assert.NoError(t, logs.SetLogConfig(&logs.LogConfig{ + IsStackDriverEnabled: true, + StackDriverTemplateURI: "https://console.cloud.google.com/logs/query;query=resource.labels.pod_name={{.podName}}×tamp>{{.podRFC3339StartTime}}", + })) + + pytorchJobObjectMeta := meta_v1.ObjectMeta{ + Name: "test", + Namespace: "pytorch-namespace", + CreationTimestamp: meta_v1.Time{ + Time: time.Date(2022, time.January, 1, 12, 0, 0, 0, time.UTC), + }, + } + jobLogs, err := GetLogs(PytorchTaskType, pytorchJobObjectMeta, true, 1, 0, 0) + assert.NoError(t, err) + assert.Equal(t, 2, len(jobLogs)) + assert.Equal(t, fmt.Sprintf("https://console.cloud.google.com/logs/query;query=resource.labels.pod_name=%s-master-0×tamp>%s", "test", "2022-01-01T12:00:00Z"), jobLogs[0].Uri) + assert.Equal(t, fmt.Sprintf("https://console.cloud.google.com/logs/query;query=resource.labels.pod_name=%s-worker-0×tamp>%s", "test", "2022-01-01T12:00:00Z"), jobLogs[1].Uri) +} + func dummyPodSpec() v1.PodSpec { return v1.PodSpec{ Containers: []v1.Container{ diff --git a/go/tasks/plugins/k8s/kfoperators/mpi/mpi.go b/go/tasks/plugins/k8s/kfoperators/mpi/mpi.go index e9e1f6037..5f8dc0aa8 100644 --- a/go/tasks/plugins/k8s/kfoperators/mpi/mpi.go +++ b/go/tasks/plugins/k8s/kfoperators/mpi/mpi.go @@ -204,7 +204,7 @@ func (mpiOperatorResourceHandler) GetTaskPhase(_ context.Context, pluginContext numWorkers = app.Spec.MPIReplicaSpecs[kubeflowv1.MPIJobReplicaTypeWorker].Replicas numLauncherReplicas = app.Spec.MPIReplicaSpecs[kubeflowv1.MPIJobReplicaTypeLauncher].Replicas - taskLogs, err := common.GetLogs(common.MPITaskType, app.Name, app.Namespace, false, + taskLogs, err := common.GetLogs(common.MPITaskType, app.ObjectMeta, false, *numWorkers, *numLauncherReplicas, 0) if err != nil { return pluginsCore.PhaseInfoUndefined, err diff --git a/go/tasks/plugins/k8s/kfoperators/mpi/mpi_test.go b/go/tasks/plugins/k8s/kfoperators/mpi/mpi_test.go index 9ec11f5da..a799f9398 100644 --- a/go/tasks/plugins/k8s/kfoperators/mpi/mpi_test.go +++ b/go/tasks/plugins/k8s/kfoperators/mpi/mpi_test.go @@ -389,7 +389,7 @@ func TestGetLogs(t *testing.T) { mpiResourceHandler := mpiOperatorResourceHandler{} mpiJob := dummyMPIJobResource(mpiResourceHandler, workers, launcher, slots, mpiOp.JobRunning) - jobLogs, err := common.GetLogs(common.MPITaskType, mpiJob.Name, mpiJob.Namespace, false, workers, launcher, 0) + jobLogs, err := common.GetLogs(common.MPITaskType, mpiJob.ObjectMeta, false, workers, launcher, 0) assert.NoError(t, err) assert.Equal(t, 2, len(jobLogs)) assert.Equal(t, fmt.Sprintf("k8s.com/#!/log/%s/%s-worker-0/pod?namespace=mpi-namespace", jobNamespace, jobName), jobLogs[0].Uri) diff --git a/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch.go b/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch.go index 6b0bc61a7..7a6d1d2fd 100644 --- a/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch.go +++ b/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch.go @@ -226,7 +226,7 @@ func (pytorchOperatorResourceHandler) GetTaskPhase(_ context.Context, pluginCont workersCount := app.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeWorker].Replicas - taskLogs, err := common.GetLogs(common.PytorchTaskType, app.Name, app.Namespace, hasMaster, *workersCount, 0, 0) + taskLogs, err := common.GetLogs(common.PytorchTaskType, app.ObjectMeta, hasMaster, *workersCount, 0, 0) if err != nil { return pluginsCore.PhaseInfoUndefined, err } diff --git a/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch_test.go b/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch_test.go index 27fa4c869..f2c918d79 100644 --- a/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch_test.go +++ b/go/tasks/plugins/k8s/kfoperators/pytorch/pytorch_test.go @@ -421,7 +421,7 @@ func TestGetLogs(t *testing.T) { pytorchResourceHandler := pytorchOperatorResourceHandler{} pytorchJob := dummyPytorchJobResource(pytorchResourceHandler, workers, commonOp.JobRunning) - jobLogs, err := common.GetLogs(common.PytorchTaskType, pytorchJob.Name, pytorchJob.Namespace, hasMaster, workers, 0, 0) + jobLogs, err := common.GetLogs(common.PytorchTaskType, pytorchJob.ObjectMeta, hasMaster, workers, 0, 0) assert.NoError(t, err) assert.Equal(t, 3, len(jobLogs)) assert.Equal(t, fmt.Sprintf("k8s.com/#!/log/%s/%s-master-0/pod?namespace=pytorch-namespace", jobNamespace, jobName), jobLogs[0].Uri) @@ -440,7 +440,7 @@ func TestGetLogsElastic(t *testing.T) { pytorchResourceHandler := pytorchOperatorResourceHandler{} pytorchJob := dummyPytorchJobResource(pytorchResourceHandler, workers, commonOp.JobRunning) - jobLogs, err := common.GetLogs(common.PytorchTaskType, pytorchJob.Name, pytorchJob.Namespace, hasMaster, workers, 0, 0) + jobLogs, err := common.GetLogs(common.PytorchTaskType, pytorchJob.ObjectMeta, hasMaster, workers, 0, 0) assert.NoError(t, err) assert.Equal(t, 2, len(jobLogs)) assert.Equal(t, fmt.Sprintf("k8s.com/#!/log/%s/%s-worker-0/pod?namespace=pytorch-namespace", jobNamespace, jobName), jobLogs[0].Uri) diff --git a/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow.go b/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow.go index ea2930d6f..5cc77648f 100644 --- a/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow.go +++ b/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow.go @@ -203,7 +203,7 @@ func (tensorflowOperatorResourceHandler) GetTaskPhase(_ context.Context, pluginC psReplicasCount := app.Spec.TFReplicaSpecs[kubeflowv1.TFJobReplicaTypePS].Replicas chiefCount := app.Spec.TFReplicaSpecs[kubeflowv1.TFJobReplicaTypeChief].Replicas - taskLogs, err := common.GetLogs(common.TensorflowTaskType, app.Name, app.Namespace, false, + taskLogs, err := common.GetLogs(common.TensorflowTaskType, app.ObjectMeta, false, *workersCount, *psReplicasCount, *chiefCount) if err != nil { return pluginsCore.PhaseInfoUndefined, err diff --git a/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow_test.go b/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow_test.go index 4e2bc1388..f2f7fa493 100644 --- a/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow_test.go +++ b/go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow_test.go @@ -373,7 +373,7 @@ func TestGetLogs(t *testing.T) { tensorflowResourceHandler := tensorflowOperatorResourceHandler{} tensorFlowJob := dummyTensorFlowJobResource(tensorflowResourceHandler, workers, psReplicas, chiefReplicas, commonOp.JobRunning) - jobLogs, err := common.GetLogs(common.TensorflowTaskType, tensorFlowJob.Name, tensorFlowJob.Namespace, false, + jobLogs, err := common.GetLogs(common.TensorflowTaskType, tensorFlowJob.ObjectMeta, false, workers, psReplicas, chiefReplicas) assert.NoError(t, err) assert.Equal(t, 4, len(jobLogs))