Skip to content

Commit

Permalink
Cleanup SystemIndexTests with RunAsSubjectClient (opensearch-project#…
Browse files Browse the repository at this point in the history
…5027)

Signed-off-by: Craig Perkins <[email protected]>
  • Loading branch information
cwperks authored Jan 16, 2025
1 parent 1924e41 commit 5e5339d
Show file tree
Hide file tree
Showing 13 changed files with 150 additions and 260 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void testPluginShouldBeAbleToIndexDocumentIntoItsSystemIndex() {
HttpResponse response = client.put("try-create-and-index/" + SYSTEM_INDEX_1);

assertThat(response.getStatusCode(), equalTo(RestStatus.OK.getStatus()));
assertThat(response.getBody(), containsString(SystemIndexPlugin1.class.getCanonicalName()));
assertThat(response.getBody(), containsString("{\"acknowledged\":true}"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@

package org.opensearch.security.systemindex.sampleplugin;

// CS-SUPPRESS-SINGLE: RegexpSingleline It is not possible to use phrase "cluster manager" instead of master here
import org.opensearch.action.ActionType;
import org.opensearch.action.support.master.AcknowledgedResponse;
// CS-ENFORCE-SINGLE

public class IndexDocumentIntoSystemIndexAction extends ActionType<IndexDocumentIntoSystemIndexResponse> {
public class IndexDocumentIntoSystemIndexAction extends ActionType<AcknowledgedResponse> {
public static final IndexDocumentIntoSystemIndexAction INSTANCE = new IndexDocumentIntoSystemIndexAction();
public static final String NAME = "cluster:mock/systemindex/index";

private IndexDocumentIntoSystemIndexAction() {
super(NAME, IndexDocumentIntoSystemIndexResponse::new);
super(NAME, AcknowledgedResponse::new);
}
}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@
public class RestBulkIndexDocumentIntoMixOfSystemIndexAction extends BaseRestHandler {

private final Client client;
private final PluginContextSwitcher contextSwitcher;
private final RunAsSubjectClient pluginClient;

public RestBulkIndexDocumentIntoMixOfSystemIndexAction(Client client, PluginContextSwitcher contextSwitcher) {
public RestBulkIndexDocumentIntoMixOfSystemIndexAction(Client client, RunAsSubjectClient pluginClient) {
this.client = client;
this.contextSwitcher = contextSwitcher;
this.pluginClient = pluginClient;
}

@Override
Expand All @@ -57,19 +57,14 @@ public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client

@Override
public void accept(RestChannel channel) throws Exception {
contextSwitcher.runAs(() -> {
BulkRequestBuilder builder = client.prepareBulk();
builder.add(new IndexRequest(SYSTEM_INDEX_1).source("content", 1));
builder.add(new IndexRequest(SYSTEM_INDEX_2).source("content", 1));
builder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
BulkRequest bulkRequest = builder.request();
client.bulk(bulkRequest, ActionListener.wrap(r -> {
channel.sendResponse(
new BytesRestResponse(RestStatus.OK, r.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS))
);
}, fr -> { channel.sendResponse(new BytesRestResponse(RestStatus.FORBIDDEN, String.valueOf(fr))); }));
return null;
});
BulkRequestBuilder builder = client.prepareBulk();
builder.add(new IndexRequest(SYSTEM_INDEX_1).source("content", 1));
builder.add(new IndexRequest(SYSTEM_INDEX_2).source("content", 1));
builder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
BulkRequest bulkRequest = builder.request();
pluginClient.bulk(bulkRequest, ActionListener.wrap(r -> {
channel.sendResponse(new BytesRestResponse(RestStatus.OK, r.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS)));
}, fr -> { channel.sendResponse(new BytesRestResponse(RestStatus.FORBIDDEN, String.valueOf(fr))); }));
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@
public class RestBulkIndexDocumentIntoSystemIndexAction extends BaseRestHandler {

private final Client client;
private final PluginContextSwitcher contextSwitcher;
private final RunAsSubjectClient pluginClient;

public RestBulkIndexDocumentIntoSystemIndexAction(Client client, PluginContextSwitcher contextSwitcher) {
public RestBulkIndexDocumentIntoSystemIndexAction(Client client, RunAsSubjectClient pluginClient) {
this.client = client;
this.contextSwitcher = contextSwitcher;
this.pluginClient = pluginClient;
}

@Override
Expand All @@ -58,21 +58,18 @@ public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client

@Override
public void accept(RestChannel channel) throws Exception {
contextSwitcher.runAs(() -> {
client.admin().indices().create(new CreateIndexRequest(indexName), ActionListener.wrap(r -> {
BulkRequestBuilder builder = client.prepareBulk();
builder.add(new IndexRequest(indexName).source("{\"content\":1}", XContentType.JSON));
builder.add(new IndexRequest(indexName).source("{\"content\":2}", XContentType.JSON));
builder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
BulkRequest bulkRequest = builder.request();
client.bulk(bulkRequest, ActionListener.wrap(r2 -> {
channel.sendResponse(
new BytesRestResponse(RestStatus.OK, r.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS))
);
}, fr -> { channel.sendResponse(new BytesRestResponse(RestStatus.FORBIDDEN, String.valueOf(fr))); }));
pluginClient.admin().indices().create(new CreateIndexRequest(indexName), ActionListener.wrap(r -> {
BulkRequestBuilder builder = client.prepareBulk();
builder.add(new IndexRequest(indexName).source("{\"content\":1}", XContentType.JSON));
builder.add(new IndexRequest(indexName).source("{\"content\":2}", XContentType.JSON));
builder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
BulkRequest bulkRequest = builder.request();
pluginClient.bulk(bulkRequest, ActionListener.wrap(r2 -> {
channel.sendResponse(
new BytesRestResponse(RestStatus.OK, r.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS))
);
}, fr -> { channel.sendResponse(new BytesRestResponse(RestStatus.FORBIDDEN, String.valueOf(fr))); }));
return null;
});
}, fr -> { channel.sendResponse(new BytesRestResponse(RestStatus.FORBIDDEN, String.valueOf(fr))); }));
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,9 @@
public class RestRunClusterHealthAction extends BaseRestHandler {

private final Client client;
private final PluginContextSwitcher contextSwitcher;

public RestRunClusterHealthAction(Client client, PluginContextSwitcher contextSwitcher) {
public RestRunClusterHealthAction(Client client) {
this.client = client;
this.contextSwitcher = contextSwitcher;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.security.systemindex.sampleplugin;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionType;
import org.opensearch.client.Client;
import org.opensearch.client.FilterClient;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.identity.Subject;

/**
* Implementation of client that will run transport actions in a stashed context and inject the name of the provided
* subject into the context.
*/
public class RunAsSubjectClient extends FilterClient {

private static final Logger logger = LogManager.getLogger(RunAsSubjectClient.class);

private Subject subject;

public RunAsSubjectClient(Client delegate) {
super(delegate);
}

public RunAsSubjectClient(Client delegate, Subject subject) {
super(delegate);
this.subject = subject;
}

public void setSubject(Subject subject) {
this.subject = subject;
}

@Override
protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
ActionType<Response> action,
Request request,
ActionListener<Response> listener
) {
try (ThreadContext.StoredContext ctx = threadPool().getThreadContext().newStoredContext(false)) {
subject.runAs(() -> {
logger.info("Running transport action with subject: {}", subject.getPrincipal().getName());
super.doExecute(action, request, ActionListener.runBefore(listener, ctx::restore));
return null;
});
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@

package org.opensearch.security.systemindex.sampleplugin;

// CS-SUPPRESS-SINGLE: RegexpSingleline It is not possible to use phrase "cluster manager" instead of master here
import org.opensearch.action.ActionType;
import org.opensearch.action.support.master.AcknowledgedResponse;
// CS-ENFORCE-SINGLE

public class RunClusterHealthAction extends ActionType<RunClusterHealthResponse> {
public class RunClusterHealthAction extends ActionType<AcknowledgedResponse> {
public static final RunClusterHealthAction INSTANCE = new RunClusterHealthAction();
public static final String NAME = "cluster:mock/monitor/health";

private RunClusterHealthAction() {
super(NAME, RunClusterHealthResponse::new);
super(NAME, AcknowledgedResponse::new);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
public class SystemIndexPlugin1 extends Plugin implements SystemIndexPlugin, IdentityAwarePlugin {
public static final String SYSTEM_INDEX_1 = ".system-index1";

private PluginContextSwitcher contextSwitcher;
private RunAsSubjectClient pluginClient;

private Client client;

Expand All @@ -64,8 +64,8 @@ public Collection<Object> createComponents(
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
this.client = client;
this.contextSwitcher = new PluginContextSwitcher();
return List.of(contextSwitcher);
this.pluginClient = new RunAsSubjectClient(client);
return List.of(pluginClient);
}

@Override
Expand All @@ -86,9 +86,9 @@ public List<RestHandler> getRestHandlers(
) {
return List.of(
new RestIndexDocumentIntoSystemIndexAction(client),
new RestRunClusterHealthAction(client, contextSwitcher),
new RestBulkIndexDocumentIntoSystemIndexAction(client, contextSwitcher),
new RestBulkIndexDocumentIntoMixOfSystemIndexAction(client, contextSwitcher)
new RestRunClusterHealthAction(client),
new RestBulkIndexDocumentIntoSystemIndexAction(client, pluginClient),
new RestBulkIndexDocumentIntoMixOfSystemIndexAction(client, pluginClient)
);
}

Expand All @@ -102,8 +102,8 @@ public List<RestHandler> getRestHandlers(

@Override
public void assignSubject(PluginSubject pluginSystemSubject) {
if (contextSwitcher != null) {
this.contextSwitcher.initialize(pluginSystemSubject);
if (pluginClient != null) {
this.pluginClient.setSubject(pluginSystemSubject);
}
}
}
Loading

0 comments on commit 5e5339d

Please sign in to comment.