Skip to content

Commit

Permalink
fix undeploy issue
Browse files Browse the repository at this point in the history
  • Loading branch information
Ubuntu committed Jan 11, 2025
1 parent 6726a3a commit 7a13fc8
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 3 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ buildscript {
ext {
opensearch_group = "org.opensearch"
isSnapshot = "true" == System.getProperty("build.snapshot", "true")
opensearch_version = System.getProperty("opensearch.version", "2.19.0-SNAPSHOT")
opensearch_version = System.getProperty("opensearch.version", "2.18.0-SNAPSHOT")
buildVersionQualifier = System.getProperty("build.version_qualifier", "")

// 2.0.0-rc1-SNAPSHOT -> 2.0.0.0-rc1-SNAPSHOT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,20 @@

import static org.opensearch.ml.common.CommonValue.ML_MODEL_INDEX;

import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
Expand All @@ -32,9 +36,11 @@
import org.opensearch.index.query.TermsQueryBuilder;
import org.opensearch.ml.cluster.DiscoveryNodeHelper;
import org.opensearch.ml.common.MLModel;
import org.opensearch.ml.common.model.MLModelState;
import org.opensearch.ml.common.transport.deploy.MLDeployModelRequest;
import org.opensearch.ml.common.transport.undeploy.MLUndeployModelAction;
import org.opensearch.ml.common.transport.undeploy.MLUndeployModelNodesRequest;
import org.opensearch.ml.common.transport.undeploy.MLUndeployModelNodesResponse;
import org.opensearch.ml.common.transport.undeploy.MLUndeployModelsAction;
import org.opensearch.ml.common.transport.undeploy.MLUndeployModelsRequest;
import org.opensearch.ml.common.transport.undeploy.MLUndeployModelsResponse;
Expand All @@ -51,6 +57,7 @@
import org.opensearch.transport.TransportService;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;

import lombok.extern.log4j.Log4j2;

Expand Down Expand Up @@ -156,11 +163,48 @@ protected void doExecute(Task task, ActionRequest request, ActionListener<MLUnde
private void undeployModels(String[] targetNodeIds, String[] modelIds, ActionListener<MLUndeployModelsResponse> listener) {
MLUndeployModelNodesRequest mlUndeployModelNodesRequest = new MLUndeployModelNodesRequest(targetNodeIds, modelIds);

client.execute(MLUndeployModelAction.INSTANCE, mlUndeployModelNodesRequest, ActionListener.wrap(r -> {
listener.onResponse(new MLUndeployModelsResponse(r));
client.execute(MLUndeployModelAction.INSTANCE, mlUndeployModelNodesRequest, ActionListener.wrap(response -> {
if (response.getNodes().isEmpty()) {
bulkSetModelIndexToUndeploy(modelIds, listener, response);
return;
}
listener.onResponse(new MLUndeployModelsResponse(response));
}, listener::onFailure));
}

private void bulkSetModelIndexToUndeploy(
String[] modelIds,
ActionListener<MLUndeployModelsResponse> listener,
MLUndeployModelNodesResponse response
) {
BulkRequest bulkUpdateRequest = new BulkRequest();
for (String modelId : modelIds) {
UpdateRequest updateRequest = new UpdateRequest();
Instant now = Instant.now();
ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
builder.put(MLModel.MODEL_STATE_FIELD, MLModelState.UNDEPLOYED.name());

builder.put(MLModel.PLANNING_WORKER_NODES_FIELD, List.of());
builder.put(MLModel.PLANNING_WORKER_NODE_COUNT_FIELD, 0);

builder.put(MLModel.LAST_UPDATED_TIME_FIELD, now.toEpochMilli());
builder.put(MLModel.CURRENT_WORKER_NODE_COUNT_FIELD, 0);
updateRequest.index(ML_MODEL_INDEX).id(modelId).doc(builder.build());
bulkUpdateRequest.add(updateRequest);
}

bulkUpdateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
log.info("No nodes service: {}", modelIds.toString());

client.bulk(bulkUpdateRequest, ActionListener.wrap(br -> {
log.debug("Successfully set modelIds to UNDEPLOY in index");
listener.onResponse(new MLUndeployModelsResponse(response));
}, e -> {
log.error("Failed to set modelIds to UNDEPLOY in index", e);
listener.onFailure(e);
}));
}

private void validateAccess(String modelId, ActionListener<Boolean> listener) {
User user = RestActionUtils.getUserContext(client);
boolean isSuperAdmin = isSuperAdminUserWrapper(clusterService, client);
Expand Down

0 comments on commit 7a13fc8

Please sign in to comment.