Skip to content

Commit

Permalink
fix race confition in index initialization and RestUpdateConnector UT (
Browse files Browse the repository at this point in the history
…#1852)

Signed-off-by: Xun Zhang <[email protected]>
  • Loading branch information
Zhangxunmt authored Jan 12, 2024
1 parent 94d2f51 commit 19c93b1
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import org.opensearch.OpenSearchWrapperException;
import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest;
Expand Down Expand Up @@ -85,7 +87,6 @@ public void initMLAgentIndex(ActionListener<Boolean> listener) {
public void initMLIndexIfAbsent(MLIndex index, ActionListener<Boolean> listener) {
String indexName = index.getIndexName();
String mapping = index.getMapping();

try (ThreadContext.StoredContext threadContext = client.threadPool().getThreadContext().stashContext()) {
ActionListener<Boolean> internalListener = ActionListener.runBefore(listener, () -> threadContext.restore());
if (!clusterService.state().metadata().hasIndex(indexName)) {
Expand All @@ -97,8 +98,14 @@ public void initMLIndexIfAbsent(MLIndex index, ActionListener<Boolean> listener)
internalListener.onResponse(false);
}
}, e -> {
log.error("Failed to create index " + indexName, e);
internalListener.onFailure(e);
if (e instanceof ResourceAlreadyExistsException
|| (e instanceof OpenSearchWrapperException && e.getCause() instanceof ResourceAlreadyExistsException)) {
log.info("Skip creating the Index:{} that is already created by another parallel request", indexName);
internalListener.onResponse(true);
} else {
log.error("Failed to create index " + indexName, e);
internalListener.onFailure(e);
}
});
CreateIndexRequest request = new CreateIndexRequest(indexName).mapping(mapping).settings(INDEX_SETTINGS);
client.admin().indices().create(request, actionListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.support.master.AcknowledgedResponse;
Expand Down Expand Up @@ -191,4 +192,21 @@ public void initMLAgentIndexNoIndex() {
verify(listener).onResponse(argumentCaptor.capture());
assertEquals(true, argumentCaptor.getValue());
}

@Test
public void initMLConnectorIndex_ResourceAlreadyExistsException_RaceCondition() {
ActionListener<Boolean> listener = mock(ActionListener.class);
when(metadata.hasIndex(anyString())).thenReturn(false);
doAnswer(invocation -> {
ActionListener<CreateIndexResponse> actionListener = invocation.getArgument(1);
actionListener.onFailure(new ResourceAlreadyExistsException("index [.plugins-ml-connector] already exists"));
return null;
}).when(indicesAdminClient).create(any(), any());
ArgumentCaptor<Boolean> argumentCaptor = ArgumentCaptor.forClass(Boolean.class);
indicesHandler.initMLConnectorIndex(listener);

verify(indicesAdminClient).create(isA(CreateIndexRequest.class), any());
verify(listener).onResponse(argumentCaptor.capture());
assertEquals(true, argumentCaptor.getValue());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,55 +142,55 @@ public void testPrepareRequestFeatureDisabled() throws Exception {
}

private RestRequest getRestRequest() {
RestRequest.Method method = RestRequest.Method.POST;
RestRequest.Method method = RestRequest.Method.PUT;
final Map<String, Object> updateContent = Map.of("version", "2", "description", "This is test description");
String requestContent = new Gson().toJson(updateContent).toString();
Map<String, String> params = new HashMap<>();
params.put("connector_id", "test_connectorId");
RestRequest request = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY)
.withMethod(method)
.withPath("/_plugins/_ml/connectors/_update/{connector_id}")
.withPath("/_plugins/_ml/connectors/{connector_id}")
.withParams(params)
.withContent(new BytesArray(requestContent), XContentType.JSON)
.build();
return request;
}

private RestRequest getRestRequestWithNullValue() {
RestRequest.Method method = RestRequest.Method.POST;
RestRequest.Method method = RestRequest.Method.PUT;
String requestContent = "{\"version\":\"2\",\"description\":null}";
Map<String, String> params = new HashMap<>();
params.put("connector_id", "test_connectorId");
RestRequest request = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY)
.withMethod(method)
.withPath("/_plugins/_ml/connectors/_update/{connector_id}")
.withPath("/_plugins/_ml/connectors/{connector_id}")
.withParams(params)
.withContent(new BytesArray(requestContent), XContentType.JSON)
.build();
return request;
}

private RestRequest getRestRequestWithEmptyContent() {
RestRequest.Method method = RestRequest.Method.POST;
RestRequest.Method method = RestRequest.Method.PUT;
Map<String, String> params = new HashMap<>();
params.put("connector_id", "test_connectorId");
RestRequest request = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY)
.withMethod(method)
.withPath("/_plugins/_ml/connectors/_update/{connector_id}")
.withPath("/_plugins/_ml/connectors/{connector_id}")
.withParams(params)
.withContent(new BytesArray(""), XContentType.JSON)
.build();
return request;
}

private RestRequest getRestRequestWithNullConnectorId() {
RestRequest.Method method = RestRequest.Method.POST;
RestRequest.Method method = RestRequest.Method.PUT;
final Map<String, Object> updateContent = Map.of("version", "2", "description", "This is test description");
String requestContent = new Gson().toJson(updateContent).toString();
Map<String, String> params = new HashMap<>();
RestRequest request = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY)
.withMethod(method)
.withPath("/_plugins/_ml/connectors/_update/{connector_id}")
.withPath("/_plugins/_ml/connectors/{connector_id}")
.withParams(params)
.withContent(new BytesArray(requestContent), XContentType.JSON)
.build();
Expand Down

0 comments on commit 19c93b1

Please sign in to comment.