import logging

import aiohttp.web

import ray.dashboard.optional_utils as dashboard_optional_utils
import ray.dashboard.utils as dashboard_utils
from ray.core.generated import gcs_service_pb2_grpc
from ray.core.generated.gcs_pb2 import AllocationMode
from ray.core.generated.gcs_service_pb2 import (
    CreateOrUpdateVirtualClusterRequest,
    GetAllVirtualClustersRequest,
    RemoveVirtualClusterRequest,
)

logger = logging.getLogger(__name__)
routes = dashboard_optional_utils.DashboardHeadRouteTable


class VirtualClusterHead(dashboard_utils.DashboardHeadModule):
    """Dashboard head module exposing REST endpoints for virtual clusters.

    Every handler forwards the request to the GCS through a
    ``VirtualClusterInfoGcsService`` stub and wraps the reply in a
    ``rest_response``:

    * ``GET /virtual_clusters`` lists all virtual clusters.
    * ``POST /virtual_clusters`` creates or updates a virtual cluster.
    * ``DELETE /virtual_clusters/{virtual_cluster_id}`` removes one.
    """

    def __init__(self, dashboard_head):
        """Create the GCS stub on the dashboard head's aio gRPC channel."""
        super().__init__(dashboard_head)

        self._gcs_virtual_cluster_info_stub = (
            gcs_service_pb2_grpc.VirtualClusterInfoGcsServiceStub(
                dashboard_head.aiogrpc_gcs_channel
            )
        )

    @routes.get("/virtual_clusters")
    # NOTE(review): presumably caches the response for 10 seconds, so a GET
    # issued right after a POST/DELETE may return stale data -- confirm.
    @dashboard_optional_utils.aiohttp_cache(10)
    async def get_all_virtual_clusters(self, req) -> aiohttp.web.Response:
        """Return every virtual cluster known to the GCS.

        On success the response carries ``virtual_clusters``: the
        ``virtualClusterDataList`` entries of the reply, where ``revision``
        is normalized to ``int`` and the ``mode`` key is replaced by a
        lower-cased ``allocationMode`` string.
        """
        reply = await self._gcs_virtual_cluster_info_stub.GetAllVirtualClusters(
            GetAllVirtualClustersRequest()
        )

        if reply.status.code != 0:
            logger.warning(
                "Failed to get all virtual clusters: %s", reply.status.message
            )
            return dashboard_optional_utils.rest_response(
                success=False,
                message="Failed to get all virtual clusters: {}".format(
                    reply.status.message
                ),
            )

        data = dashboard_utils.message_to_dict(
            reply, always_print_fields_with_no_presence=True
        )
        virtual_clusters = data.get("virtualClusterDataList", [])
        for virtual_cluster_data in virtual_clusters:
            # The dict conversion may not yield an int here (presumably the
            # field is 64-bit and is rendered as a string), so coerce it.
            virtual_cluster_data["revision"] = int(
                virtual_cluster_data.get("revision", 0)
            )
            # Expose the enum under a REST-friendly key and lower-cased value.
            virtual_cluster_data["allocationMode"] = str(
                virtual_cluster_data.pop("mode", "mixed")
            ).lower()

        return dashboard_optional_utils.rest_response(
            success=True,
            message="All virtual clusters fetched.",
            virtual_clusters=virtual_clusters,
        )

    @routes.post("/virtual_clusters")
    async def create_or_update_virtual_cluster(self, req) -> aiohttp.web.Response:
        """Create or update a virtual cluster from a JSON request body.

        Recognized body keys: ``virtualClusterId`` (required),
        ``allocationMode`` (``"exclusive"`` selects
        ``AllocationMode.EXCLUSIVE``; any other value selects
        ``AllocationMode.MIXED``), ``replicaSets`` (defaults to ``{}``) and
        ``revision`` (defaults to ``0``).

        A malformed body yields a ``success=False`` response with an
        explanatory message instead of an unhandled exception.
        """
        try:
            virtual_cluster_info = await req.json()
        except ValueError as e:
            # json.JSONDecodeError is a subclass of ValueError.
            logger.warning("POST /virtual_clusters received invalid JSON: %s", e)
            return dashboard_optional_utils.rest_response(
                success=False,
                message="Failed to parse request body as JSON: {}".format(e),
            )
        logger.info("POST /virtual_clusters %s", virtual_cluster_info)

        if not isinstance(virtual_cluster_info, dict):
            return dashboard_optional_utils.rest_response(
                success=False,
                message="The request body must be a JSON object.",
            )

        virtual_cluster_id = virtual_cluster_info.get("virtualClusterId")
        if virtual_cluster_id is None:
            return dashboard_optional_utils.rest_response(
                success=False,
                message="Missing required field 'virtualClusterId'.",
            )

        try:
            revision = int(virtual_cluster_info.get("revision", 0))
        except (TypeError, ValueError):
            return dashboard_optional_utils.rest_response(
                success=False,
                message="Field 'revision' must be an integer, got {!r}.".format(
                    virtual_cluster_info.get("revision")
                ),
                virtual_cluster_id=virtual_cluster_id,
            )

        requested_mode = str(virtual_cluster_info.get("allocationMode", "mixed"))
        allocation_mode = (
            AllocationMode.EXCLUSIVE
            if requested_mode.lower() == "exclusive"
            else AllocationMode.MIXED
        )

        request = CreateOrUpdateVirtualClusterRequest(
            virtual_cluster_id=virtual_cluster_id,
            mode=allocation_mode,
            # Treat an explicit JSON null the same as an absent key.
            replica_sets=virtual_cluster_info.get("replicaSets") or {},
            revision=revision,
        )
        reply = await (
            self._gcs_virtual_cluster_info_stub.CreateOrUpdateVirtualCluster(request)
        )

        if reply.status.code != 0:
            logger.warning(
                "Failed to create or update virtual cluster %s: %s",
                virtual_cluster_id,
                reply.status.message,
            )
            return dashboard_optional_utils.rest_response(
                success=False,
                message="Failed to create or update virtual cluster {}: {}".format(
                    virtual_cluster_id, reply.status.message
                ),
                virtual_cluster_id=virtual_cluster_id,
            )

        logger.info("Virtual cluster %s created or updated", virtual_cluster_id)
        data = dashboard_utils.message_to_dict(
            reply, always_print_fields_with_no_presence=True
        )
        return dashboard_optional_utils.rest_response(
            success=True,
            message="Virtual cluster created or updated.",
            virtual_cluster_id=virtual_cluster_id,
            revision=int(data.get("revision", 0)),
            node_instances=data.get("nodeInstances", {}),
        )

    @routes.delete("/virtual_clusters/{virtual_cluster_id}")
    async def remove_virtual_cluster(self, req) -> aiohttp.web.Response:
        """Remove the virtual cluster named by the URL path parameter."""
        virtual_cluster_id = req.match_info.get("virtual_cluster_id")
        request = RemoveVirtualClusterRequest(virtual_cluster_id=virtual_cluster_id)
        reply = await self._gcs_virtual_cluster_info_stub.RemoveVirtualCluster(request)

        if reply.status.code != 0:
            logger.warning(
                "Failed to remove virtual cluster %s: %s",
                virtual_cluster_id,
                reply.status.message,
            )
            return dashboard_optional_utils.rest_response(
                success=False,
                message="Failed to remove virtual cluster {}: {}".format(
                    virtual_cluster_id, reply.status.message
                ),
                virtual_cluster_id=virtual_cluster_id,
            )

        logger.info("Virtual cluster %s removed", virtual_cluster_id)
        return dashboard_optional_utils.rest_response(
            success=True,
            message=f"Virtual cluster {virtual_cluster_id} removed.",
            virtual_cluster_id=virtual_cluster_id,
        )

    async def run(self, server):
        """No-op: this module only serves HTTP routes."""
        pass

    @staticmethod
    def is_minimal_module():
        """Report that this module is not a minimal module."""
        return False
