diff --git a/tests/odh/mnist_ray_test.go b/tests/odh/mnist_ray_test.go index 0d74b2a5..c306d3ec 100644 --- a/tests/odh/mnist_ray_test.go +++ b/tests/odh/mnist_ray_test.go @@ -131,7 +131,6 @@ func mnistRay(t *testing.T, numGpus int) { // Fetch created raycluster rayClusterName := "mnisttest" - // Wait until raycluster is up and running rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Get(test.Ctx(), rayClusterName, metav1.GetOptions{}) test.Expect(err).ToNot(HaveOccurred()) diff --git a/tests/odh/mnist_raytune_hpo_test.go b/tests/odh/mnist_raytune_hpo_test.go index 318c79cc..5e40e4ba 100644 --- a/tests/odh/mnist_raytune_hpo_test.go +++ b/tests/odh/mnist_raytune_hpo_test.go @@ -129,53 +129,12 @@ func mnistRayTuneHpo(t *testing.T, numGpus int) { ContainElement(WithTransform(KueueWorkloadAdmitted, BeTrueBecause("Workload failed to be admitted"))), ), ) - time.Sleep(30 * time.Second) // Fetch created raycluster rayClusterName := "mnisthpotest" rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Get(test.Ctx(), rayClusterName, metav1.GetOptions{}) test.Expect(err).ToNot(HaveOccurred()) - // Initialise raycluster client to interact with raycluster to get rayjob details using REST-API - dashboardUrl := GetDashboardUrl(test, namespace, rayCluster) - rayClusterClientConfig := RayClusterClientConfig{Address: dashboardUrl.String(), Client: nil, InsecureSkipVerify: true} - rayClient, err := NewRayClusterClient(rayClusterClientConfig, test.Config().BearerToken) - if err != nil { - test.T().Errorf("%s", err) - } - - jobID := GetTestJobId(test, rayClient, dashboardUrl.Host) - test.Expect(jobID).ToNot(Equal(nil)) - - // Wait for the job to be succeeded or failed - var rayJobStatus string - fmt.Printf("Waiting for job to be Succeeded...\n") - test.Eventually(func() string { - resp, err := rayClient.GetJobDetails(jobID) - test.Expect(err).ToNot(HaveOccurred()) - rayJobStatusVal := resp.Status - if rayJobStatusVal == "SUCCEEDED" || rayJobStatusVal == "FAILED" { - fmt.Printf("JobStatus : %s\n", rayJobStatusVal) - rayJobStatus = rayJobStatusVal - return rayJobStatus - } - if rayJobStatus != rayJobStatusVal && rayJobStatusVal != "SUCCEEDED" { - fmt.Printf("JobStatus : %s...\n", rayJobStatusVal) - rayJobStatus = rayJobStatusVal - } - return rayJobStatus - }, TestTimeoutDouble, 3*time.Second).Should(Or(Equal("SUCCEEDED"), Equal("FAILED")), "Job did not complete within the expected time") - test.Expect(rayJobStatus).To(Equal("SUCCEEDED"), "RayJob failed !") - - // Store job logs in output directory - WriteRayJobAPILogs(test, rayClient, jobID) - - // Fetch created raycluster - rayClusterName := "mnisthpotest" - // Wait until raycluster is up and running - rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Get(test.Ctx(), rayClusterName, metav1.GetOptions{}) - test.Expect(err).ToNot(HaveOccurred()) - // Initialise raycluster client to interact with raycluster to get rayjob details using REST-API dashboardUrl := GetDashboardUrl(test, namespace, rayCluster) rayClusterClientConfig := RayClusterClientConfig{Address: dashboardUrl.String(), Client: nil, InsecureSkipVerify: true} diff --git a/tests/odh/notebook.go b/tests/odh/notebook.go index 12b8aa51..f09d8312 100644 --- a/tests/odh/notebook.go +++ b/tests/odh/notebook.go @@ -29,8 +29,6 @@ import ( "k8s.io/apimachinery/pkg/util/yaml" ) -const recommendedTagAnnotation = "opendatahub.io/workbench-image-recommended" - var notebookResource = schema.GroupVersionResource{Group: "kubeflow.org", Version: "v1", Resource: "notebooks"} type NotebookProps struct { diff --git a/tests/odh/ray_finetune_llm_deepspeed_test.go b/tests/odh/ray_finetune_llm_deepspeed_test.go index 661e5597..ead556ea 100644 --- a/tests/odh/ray_finetune_llm_deepspeed_test.go +++ b/tests/odh/ray_finetune_llm_deepspeed_test.go @@ -30,13 +30,13 @@ import ( ) func TestRayFinetuneLlmDeepspeedDemoLlama_2_7b(t *testing.T) { - rayFinetuneLlmDeepspeed(t, 1, "zero_3_llama_2_7b.json") + rayFinetuneLlmDeepspeed(t, 1, "meta-llama/Llama-2-7b-chat-hf", "zero_3_llama_2_7b.json") } func TestRayFinetuneLlmDeepspeedDemoLlama_31_8b(t *testing.T) { - rayFinetuneLlmDeepspeed(t, 1, "zero_3_offload_optim_param.json") + rayFinetuneLlmDeepspeed(t, 1, "meta-llama/Meta-Llama-3.1-8B", "zero_3_offload_optim_param.json") } -func rayFinetuneLlmDeepspeed(t *testing.T, numGpus int, modelConfigFile string) { +func rayFinetuneLlmDeepspeed(t *testing.T, numGpus int, modelName string, modelConfigFile string) { test := With(t) // Create a namespace @@ -56,21 +56,22 @@ func rayFinetuneLlmDeepspeed(t *testing.T, numGpus int, modelConfigFile string) "import os": "import os,time,sys", "import sys": "!cp /opt/app-root/notebooks/* ./\\n\",\n\t\"!ls", "from codeflare_sdk.cluster.auth import TokenAuthentication": "from codeflare_sdk.cluster.auth import TokenAuthentication\\n\",\n\t\"from codeflare_sdk.job import RayJobClient", - "token = ''": fmt.Sprintf("token = '%s'", userToken), - "server = ''": fmt.Sprintf("server = '%s'", GetOpenShiftApiUrl(test)), - "namespace='ray-finetune-llm-deepspeed'": fmt.Sprintf("namespace='%s'", namespace.Name), - "head_cpus=16": "head_cpus=2", - "head_extended_resource_requests=1": "head_extended_resource_requests=0", - "num_workers=7": "num_workers=1", - "worker_cpu_requests=16": "worker_cpu_requests=4", - "worker_cpu_limits=16": "worker_cpu_limits=4", - "worker_memory_requests=128": "worker_memory_requests=64", - "worker_memory_limits=256": "worker_memory_limits=128", - "head_memory=128": "head_memory=48", - "client = cluster.job_client": "ray_dashboard = cluster.cluster_dashboard_uri()\\n\",\n\t\"header = {\\\"Authorization\\\": \\\"Bearer " + userToken + "\\\"}\\n\",\n\t\"client = RayJobClient(address=ray_dashboard, headers=header, verify=False)\\n", - "--num-devices=8": fmt.Sprintf("--num-devices=%d", numGpus), - "--num-epochs=3": fmt.Sprintf("--num-epochs=%d", 1), - "--ds-config=./deepspeed_configs/zero_3_offload_optim+param.json": fmt.Sprintf("--ds-config=./%s \\\"\\n\",\n\t\" \\\"--lora-config=./lora.json \\\"\\n\",\n\t\" \\\"--as-test", modelConfigFile), + "token = ''": fmt.Sprintf("token = '%s'", userToken), + "server = ''": fmt.Sprintf("server = '%s'", GetOpenShiftApiUrl(test)), + "namespace='ray-finetune-llm-deepspeed'": fmt.Sprintf("namespace='%s'", namespace.Name), + "head_cpus=16": "head_cpus=2", + "head_extended_resource_requests=1": "head_extended_resource_requests=0", + "num_workers=7": "num_workers=1", + "worker_cpu_requests=16": "worker_cpu_requests=4", + "worker_cpu_limits=16": "worker_cpu_limits=4", + "worker_memory_requests=128": "worker_memory_requests=64", + "worker_memory_limits=256": "worker_memory_limits=128", + "head_memory=128": "head_memory=48", + "client = cluster.job_client": "ray_dashboard = cluster.cluster_dashboard_uri()\\n\",\n\t\"header = {\\\"Authorization\\\": \\\"Bearer " + userToken + "\\\"}\\n\",\n\t\"client = RayJobClient(address=ray_dashboard, headers=header, verify=False)\\n", + "--num-devices=8": fmt.Sprintf("--num-devices=%d", numGpus), + "--num-epochs=3": fmt.Sprintf("--num-epochs=%d", 1), + "--model-name=meta-llama/Meta-Llama-3.1-8B": fmt.Sprintf("--model-name=%s", modelName), + "--ds-config=./deepspeed_configs/zero_3_offload_optim_param.json": fmt.Sprintf("--ds-config=./%s \\\"\\n\",\n\t\" \\\"--lora-config=./lora.json \\\"\\n\",\n\t\" \\\"--as-test", modelConfigFile), "--batch-size-per-device=32": "--batch-size-per-device=6", "--eval-batch-size-per-device=32": "--eval-batch-size-per-device=6", "'pip': 'requirements.txt'": "'pip': '/opt/app-root/src/requirements.txt'", @@ -83,7 +84,6 @@ func rayFinetuneLlmDeepspeed(t *testing.T, numGpus int, modelConfigFile string) updatedNotebookContent = strings.Replace(updatedNotebookContent, oldValue, newValue, -1) } updatedNotebook := []byte(updatedNotebookContent) - os.WriteFile("demo.ipynb", updatedNotebook, 0644) // Test configuration jupyterNotebookConfigMapFileName := "ray_finetune_llm_deepspeed.ipynb" @@ -117,8 +117,6 @@ func rayFinetuneLlmDeepspeed(t *testing.T, numGpus int, modelConfigFile string) ), ) - time.Sleep(30 * time.Second) - // Fetch created raycluster rayClusterName := "ray" rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Get(test.Ctx(), rayClusterName, metav1.GetOptions{}) @@ -128,37 +126,44 @@ func rayFinetuneLlmDeepspeed(t *testing.T, numGpus int, modelConfigFile string) dashboardUrl := GetDashboardUrl(test, namespace, rayCluster) rayClusterClientConfig := RayClusterClientConfig{Address: dashboardUrl.String(), Client: nil, InsecureSkipVerify: true} rayClient, err := NewRayClusterClient(rayClusterClientConfig, test.Config().BearerToken) - if err != nil { - test.T().Errorf("%s", err) - } + test.Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("Failed to create new raycluster client: %s", err)) + // wait until rayjob exists + test.Eventually(func() []RayJobDetailsResponse { + rayJobs, err := rayClient.GetJobs() + test.Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("Failed to fetch ray-jobs : %s", err)) + return *rayJobs + }, TestTimeoutMedium, 1*time.Second).Should(HaveLen(1), "Ray job not found") + + // Get test job-id jobID := GetTestJobId(test, rayClient, dashboardUrl.Host) - test.Expect(jobID).ToNot(Equal(nil)) + test.Expect(jobID).ToNot(BeEmpty()) // Wait for the job to be succeeded or failed var rayJobStatus string - fmt.Printf("Waiting for job to be Succeeded...\n") + test.T().Logf("Waiting for job to be Succeeded...\n") test.Eventually(func() string { resp, err := rayClient.GetJobDetails(jobID) - test.Expect(err).ToNot(HaveOccurred()) + test.Expect(err).ToNot(HaveOccurred(), fmt.Sprintf("Failed to get job details :%s", err)) rayJobStatusVal := resp.Status if rayJobStatusVal == "SUCCEEDED" || rayJobStatusVal == "FAILED" { - fmt.Printf("JobStatus : %s\n", rayJobStatusVal) + test.T().Logf("JobStatus - %s\n", rayJobStatusVal) rayJobStatus = rayJobStatusVal - WriteRayJobAPILogs(test, rayClient, jobID) return rayJobStatus } if rayJobStatus != rayJobStatusVal && rayJobStatusVal != "SUCCEEDED" { - fmt.Printf("JobStatus : %s...\n", rayJobStatusVal) + test.T().Logf("JobStatus - %s...\n", rayJobStatusVal) rayJobStatus = rayJobStatusVal } return rayJobStatus - }, TestTimeoutDouble, 3*time.Second).Should(Or(Equal("SUCCEEDED"), Equal("FAILED")), "Job did not complete within the expected time") + }, TestTimeoutDouble, 1*time.Second).Should(Or(Equal("SUCCEEDED"), Equal("FAILED")), "Job did not complete within the expected time") // Store job logs in output directory WriteRayJobAPILogs(test, rayClient, jobID) + + // Assert ray-job status after job execution test.Expect(rayJobStatus).To(Equal("SUCCEEDED"), "RayJob failed !") // Make sure the RayCluster finishes and is deleted - test.Eventually(RayClusters(test, namespace.Name), TestTimeoutMedium). - Should(HaveLen(0)) + test.Eventually(RayClusters(test, namespace.Name), TestTimeoutLong). + Should(BeEmpty()) } diff --git a/tests/odh/support.go b/tests/odh/support.go index 22eb2871..a6232a32 100644 --- a/tests/odh/support.go +++ b/tests/odh/support.go @@ -18,14 +18,11 @@ package odh import ( "embed" - "net/http" "net/url" "os" - . "github.com/onsi/gomega" gomega "github.com/onsi/gomega" "github.com/project-codeflare/codeflare-common/support" - . "github.com/project-codeflare/codeflare-common/support" rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" v1 "k8s.io/api/core/v1" ) @@ -49,8 +46,7 @@ func ReadFileExt(t support.Test, fileName string) []byte { func GetDashboardUrl(test support.Test, namespace *v1.Namespace, rayCluster *rayv1.RayCluster) *url.URL { dashboardName := "ray-dashboard-" + rayCluster.Name - test.T().Logf("Raycluster created : %s\n", rayCluster.Name) - route := GetRoute(test, namespace.Name, dashboardName) + route := support.GetRoute(test, namespace.Name, dashboardName) hostname := route.Status.Ingress[0].Host dashboardUrl, _ := url.Parse("https://" + hostname) test.T().Logf("Ray-dashboard route : %s\n", dashboardUrl.String()) @@ -58,15 +54,9 @@ func GetDashboardUrl(test support.Test, namespace *v1.Namespace, rayCluster *ray return dashboardUrl } -func GetTestJobId(test Test, rayClient RayClusterClient, hostName string) string { - listJobsReq, err := http.NewRequest("GET", "https://"+hostName+"/api/jobs/", nil) - if err != nil { - test.T().Errorf("failed to do get request: %s\n", err) - } - listJobsReq.Header.Add("Authorization", "Bearer "+test.Config().BearerToken) - +func GetTestJobId(test support.Test, rayClient support.RayClusterClient, hostName string) string { allJobsData, err := rayClient.GetJobs() - test.Expect(err).ToNot(HaveOccurred()) + test.Expect(err).ToNot(gomega.HaveOccurred()) jobID := (*allJobsData)[0].SubmissionID if len(*allJobsData) > 0 {