Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[VirtualCluster] Dashboard supports CRUD of virtual clusters [2/N] #430

Merged
merged 12 commits into from
Dec 30, 2024

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ async def get_all_virtual_clusters(self, req) -> aiohttp.web.Response:
reply, always_print_fields_with_no_presence=True
)
for virtual_cluster_data in data.get("virtualClusterDataList", []):
virtual_cluster_data["virtualClusterId"] = virtual_cluster_data.pop(
"id"
)
virtual_cluster_data["revision"] = int(
virtual_cluster_data.get("revision", 0)
)
Expand Down Expand Up @@ -82,12 +85,12 @@ async def create_or_update_virtual_cluster(self, req) -> aiohttp.web.Response:
reply = await (
self._gcs_virtual_cluster_info_stub.CreateOrUpdateVirtualCluster(request)
)
data = dashboard_utils.message_to_dict(
reply, always_print_fields_with_no_presence=True
)

if reply.status.code == 0:
logger.info("Virtual cluster %s created or updated", virtual_cluster_id)
data = dashboard_utils.message_to_dict(
reply, always_print_fields_with_no_presence=True
)

return dashboard_optional_utils.rest_response(
success=True,
Expand All @@ -106,6 +109,7 @@ async def create_or_update_virtual_cluster(self, req) -> aiohttp.web.Response:
virtual_cluster_id, reply.status.message
),
virtual_cluster_id=virtual_cluster_id,
replica_sets_at_most=data.get("replicaSetsAtMost", {}),
)

