Skip to content

Commit

Permalink
Subject.runAs and introduce PluginSubject
Browse files Browse the repository at this point in the history
Signed-off-by: Craig Perkins <[email protected]>
  • Loading branch information
cwperks committed Aug 13, 2024
1 parent 3948acf commit e685c5c
Show file tree
Hide file tree
Showing 17 changed files with 124 additions and 279 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,9 @@ public void authenticate(AuthToken authenticationToken) {
.orElseThrow(() -> new UnsupportedAuthenticationToken());
shiroSubject.login(authToken);
}

@Override
public Session runAs() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import static org.hamcrest.Matchers.equalTo;

/**
* Test to ensure that plugin execution context is set when plugin uses pluginNodeClient
* Test to ensure that plugin execution context is set when plugin runs transport action with PluginSubject
*/
@ClusterScope(scope = Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 1)
public class ExecutionContextPluginGetIT extends HttpSmokeTestCase {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.function.Supplier;

public class TestExecutionContextPlugin extends Plugin implements ActionPlugin {
private Client client;
private ThreadPool threadPool;

@Override
Expand All @@ -52,15 +53,16 @@ public Collection<Object> createComponents(
NamedWriteableRegistry namedWriteableRegistry,
IndexNameExpressionResolver expressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier) {
this.client = client;
this.threadPool = threadPool;
return Collections.emptyList();
return Collections.singletonList(pluginSubject);
}

@Override
public List<RestHandler> getRestHandlers(Settings settings, RestController restController, ClusterSettings clusterSettings,
IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter, IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster) {
return List.of(new TestGetExecutionContextRestAction(pluginNodeClient, threadPool));
return List.of(new TestGetExecutionContextRestAction(client));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@

package org.opensearch.http.executioncontextplugin;

import org.opensearch.client.Client;
import org.opensearch.client.node.NodeClient;
import org.opensearch.client.node.PluginNodeClient;
import org.opensearch.plugins.PluginSubject;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.action.RestToXContentListener;
Expand All @@ -22,12 +23,10 @@

public class TestGetExecutionContextRestAction extends BaseRestHandler {

private final PluginNodeClient pluginNodeClient;
private final ThreadPool threadPool;
private final Client client;

public TestGetExecutionContextRestAction(PluginNodeClient pluginNodeClient, ThreadPool threadPool) {
this.pluginNodeClient = pluginNodeClient;
this.threadPool = threadPool;
public TestGetExecutionContextRestAction(Client client) {
this.client = client;
}

@Override
Expand All @@ -43,6 +42,6 @@ public String getName() {
@Override
public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
final TestGetExecutionContextRequest getExecutionContextRequest = new TestGetExecutionContextRequest();
return channel -> pluginNodeClient.executeLocally(TestGetExecutionContextAction.INSTANCE, getExecutionContextRequest, new RestToXContentListener<>(channel));
return channel -> client.execute(TestGetExecutionContextAction.INSTANCE, getExecutionContextRequest, new RestToXContentListener<>(channel));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.opensearch.common.inject.Inject;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.identity.Subject;
import org.opensearch.plugins.PluginSubject;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

Expand All @@ -23,16 +25,21 @@
*/
public class TestGetExecutionContextTransportAction extends HandledTransportAction<TestGetExecutionContextRequest, TestGetExecutionContextResponse> {
private final TransportService transportService;
private final PluginSubject pluginSubject;

@Inject
public TestGetExecutionContextTransportAction(TransportService transportService, ActionFilters actionFilters) {
public TestGetExecutionContextTransportAction(TransportService transportService, ActionFilters actionFilters, PluginSubject pluginSubject) {
super(TestGetExecutionContextAction.NAME, transportService, actionFilters, TestGetExecutionContextRequest::new);
this.transportService = transportService;
this.pluginSubject = pluginSubject;
}

@Override
protected void doExecute(Task task, TestGetExecutionContextRequest request, ActionListener<TestGetExecutionContextResponse> listener) {
String pluginClassName = transportService.getThreadPool().getThreadContext().getHeader(ThreadContext.PLUGIN_EXECUTION_CONTEXT);
String pluginClassName;
try (Subject.Session session = pluginSubject.runAs()) {
pluginClassName = transportService.getThreadPool().getThreadContext().getHeader(PluginSubject.PLUGIN_EXECUTION_CONTEXT);
}
listener.onResponse(new TestGetExecutionContextResponse(pluginClassName));
}
}

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,6 @@ public final class ThreadContext implements Writeable {
*/
public static final String ACTION_ORIGIN_TRANSIENT_NAME = "action.origin";

public static final String PLUGIN_EXECUTION_CONTEXT = "_plugin_execution_context";

private static final Set<String> FORBIDDEN_HEADRES = Set.of(PLUGIN_EXECUTION_CONTEXT);

// thread context permissions

private static final Permission ACCESS_SYSTEM_THREAD_CONTEXT_PERMISSION = new ThreadContextPermission("markAsSystemContext");
Expand Down Expand Up @@ -189,44 +185,6 @@ public StoredContext stashContext() {
};
}

/**
* Removes the current context and resets a default context. Retains information about plugin stashing the context.
* The removed context can be restored by closing the returned {@link StoredContext}.
*/
StoredContext stashContext(Class<?> pluginClass) {
final ThreadContextStruct context = threadLocal.get();
/*
X-Opaque-ID should be preserved in a threadContext in order to propagate this across threads.
This is needed so the DeprecationLogger in another thread can see the value of X-Opaque-ID provided by a user.
Otherwise when context is stash, it should be empty.
*/

ThreadContextStruct threadContextStruct = DEFAULT_CONTEXT.putPersistent(context.persistentHeaders);

if (context.requestHeaders.containsKey(Task.X_OPAQUE_ID)) {
threadContextStruct = threadContextStruct.putHeaders(
MapBuilder.<String, String>newMapBuilder()
.put(Task.X_OPAQUE_ID, context.requestHeaders.get(Task.X_OPAQUE_ID))
.immutableMap()
);
}

final Map<String, Object> transientHeaders = propagateTransients(context.transientHeaders, context.isSystemContext);
if (!transientHeaders.isEmpty()) {
threadContextStruct = threadContextStruct.putTransient(transientHeaders);
}

threadContextStruct = threadContextStruct.putPluginExecutionContext(pluginClass);
threadLocal.set(threadContextStruct);

return () -> {
// If the node and thus the threadLocal get closed while this task
// is still executing, we don't want this runnable to fail with an
// uncaught exception
threadLocal.set(context);
};
}

/**
* Captures the current thread context as writeable, allowing it to be serialized out later
*/
Expand Down Expand Up @@ -507,9 +465,6 @@ public void copyHeaders(Iterable<Map.Entry<String, String>> headers) {
* Puts a header into the context
*/
public void putHeader(String key, String value) {
if (FORBIDDEN_HEADRES.contains(key)) {
throw new IllegalArgumentException("Cannot set forbidden header: " + key);
}
threadLocal.set(threadLocal.get().putRequest(key, value));
}

Expand Down Expand Up @@ -759,24 +714,13 @@ private ThreadContextStruct() {
this(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), false);
}

private ThreadContextStruct putPluginExecutionContext(Class<?> pluginClass) {
Map<String, String> newRequestHeaders = new HashMap<>(this.requestHeaders);
if (newRequestHeaders.putIfAbsent(PLUGIN_EXECUTION_CONTEXT, pluginClass.getCanonicalName()) != null) {
throw new IllegalArgumentException("value for key [" + PLUGIN_EXECUTION_CONTEXT + "] already present");
}
return new ThreadContextStruct(newRequestHeaders, responseHeaders, transientHeaders, persistentHeaders, isSystemContext);
}

private ThreadContextStruct putRequest(String key, String value) {
Map<String, String> newRequestHeaders = new HashMap<>(this.requestHeaders);
putSingleHeader(key, value, newRequestHeaders);
return new ThreadContextStruct(newRequestHeaders, responseHeaders, transientHeaders, persistentHeaders, isSystemContext);
}

private static <T> void putSingleHeader(String key, T value, Map<String, T> newHeaders) {
if (FORBIDDEN_HEADRES.contains(key)) {
throw new IllegalArgumentException("Cannot set forbidden header: " + key);
}
if (newHeaders.putIfAbsent(key, value) != null) {
throw new IllegalArgumentException("value for key [" + key + "] already present");
}
Expand Down
21 changes: 21 additions & 0 deletions server/src/main/java/org/opensearch/identity/Subject.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.identity;

import org.opensearch.common.annotation.PublicApi;
import org.opensearch.identity.tokens.AuthToken;

import java.security.Principal;
Expand All @@ -29,4 +30,24 @@ public interface Subject {
* throws SubjectDisabled
*/
void authenticate(final AuthToken token);

/**
* runAs allows the caller to create a session as this subject
*
* @return A session to run transport actions in the context of this subject
*/
Session runAs();

/**
* This construct represents a session for this subject. A session is a short-lived block
* where transport actions are executed as this subject
*
* @opensearch.api
*/
@FunctionalInterface
@PublicApi(since = "2.17.0")
interface Session extends AutoCloseable {
@Override
void close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,9 @@ public String toString() {
public void authenticate(AuthToken AuthToken) {
// Do nothing as noop subject is always logged in
}

@Override
public Session runAs() {
return null;
}
}
Loading

0 comments on commit e685c5c

Please sign in to comment.