(GetMode() != rpc::AllocationMode::Exclusive) { + if (GetMode() != rpc::AllocationMode::EXCLUSIVE) { std::ostringstream ostr; ostr << "The job cluster can only be removed in exclusive mode, virtual_cluster_id: " << GetID() << ", job_name: " << job_name; @@ -298,7 +298,7 @@ bool ExclusiveCluster::InUse() const { return !job_clusters_.empty(); } bool ExclusiveCluster::IsIdleNodeInstance(const std::string &job_cluster_id, const gcs::NodeInstance &node_instance) const { - RAY_CHECK(GetMode() == rpc::AllocationMode::Exclusive); + RAY_CHECK(GetMode() == rpc::AllocationMode::EXCLUSIVE); return job_cluster_id == kEmptyJobClusterId; } @@ -341,7 +341,7 @@ Status PrimaryCluster::CreateOrUpdateVirtualCluster( if (logical_cluster == nullptr) { // replica_instances_to_remove must be empty as the virtual cluster is a new one. RAY_CHECK(replica_instances_to_remove_from_logical_cluster.empty()); - if (request.mode() == rpc::AllocationMode::Exclusive) { + if (request.mode() == rpc::AllocationMode::EXCLUSIVE) { logical_cluster = std::make_shared(request.virtual_cluster_id(), async_data_flusher_); } else { @@ -396,7 +396,7 @@ Status PrimaryCluster::DetermineNodeInstanceAdditionsAndRemovals( bool PrimaryCluster::IsIdleNodeInstance(const std::string &job_cluster_id, const gcs::NodeInstance &node_instance) const { - RAY_CHECK(GetMode() == rpc::AllocationMode::Exclusive); + RAY_CHECK(GetMode() == rpc::AllocationMode::EXCLUSIVE); return job_cluster_id == kEmptyJobClusterId; } diff --git a/src/ray/gcs/gcs_server/gcs_virtual_cluster.h b/src/ray/gcs/gcs_server/gcs_virtual_cluster.h index 1e21ac0206b4..525ce7793750 100644 --- a/src/ray/gcs/gcs_server/gcs_virtual_cluster.h +++ b/src/ray/gcs/gcs_server/gcs_virtual_cluster.h @@ -197,7 +197,7 @@ class ExclusiveCluster : public VirtualCluster { : VirtualCluster(id), async_data_flusher_(async_data_flusher) {} const std::string &GetID() const override { return id_; } - rpc::AllocationMode GetMode() const override { return 
rpc::AllocationMode::Exclusive; } + rpc::AllocationMode GetMode() const override { return rpc::AllocationMode::EXCLUSIVE; } /// Create a job cluster. /// @@ -245,7 +245,7 @@ class MixedCluster : public VirtualCluster { MixedCluster &operator=(const MixedCluster &) = delete; const std::string &GetID() const override { return id_; } - rpc::AllocationMode GetMode() const override { return rpc::AllocationMode::Mixed; } + rpc::AllocationMode GetMode() const override { return rpc::AllocationMode::MIXED; } /// Check if the virtual cluster is in use. /// @@ -269,7 +269,7 @@ class PrimaryCluster : public ExclusiveCluster { PrimaryCluster &operator=(const PrimaryCluster &) = delete; const std::string &GetID() const override { return kPrimaryClusterID; } - rpc::AllocationMode GetMode() const override { return rpc::AllocationMode::Exclusive; } + rpc::AllocationMode GetMode() const override { return rpc::AllocationMode::EXCLUSIVE; } /// Create or update a new virtual cluster. /// diff --git a/src/ray/gcs/gcs_server/test/gcs_virtual_cluster_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_virtual_cluster_manager_test.cc index e8859ccc28b5..a9d7a698e729 100644 --- a/src/ray/gcs/gcs_server/test/gcs_virtual_cluster_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_virtual_cluster_manager_test.cc @@ -146,7 +146,7 @@ TEST_F(PrimaryClusterTest, CreateOrUpdateVirtualCluster) { { rpc::CreateOrUpdateVirtualClusterRequest request; request.set_virtual_cluster_id("virtual_cluster_id_0"); - request.set_mode(rpc::AllocationMode::Exclusive); + request.set_mode(rpc::AllocationMode::EXCLUSIVE); request.set_revision(0); request.mutable_replica_sets()->insert({template_id_0, 5}); request.mutable_replica_sets()->insert({template_id_1, 10}); @@ -198,7 +198,7 @@ TEST_F(PrimaryClusterTest, CreateOrUpdateVirtualCluster) { // Create virtual_cluster_id_1 and check that the status is ok. 
rpc::CreateOrUpdateVirtualClusterRequest request; request.set_virtual_cluster_id("virtual_cluster_id_1"); - request.set_mode(rpc::AllocationMode::Exclusive); + request.set_mode(rpc::AllocationMode::EXCLUSIVE); request.set_revision(0); request.mutable_replica_sets()->insert({template_id_0, node_count_per_template - 5}); request.mutable_replica_sets()->insert({template_id_1, node_count_per_template - 10}); @@ -246,7 +246,7 @@ TEST_F(PrimaryClusterTest, CreateOrUpdateVirtualCluster) { // Create virtual_cluster_id_2 and check that the status is succeed. rpc::CreateOrUpdateVirtualClusterRequest request; request.set_virtual_cluster_id("virtual_cluster_id_2"); - request.set_mode(rpc::AllocationMode::Exclusive); + request.set_mode(rpc::AllocationMode::EXCLUSIVE); request.set_revision(0); request.mutable_replica_sets()->insert({template_id_0, 0}); request.mutable_replica_sets()->insert({template_id_1, 0}); @@ -272,7 +272,7 @@ TEST_F(PrimaryClusterTest, CreateOrUpdateVirtualCluster) { // Create virtual_cluster_id_3 and check that the status is failed. 
rpc::CreateOrUpdateVirtualClusterRequest request; request.set_virtual_cluster_id("virtual_cluster_id_3"); - request.set_mode(rpc::AllocationMode::Exclusive); + request.set_mode(rpc::AllocationMode::EXCLUSIVE); request.set_revision(0); request.mutable_replica_sets()->insert({template_id_0, 1}); request.mutable_replica_sets()->insert({template_id_1, 0}); @@ -539,7 +539,7 @@ TEST_F(PrimaryClusterTest, RemoveLogicalCluster) { { rpc::CreateOrUpdateVirtualClusterRequest request; request.set_virtual_cluster_id(virtual_cluster_id_0); - request.set_mode(rpc::AllocationMode::Exclusive); + request.set_mode(rpc::AllocationMode::EXCLUSIVE); request.set_revision(0); request.mutable_replica_sets()->insert({template_id_0, 5}); request.mutable_replica_sets()->insert({template_id_1, 10}); diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index 4082960221a6..3c7bdf9cbffd 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -711,10 +711,11 @@ message JobTableData { /////////////////////////////////////////////////////////////////////////////// enum AllocationMode { + UNKNOWN = 0; // A single node can carray tasks for multiple jobs. - Mixed = 0; + MIXED = 1; // A single node can only carray tasks for one job. - Exclusive = 1; + EXCLUSIVE = 2; } message NodeInstance {