@routes.delete("/virtual_clusters/{virtual_cluster_id}")
Expand Down
2 changes: 1 addition & 1 deletion python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def __init__(
self.version: str = f"{version}+tsan"
elif build_type == BuildType.NIGHTLY:
version_postfix = datetime.today().strftime("%Y%m%d")
version = re.sub(r'dev\d*', f'dev{version_postfix}', version)
version = re.sub(r"dev\d*", f"dev{version_postfix}", version)
self.version: str = version
self.name = f"{self.name}-nightly"
else:
Expand Down
2 changes: 2 additions & 0 deletions src/ray/common/status.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ namespace ray {
#define STATUS_CODE_INVALID_ARGUMENT "InvalidArgument"
#define STATUS_CODE_CHANNEL_ERROR "ChannelError"
#define STATUS_CODE_CHANNEL_TIMEOUT_ERROR "ChannelTimeoutError"
#define STATUS_CODE_UNSAFE_TO_REMOVE "UnsafeToRemove"

// not a real status (catch all for codes not known)
#define STATUS_CODE_UNKNOWN "Unknown"
Expand Down Expand Up @@ -107,6 +108,7 @@ const absl::flat_hash_map<StatusCode, std::string> kCodeToStr = {
{StatusCode::InvalidArgument, STATUS_CODE_INVALID_ARGUMENT},
{StatusCode::ChannelError, STATUS_CODE_CHANNEL_ERROR},
{StatusCode::ChannelTimeoutError, STATUS_CODE_CHANNEL_TIMEOUT_ERROR},
{StatusCode::UnsafeToRemove, STATUS_CODE_UNSAFE_TO_REMOVE},
};

const absl::flat_hash_map<std::string, StatusCode> kStrToCode = []() {
Expand Down
8 changes: 8 additions & 0 deletions src/ray/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ enum class StatusCode : char {
ChannelError = 35,
// Indicates that a read or write on a channel (a mutable plasma object) timed out.
ChannelTimeoutError = 36,
// Indicates that it is now unsafe to remove nodes from a (virtual) cluster.
UnsafeToRemove = 37,
// If you add to this list, please also update kCodeToStr in status.cc.
};

Expand Down Expand Up @@ -255,6 +257,10 @@ class RAY_EXPORT Status {
return Status(StatusCode::ChannelTimeoutError, msg);
}

// Returns a Status whose code is StatusCode::UnsafeToRemove and whose message is
// `msg`.
static Status UnsafeToRemove(const std::string &msg) {
return Status(StatusCode::UnsafeToRemove, msg);
}

static StatusCode StringToCode(const std::string &str);

// Returns true iff the status indicates success.
Expand Down Expand Up @@ -309,6 +315,8 @@ class RAY_EXPORT Status {

bool IsChannelTimeoutError() const { return code() == StatusCode::ChannelTimeoutError; }

// Returns true iff the status code is StatusCode::UnsafeToRemove.
bool IsUnsafeToRemove() const { return code() == StatusCode::UnsafeToRemove; }

// Return a string representation of this status suitable for printing.
// Returns the string "OK" for success.
std::string ToString() const;
Expand Down
4 changes: 3 additions & 1 deletion src/ray/gcs/gcs_server/gcs_server.ant.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ namespace gcs {
void GcsServer::InitGcsVirtualClusterManager(const GcsInitData &gcs_init_data) {
RAY_CHECK(gcs_table_storage_ && gcs_publisher_);
gcs_virtual_cluster_manager_ = std::make_shared<gcs::GcsVirtualClusterManager>(
*gcs_table_storage_, *gcs_publisher_);
*gcs_table_storage_,
*gcs_publisher_,
cluster_resource_scheduler_->GetClusterResourceManager());
// Initialize by gcs tables data.
gcs_virtual_cluster_manager_->Initialize(gcs_init_data);
// Register service.
Expand Down
1 change: 1 addition & 0 deletions src/ray/gcs/gcs_server/gcs_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ void GcsServer::InitClusterResourceScheduler() {
[this](scheduling::NodeID node_id, const SchedulingContext *context) {
// Check if the virtual cluster manager exists.
if (gcs_virtual_cluster_manager_ == nullptr ||
context->virtual_cluster_id.empty() ||
context->virtual_cluster_id == kPrimaryClusterID) {
return true;
}
Expand Down
116 changes: 81 additions & 35 deletions src/ray/gcs/gcs_server/gcs_virtual_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ void VirtualCluster::RemoveNodeInstances(ReplicaInstances replica_instances) {
}
}

Status VirtualCluster::LookupIdleNodeInstances(
const ReplicaSets &replica_sets, ReplicaInstances &replica_instances) const {
bool VirtualCluster::LookupIdleNodeInstances(const ReplicaSets &replica_sets,
ReplicaInstances &replica_instances) const {
bool success = true;
for (const auto &[template_id, replicas] : replica_sets) {
auto &template_node_instances = replica_instances[template_id];
Expand All @@ -115,7 +115,6 @@ Status VirtualCluster::LookupIdleNodeInstances(
success = false;
continue;
}

auto empty_iter = iter->second.find(kEmptyJobClusterId);
if (empty_iter == iter->second.end()) {
success = false;
Expand All @@ -137,13 +136,7 @@ Status VirtualCluster::LookupIdleNodeInstances(
}
}

if (!success) {
// TODO(Shanly): Give a more detailed error message about the demand replica set and
// the idle replica instances.
return Status::OutOfResource("No enough node instances to assign.");
}

return Status::OK();
return success;
}

bool VirtualCluster::MarkNodeInstanceAsDead(const std::string &template_id,
Expand Down Expand Up @@ -237,8 +230,8 @@ Status ExclusiveCluster::CreateJobCluster(const std::string &job_cluster_id,

ReplicaInstances replica_instances_to_add;
// Lookup idle node instances from main cluster based on `replica_sets_to_add`.
auto status = LookupIdleNodeInstances(replica_sets, replica_instances_to_add);
if (!status.ok()) {
auto success = LookupIdleNodeInstances(replica_sets, replica_instances_to_add);
if (!success) {
// TODO(Shanly): Give a more detailed error message about the demand replica set and
// the idle replica instances.
std::ostringstream ostr;
Expand All @@ -265,7 +258,9 @@ std::shared_ptr<JobCluster> ExclusiveCluster::DoCreateJobCluster(
UpdateNodeInstances(std::move(replica_instances_to_add_to_current_cluster),
std::move(replica_instances_to_remove_from_current_cluster));

auto job_cluster = std::make_shared<JobCluster>(job_cluster_id);
// Create a job cluster.
auto job_cluster =
std::make_shared<JobCluster>(job_cluster_id, cluster_resource_manager_);
job_cluster->UpdateNodeInstances(std::move(replica_instances_to_add),
ReplicaInstances());
RAY_CHECK(job_clusters_.emplace(job_cluster_id, job_cluster).second);
Expand Down Expand Up @@ -339,15 +334,31 @@ void ExclusiveCluster::ForeachJobCluster(
///////////////////////// MixedCluster /////////////////////////
bool MixedCluster::IsIdleNodeInstance(const std::string &job_cluster_id,
const gcs::NodeInstance &node_instance) const {
// TODO(Shanly): The job_cluster_id will always be empty in mixed mode although the node
// instance is assigned to one or two jobs, so we need to check the node resources
// usage.
return node_instance.is_dead();
if (node_instance.is_dead()) {
return true;
}
auto node_id =
scheduling::NodeID(NodeID::FromHex(node_instance.node_instance_id()).Binary());
const auto &node_resources = cluster_resource_manager_.GetNodeResources(node_id);
// TODO(Chong-Li): the resource view sync message may lag.
if (node_resources.normal_task_resources.IsEmpty() &&
node_resources.total == node_resources.available) {
return true;
}
return false;
}

bool MixedCluster::InUse() const {
// TODO(Shanly): Check if the virtual cluster still running jobs or placement groups.
return true;
for (const auto &[template_id, job_cluster_instances] : visible_node_instances_) {
for (const auto &[job_cluster_id, node_instances] : job_cluster_instances) {
for (const auto &[node_instance_id, node_instance] : node_instances) {
if (!IsIdleNodeInstance(job_cluster_id, *node_instance)) {
return true;
}
}
}
}
return false;
}

///////////////////////// PrimaryCluster /////////////////////////
Expand Down Expand Up @@ -429,10 +440,11 @@ std::shared_ptr<VirtualCluster> PrimaryCluster::LoadLogicalCluster(
const auto &logical_cluster_id = data.id();
std::shared_ptr<VirtualCluster> logical_cluster;
if (data.mode() == rpc::AllocationMode::EXCLUSIVE) {
logical_cluster =
std::make_shared<ExclusiveCluster>(logical_cluster_id, async_data_flusher_);
logical_cluster = std::make_shared<ExclusiveCluster>(
logical_cluster_id, async_data_flusher_, cluster_resource_manager_);
} else {
logical_cluster = std::make_shared<MixedCluster>(logical_cluster_id);
logical_cluster =
std::make_shared<MixedCluster>(logical_cluster_id, cluster_resource_manager_);
}
RAY_CHECK(logical_clusters_.emplace(logical_cluster_id, logical_cluster).second);

Expand All @@ -451,7 +463,8 @@ std::shared_ptr<VirtualCluster> PrimaryCluster::LoadLogicalCluster(

Status PrimaryCluster::CreateOrUpdateVirtualCluster(
rpc::CreateOrUpdateVirtualClusterRequest request,
CreateOrUpdateVirtualClusterCallback callback) {
CreateOrUpdateVirtualClusterCallback callback,
ReplicaSets *replica_sets_at_most) {
// Calculate the node instances that are to be added and those that are to be removed.
ReplicaInstances replica_instances_to_add_to_logical_cluster;
ReplicaInstances replica_instances_to_remove_from_logical_cluster;
Expand All @@ -460,6 +473,25 @@ Status PrimaryCluster::CreateOrUpdateVirtualCluster(
replica_instances_to_add_to_logical_cluster,
replica_instances_to_remove_from_logical_cluster);
if (!status.ok()) {
// Calculate the replica sets that can be fulfilled at most for this
// request. They can be used as a suggestion for adjusting the request if it fails.
if (replica_sets_at_most) {
ReplicaInstances *replica_instances = nullptr;
if (status.IsOutOfResource()) {
replica_instances = &replica_instances_to_add_to_logical_cluster;
} else if (status.IsUnsafeToRemove()) {
replica_instances = &replica_instances_to_remove_from_logical_cluster;
}
if (replica_instances) {
for (const auto &[template_id, job_cluster_instances] : *replica_instances) {
for (const auto &[job_cluster_id, node_instances] : job_cluster_instances) {
if (!node_instances.empty()) {
(*replica_sets_at_most)[template_id] += node_instances.size();
}
}
}
}
}
return status;
}

Expand All @@ -468,10 +500,11 @@ Status PrimaryCluster::CreateOrUpdateVirtualCluster(
// replica_instances_to_remove must be empty as the virtual cluster is a new one.
RAY_CHECK(replica_instances_to_remove_from_logical_cluster.empty());
if (request.mode() == rpc::AllocationMode::EXCLUSIVE) {
logical_cluster = std::make_shared<ExclusiveCluster>(request.virtual_cluster_id(),
async_data_flusher_);
logical_cluster = std::make_shared<ExclusiveCluster>(
request.virtual_cluster_id(), async_data_flusher_, cluster_resource_manager_);
} else {
logical_cluster = std::make_shared<MixedCluster>(request.virtual_cluster_id());
logical_cluster = std::make_shared<MixedCluster>(request.virtual_cluster_id(),
cluster_resource_manager_);
}
logical_clusters_[request.virtual_cluster_id()] = logical_cluster;
}
Expand Down Expand Up @@ -506,18 +539,30 @@ Status PrimaryCluster::DetermineNodeInstanceAdditionsAndRemovals(
ReplicasDifference(logical_cluster->GetReplicaSets(), request.replica_sets());
// Lookup idle node instances from the logical cluster based on
// `replica_sets_to_remove`.
auto status = logical_cluster->LookupIdleNodeInstances(replica_sets_to_remove,
replica_instances_to_remove);
if (!status.ok()) {
return status;
auto success = logical_cluster->LookupIdleNodeInstances(replica_sets_to_remove,
replica_instances_to_remove);
if (!success) {
return Status::UnsafeToRemove(
"No enough nodes to remove from the virtual cluster. The replica sets that gcs "
"can remove "
"at most are shown below. Use it as a suggestion to "
"adjust your request or cluster.");
}
}

auto replica_sets_to_add = ReplicasDifference(
request.replica_sets(),
logical_cluster ? logical_cluster->GetReplicaSets() : ReplicaSets());
// Lookup idle node instances from main cluster based on `replica_sets_to_add`.
return LookupIdleNodeInstances(replica_sets_to_add, replica_instances_to_add);
auto success = LookupIdleNodeInstances(replica_sets_to_add, replica_instances_to_add);
if (!success) {
return Status::OutOfResource(
"No enough nodes to add to the virtual cluster. The replica sets that gcs can "
"add "
"at most are shown below. Use it as a suggestion to "
"adjust your request or cluster.");
}
return Status::OK();
}

bool PrimaryCluster::IsIdleNodeInstance(const std::string &job_cluster_id,
Expand All @@ -529,7 +574,7 @@ bool PrimaryCluster::IsIdleNodeInstance(const std::string &job_cluster_id,
void PrimaryCluster::OnNodeAdd(const rpc::GcsNodeInfo &node) {
const auto &template_id = node.node_type_name();
auto node_instance_id = NodeID::FromBinary(node.node_id()).Hex();
auto node_instance = std::make_shared<gcs::NodeInstance>();
auto node_instance = std::make_shared<gcs::NodeInstance>(node_instance_id);
node_instance->set_template_id(template_id);
node_instance->set_hostname(node.node_manager_hostname());
node_instance->set_is_dead(false);
Expand Down Expand Up @@ -598,14 +643,15 @@ Status PrimaryCluster::RemoveLogicalCluster(const std::string &logical_cluster_i
}

// Check if the virtual cluster is in use.
ReplicaInstances in_use_instances;
if (logical_cluster->InUse()) {
std::ostringstream ostr;
ostr << "The virtual cluster " << logical_cluster_id
<< " can not be removed as it still in use.";
<< " can not be removed as it is still in use. ";
auto message = ostr.str();
RAY_LOG(ERROR) << message;
// TODO(Shanly): build a new status.
return Status::InvalidArgument(message);

Copy link
Collaborator

@wumuzi520 wumuzi520 Dec 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return Status::UnsafeToRemove();

return Status::UnsafeToRemove(message);
}

const auto &replica_instances_to_remove = logical_cluster->GetVisibleNodeInstances();
Expand Down
Loading