Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix race confition in index initialization and RestUpdateConnector UT #1852

Merged
merged 1 commit into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import org.opensearch.OpenSearchWrapperException;
import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest;
Expand Down Expand Up @@ -85,7 +87,6 @@ public void initMLAgentIndex(ActionListener<Boolean> listener) {
public void initMLIndexIfAbsent(MLIndex index, ActionListener<Boolean> listener) {
String indexName = index.getIndexName();
String mapping = index.getMapping();

try (ThreadContext.StoredContext threadContext = client.threadPool().getThreadContext().stashContext()) {
ActionListener<Boolean> internalListener = ActionListener.runBefore(listener, () -> threadContext.restore());
if (!clusterService.state().metadata().hasIndex(indexName)) {
Expand All @@ -97,8 +98,14 @@ public void initMLIndexIfAbsent(MLIndex index, ActionListener<Boolean> listener)
internalListener.onResponse(false);
}
}, e -> {
log.error("Failed to create index " + indexName, e);
internalListener.onFailure(e);
if (e instanceof ResourceAlreadyExistsException
|| (e instanceof OpenSearchWrapperException && e.getCause() instanceof ResourceAlreadyExistsException)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a better one is to use org.opensearch.ExceptionsHelper.unwrap

log.info("Skip creating the Index:{} that is already created by another parallel request", indexName);
internalListener.onResponse(true);
} else {
log.error("Failed to create index " + indexName, e);
internalListener.onFailure(e);
}
});
CreateIndexRequest request = new CreateIndexRequest(indexName).mapping(mapping).settings(INDEX_SETTINGS);
client.admin().indices().create(request, actionListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.support.master.AcknowledgedResponse;
Expand Down Expand Up @@ -191,4 +192,21 @@ public void initMLAgentIndexNoIndex() {
verify(listener).onResponse(argumentCaptor.capture());
assertEquals(true, argumentCaptor.getValue());
}

@Test
public void initMLConnectorIndex_ResourceAlreadyExistsException_RaceCondition() {
ActionListener<Boolean> listener = mock(ActionListener.class);
when(metadata.hasIndex(anyString())).thenReturn(false);
doAnswer(invocation -> {
ActionListener<CreateIndexResponse> actionListener = invocation.getArgument(1);
actionListener.onFailure(new ResourceAlreadyExistsException("index [.plugins-ml-connector] already exists"));
return null;
}).when(indicesAdminClient).create(any(), any());
ArgumentCaptor<Boolean> argumentCaptor = ArgumentCaptor.forClass(Boolean.class);
indicesHandler.initMLConnectorIndex(listener);

verify(indicesAdminClient).create(isA(CreateIndexRequest.class), any());
verify(listener).onResponse(argumentCaptor.capture());
assertEquals(true, argumentCaptor.getValue());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,55 +142,55 @@ public void testPrepareRequestFeatureDisabled() throws Exception {
}

private RestRequest getRestRequest() {
RestRequest.Method method = RestRequest.Method.POST;
RestRequest.Method method = RestRequest.Method.PUT;
final Map<String, Object> updateContent = Map.of("version", "2", "description", "This is test description");
String requestContent = new Gson().toJson(updateContent).toString();
Map<String, String> params = new HashMap<>();
params.put("connector_id", "test_connectorId");
RestRequest request = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY)
.withMethod(method)
.withPath("/_plugins/_ml/connectors/_update/{connector_id}")
.withPath("/_plugins/_ml/connectors/{connector_id}")
.withParams(params)
.withContent(new BytesArray(requestContent), XContentType.JSON)
.build();
return request;
}

private RestRequest getRestRequestWithNullValue() {
RestRequest.Method method = RestRequest.Method.POST;
RestRequest.Method method = RestRequest.Method.PUT;
String requestContent = "{\"version\":\"2\",\"description\":null}";
Map<String, String> params = new HashMap<>();
params.put("connector_id", "test_connectorId");
RestRequest request = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY)
.withMethod(method)
.withPath("/_plugins/_ml/connectors/_update/{connector_id}")
.withPath("/_plugins/_ml/connectors/{connector_id}")
.withParams(params)
.withContent(new BytesArray(requestContent), XContentType.JSON)
.build();
return request;
}

private RestRequest getRestRequestWithEmptyContent() {
RestRequest.Method method = RestRequest.Method.POST;
RestRequest.Method method = RestRequest.Method.PUT;
Map<String, String> params = new HashMap<>();
params.put("connector_id", "test_connectorId");
RestRequest request = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY)
.withMethod(method)
.withPath("/_plugins/_ml/connectors/_update/{connector_id}")
.withPath("/_plugins/_ml/connectors/{connector_id}")
.withParams(params)
.withContent(new BytesArray(""), XContentType.JSON)
.build();
return request;
}

private RestRequest getRestRequestWithNullConnectorId() {
RestRequest.Method method = RestRequest.Method.POST;
RestRequest.Method method = RestRequest.Method.PUT;
final Map<String, Object> updateContent = Map.of("version", "2", "description", "This is test description");
String requestContent = new Gson().toJson(updateContent).toString();
Map<String, String> params = new HashMap<>();
RestRequest request = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY)
.withMethod(method)
.withPath("/_plugins/_ml/connectors/_update/{connector_id}")
.withPath("/_plugins/_ml/connectors/{connector_id}")
.withParams(params)
.withContent(new BytesArray(requestContent), XContentType.JSON)
.build();
Expand Down
Loading