Skip to content

Commit

Permalink
Implement (most of) UpdateModel
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Jun 30, 2024
1 parent 7bacb7f commit ef29a3b
Show file tree
Hide file tree
Showing 19 changed files with 941 additions and 537 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ protected void doExecute(Task task, ActionRequest request, ActionListener<Delete
}
} else {
try {
GetResponse gr = GetResponse.fromXContent(r.parser());
GetResponse gr = r.parser() == null ? null : GetResponse.fromXContent(r.parser());
if (gr.isExists()) {
try (
XContentParser parser = jsonXContent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ protected void doExecute(Task task, ActionRequest request, ActionListener<MLAgen
}
} else {
try {
GetResponse gr = GetResponse.fromXContent(r.parser());
if (gr.isExists()) {
GetResponse gr = r.parser() == null ? null : GetResponse.fromXContent(r.parser());
if (gr != null && gr.isExists()) {
try (
XContentParser parser = jsonXContent
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, gr.getSourceAsString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,7 @@ private void handleUpdateDataObjectCompletionStage(
updateListener.onFailure(cause);
} else {
try {
UpdateResponse updateResponse = UpdateResponse.fromXContent(r.parser());
log.info("Connector update result: {}, connector id: {}", updateResponse.getResult(), updateResponse.getId());
UpdateResponse updateResponse = r.parser() == null ? null : UpdateResponse.fromXContent(r.parser());
updateListener.onResponse(updateResponse);
} catch (IOException e) {
updateListener.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,11 @@ private void processResponse(
ActionListener<MLModelGroupGetResponse> wrappedListener
) {
try {
GetResponse r = GetResponse.fromXContent(getDataObjectResponse.parser());
if (r != null && r.isExists()) {
GetResponse gr = getDataObjectResponse.parser() == null ? null : GetResponse.fromXContent(getDataObjectResponse.parser());
if (gr != null && gr.isExists()) {
try (
XContentParser parser = jsonXContent
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, r.getSourceAsString())
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, gr.getSourceAsString())
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
MLModelGroup mlModelGroup = MLModelGroup.parse(parser);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ protected void doExecute(Task task, ActionRequest request, ActionListener<MLUpda
}
} else {
try {
GetResponse gr = GetResponse.fromXContent(r.parser());
GetResponse gr = r.parser() == null ? null : GetResponse.fromXContent(r.parser());
if (gr != null && gr.isExists()) {
try (
XContentParser parser = jsonXContent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.ml.action.models;

import static org.opensearch.common.xcontent.json.JsonXContent.jsonXContent;
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.ml.common.CommonValue.ML_CONTROLLER_INDEX;
import static org.opensearch.ml.common.CommonValue.ML_MODEL_INDEX;
Expand All @@ -23,13 +24,15 @@
import org.opensearch.ResourceNotFoundException;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.commons.authuser.User;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
Expand Down Expand Up @@ -125,55 +128,41 @@ protected void doExecute(Task task, ActionRequest request, ActionListener<Delete
.getDataObjectAsync(getDataObjectRequest, client.threadPool().executor(GENERAL_THREAD_POOL))
.whenComplete((r, throwable) -> {
if (throwable == null) {
if (r != null && r.parser().isPresent()) {
try (XContentParser parser = r.parser().get()) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
String algorithmName = "";
Map<String, Object> source = r.source();
if (source != null && source.get(ALGORITHM_FIELD) != null) {
algorithmName = source.get(ALGORITHM_FIELD).toString();
}
MLModel mlModel = MLModel.parse(parser, algorithmName);
if (!TenantAwareHelper
.validateTenantResource(mlFeatureEnabledSetting, tenantId, mlModel.getTenantId(), actionListener)) {
return;
}
Boolean isHidden = (Boolean) r.source().get(IS_HIDDEN_FIELD);
MLModelState mlModelState = mlModel.getModelState();
if (isHidden != null && isHidden) {
if (!isSuperAdmin) {
wrappedListener
.onFailure(
new OpenSearchStatusException(
"User doesn't have privilege to perform this operation on this model",
RestStatus.FORBIDDEN
)
);
} else {
if (isModelNotDeployed(mlModelState)) {
deleteModel(modelId, isHidden, actionListener);
} else {
try {
GetResponse gr = r.parser() == null ? null : GetResponse.fromXContent(r.parser());
if (gr != null && gr.isExists()) {
try (
XContentParser parser = jsonXContent
.createParser(
NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE,
gr.getSourceAsString()
)
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
String algorithmName = "";
Map<String, Object> source = r.source();
if (source != null && source.get(ALGORITHM_FIELD) != null) {
algorithmName = source.get(ALGORITHM_FIELD).toString();
}
MLModel mlModel = MLModel.parse(parser, algorithmName);
if (!TenantAwareHelper
.validateTenantResource(mlFeatureEnabledSetting, tenantId, mlModel.getTenantId(), actionListener)) {
return;
}
Boolean isHidden = (Boolean) r.source().get(IS_HIDDEN_FIELD);
MLModelState mlModelState = mlModel.getModelState();
if (isHidden != null && isHidden) {
if (!isSuperAdmin) {
wrappedListener
.onFailure(
new OpenSearchStatusException(
"Model cannot be deleted in deploying or deployed state. Try undeploy model first then delete",
RestStatus.BAD_REQUEST
"User doesn't have privilege to perform this operation on this model",
RestStatus.FORBIDDEN
)
);
}
}
} else {
modelAccessControlHelper
.validateModelGroupAccess(user, mlModel.getModelGroupId(), client, ActionListener.wrap(access -> {
if (!access) {
wrappedListener
.onFailure(
new OpenSearchStatusException(
"User doesn't have privilege to perform this operation on this model",
RestStatus.FORBIDDEN
)
);
} else if (isModelNotDeployed(mlModelState)) {
} else {
if (isModelNotDeployed(mlModelState)) {
deleteModel(modelId, isHidden, actionListener);
} else {
wrappedListener
Expand All @@ -184,20 +173,52 @@ protected void doExecute(Task task, ActionRequest request, ActionListener<Delete
)
);
}
}, e -> {
log.error(getErrorMessage("Failed to validate Access", modelId, isHidden), e);
wrappedListener.onFailure(e);
}));
}
} else {
modelAccessControlHelper
.validateModelGroupAccess(
user,
mlModel.getModelGroupId(),
client,
ActionListener.wrap(access -> {
if (!access) {
wrappedListener
.onFailure(
new OpenSearchStatusException(
"User doesn't have privilege to perform this operation on this model",
RestStatus.FORBIDDEN
)
);
} else if (isModelNotDeployed(mlModelState)) {
deleteModel(modelId, isHidden, actionListener);
} else {
wrappedListener
.onFailure(
new OpenSearchStatusException(
"Model cannot be deleted in deploying or deployed state. Try undeploy model first then delete",
RestStatus.BAD_REQUEST
)
);
}
}, e -> {
log.error(getErrorMessage("Failed to validate Access", modelId, isHidden), e);
wrappedListener.onFailure(e);
})
);
}
} catch (Exception e) {
log.error("Failed to parse ml model " + r.id(), e);
wrappedListener.onFailure(e);
}
} catch (Exception e) {
log.error("Failed to parse ml model " + r.id(), e);
wrappedListener.onFailure(e);
} else {
// when model metadata is not found, model chunk and controller might still there, delete them here and
// return
// success
// response
deleteModelChunksAndController(wrappedListener, modelId, false, null);
}
} else {
// when model metadata is not found, model chunk and controller might still there, delete them here and return
// success
// response
deleteModelChunksAndController(wrappedListener, modelId, false, null);
} catch (Exception e) {
wrappedListener.onFailure(e);
}
} else {
wrappedListener.onFailure((new OpenSearchStatusException("Failed to find model", RestStatus.NOT_FOUND)));
Expand Down
Loading

0 comments on commit ef29a3b

Please sign in to comment.