Skip to content

Commit

Permalink
add IMMEDIATE refresh policy (opensearch-project#2541)
Browse files Browse the repository at this point in the history
* add IMMEDIATE refresh policy

Signed-off-by: Yaliang Wu <[email protected]>

* add refresh policy to bulk request

Signed-off-by: Yaliang Wu <[email protected]>

* run spotlessApply

Signed-off-by: Yaliang Wu <[email protected]>

---------

Signed-off-by: Yaliang Wu <[email protected]>
  • Loading branch information
ylwu-amzn authored Jun 13, 2024
1 parent f28bb74 commit 4c04854
Show file tree
Hide file tree
Showing 11 changed files with 21 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.ml.engine.algorithms.metrics_correlation;

import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
import static org.opensearch.index.query.QueryBuilders.termQuery;
import static org.opensearch.ml.common.CommonValue.ML_MODEL_GROUP_INDEX;
import static org.opensearch.ml.common.CommonValue.ML_MODEL_GROUP_INDEX_MAPPING;
Expand Down Expand Up @@ -268,6 +269,7 @@ void registerModel(ActionListener<MLRegisterModelResponse> listener) throws Inte
XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent());
modelGroup.toXContent(builder, ToXContent.EMPTY_PARAMS);
createModelGroupRequest.source(builder);
createModelGroupRequest.setRefreshPolicy(IMMEDIATE);
client.index(createModelGroupRequest, ActionListener.runBefore(ActionListener.wrap(r -> {
client.execute(MLRegisterModelAction.INSTANCE, registerRequest, ActionListener.wrap(listener::onResponse, e -> {
log.error("Failed to Register Model", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.ml.engine.memory;

import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
import static org.opensearch.ml.common.CommonValue.ML_MEMORY_MESSAGE_INDEX;
import static org.opensearch.ml.common.CommonValue.ML_MEMORY_META_INDEX;

Expand Down Expand Up @@ -84,7 +85,7 @@ public void save(String id, Message message) {
public void save(String id, Message message, ActionListener listener) {
mlIndicesHandler.initMemoryMessageIndex(ActionListener.wrap(created -> {
if (created) {
IndexRequest indexRequest = new IndexRequest(memoryMessageIndexName);
IndexRequest indexRequest = new IndexRequest(memoryMessageIndexName).setRefreshPolicy(IMMEDIATE);
ConversationIndexMessage conversationIndexMessage = (ConversationIndexMessage) message;
XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent());
conversationIndexMessage.toXContent(builder, ToXContent.EMPTY_PARAMS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.ml.action.agents;

import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.ml.common.CommonValue.ML_AGENT_INDEX;
import static org.opensearch.ml.utils.MLNodeUtils.createXContentParserFromRegistry;
Expand Down Expand Up @@ -80,7 +81,7 @@ protected void doExecute(Task task, ActionRequest request, ActionListener<Delete
);
} else {
// If the agent is not hidden or if the user is a super admin, proceed with deletion
DeleteRequest deleteRequest = new DeleteRequest(ML_AGENT_INDEX, agentId);
DeleteRequest deleteRequest = new DeleteRequest(ML_AGENT_INDEX, agentId).setRefreshPolicy(IMMEDIATE);
client.delete(deleteRequest, ActionListener.wrap(deleteResponse -> {
log.debug("Completed Delete Agent Request, agent id:{} deleted", agentId);
actionListener.onResponse(deleteResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.ml.action.agents;

import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
import static org.opensearch.ml.common.CommonValue.ML_AGENT_INDEX;

import java.time.Instant;
Expand Down Expand Up @@ -70,7 +71,7 @@ private void registerAgent(MLAgent agent, ActionListener<MLRegisterAgentResponse
mlIndicesHandler.initMLAgentIndex(ActionListener.wrap(result -> {
if (result) {
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
IndexRequest indexRequest = new IndexRequest(ML_AGENT_INDEX);
IndexRequest indexRequest = new IndexRequest(ML_AGENT_INDEX).setRefreshPolicy(IMMEDIATE);
XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent());
mlAgent.toXContent(builder, ToXContent.EMPTY_PARAMS);
indexRequest.source(builder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.ml.action.connector;

import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
import static org.opensearch.ml.common.CommonValue.ML_CONNECTOR_INDEX;
import static org.opensearch.ml.common.CommonValue.ML_MODEL_INDEX;

Expand Down Expand Up @@ -66,7 +67,7 @@ public DeleteConnectorTransportAction(
protected void doExecute(Task task, ActionRequest request, ActionListener<DeleteResponse> actionListener) {
MLConnectorDeleteRequest mlConnectorDeleteRequest = MLConnectorDeleteRequest.fromActionRequest(request);
String connectorId = mlConnectorDeleteRequest.getConnectorId();
DeleteRequest deleteRequest = new DeleteRequest(ML_CONNECTOR_INDEX, connectorId);
DeleteRequest deleteRequest = new DeleteRequest(ML_CONNECTOR_INDEX, connectorId).setRefreshPolicy(IMMEDIATE);
connectorAccessControlHelper.validateConnectorAccess(client, connectorId, ActionListener.wrap(x -> {
if (Boolean.TRUE.equals(x)) {
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Client;
Expand Down Expand Up @@ -93,6 +94,7 @@ protected void doExecute(Task task, ActionRequest request, ActionListener<Update
connector.update(mlUpdateConnectorAction.getUpdateContent(), mlEngine::encrypt);
connector.validateConnectorURL(trustedConnectorEndpointsRegex);
UpdateRequest updateRequest = new UpdateRequest(ML_CONNECTOR_INDEX, connectorId);
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
updateRequest.doc(connector.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS));
updateUndeployedConnector(connectorId, updateRequest, listener, context);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.ml.action.controller;

import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
import static org.opensearch.ml.common.CommonValue.ML_CONTROLLER_INDEX;
import static org.opensearch.ml.common.utils.StringUtils.getErrorMessage;

Expand Down Expand Up @@ -216,7 +217,7 @@ private void deleteControllerWithDeployedModel(String modelId, Boolean isHidden,
}

private void deleteController(String modelId, Boolean isHidden, ActionListener<DeleteResponse> actionListener) {
DeleteRequest deleteRequest = new DeleteRequest(ML_CONTROLLER_INDEX, modelId);
DeleteRequest deleteRequest = new DeleteRequest(ML_CONTROLLER_INDEX, modelId).setRefreshPolicy(IMMEDIATE);
client.delete(deleteRequest, new ActionListener<>() {
@Override
public void onResponse(DeleteResponse deleteResponse) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -186,6 +187,7 @@ private void updateModelGroup(

private void updateModelGroup(String modelGroupId, Map<String, Object> source, ActionListener<MLUpdateModelGroupResponse> listener) {
UpdateRequest updateModelGroupRequest = new UpdateRequest();
updateModelGroupRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
updateModelGroupRequest.index(ML_MODEL_GROUP_INDEX).id(modelGroupId).doc(source);
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
ActionListener<MLUpdateModelGroupResponse> wrappedListener = ActionListener.runBefore(listener, () -> context.restore());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.ml.action.models;

import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.ml.common.CommonValue.ML_CONTROLLER_INDEX;
import static org.opensearch.ml.common.CommonValue.ML_MODEL_INDEX;
Expand Down Expand Up @@ -287,7 +288,7 @@ private void deleteModelChunksAndController(
* @param modelId model ID
*/
private void deleteController(String modelId, Boolean isHidden, ActionListener<Boolean> actionListener) {
DeleteRequest deleteRequest = new DeleteRequest(ML_CONTROLLER_INDEX, modelId);
DeleteRequest deleteRequest = new DeleteRequest(ML_CONTROLLER_INDEX, modelId).setRefreshPolicy(IMMEDIATE);
client.delete(deleteRequest, new ActionListener<>() {
@Override
public void onResponse(DeleteResponse deleteResponse) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.ml.action.tasks;

import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.ml.common.CommonValue.ML_TASK_INDEX;
import static org.opensearch.ml.utils.MLNodeUtils.createXContentParserFromRegistry;
Expand Down Expand Up @@ -68,7 +69,7 @@ protected void doExecute(Task task, ActionRequest request, ActionListener<Delete
if (mlTaskState.equals(MLTaskState.RUNNING)) {
actionListener.onFailure(new Exception("Task cannot be deleted in running state. Try after sometime"));
} else {
DeleteRequest deleteRequest = new DeleteRequest(ML_TASK_INDEX, taskId);
DeleteRequest deleteRequest = new DeleteRequest(ML_TASK_INDEX, taskId).setRefreshPolicy(IMMEDIATE);
client.delete(deleteRequest, new ActionListener<DeleteResponse>() {
@Override
public void onResponse(DeleteResponse deleteResponse) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,7 @@ private void bulkUpdateModelState(
updateRequest.index(ML_MODEL_INDEX).id(modelId).doc(builder.build());
bulkUpdateRequest.add(updateRequest);
}
bulkUpdateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
log.info("Refresh model state: {}", newModelStates);
client.bulk(bulkUpdateRequest, ActionListener.wrap(br -> {
updateModelStateSemaphore.release();
Expand Down

0 comments on commit 4c04854

Please sign in to comment.