From 1a550fe8df46e1c11ad0381339daafdffb0afc78 Mon Sep 17 00:00:00 2001 From: Ruirui Zhang Date: Wed, 23 Oct 2024 16:46:06 -0700 Subject: [PATCH] address comments Signed-off-by: Ruirui Zhang --- .../opensearch/wlm/WorkloadManagementIT.java | 59 +++++++++++++++---- 1 file changed, 46 insertions(+), 13 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/wlm/WorkloadManagementIT.java b/server/src/internalClusterTest/java/org/opensearch/wlm/WorkloadManagementIT.java index 060af2a89853d..de79ca79b51fc 100644 --- a/server/src/internalClusterTest/java/org/opensearch/wlm/WorkloadManagementIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/wlm/WorkloadManagementIT.java @@ -16,8 +16,13 @@ import org.opensearch.action.search.SearchTask; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest; +import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateUpdateTask; +import org.opensearch.cluster.block.ClusterBlockException; +import org.opensearch.cluster.block.ClusterBlockLevel; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.QueryGroup; import org.opensearch.cluster.service.ClusterService; @@ -58,14 +63,15 @@ import static org.opensearch.wlm.QueryGroupTask.QUERY_GROUP_ID_HEADER; import static org.hamcrest.Matchers.instanceOf; -@OpenSearchIntegTestCase.ClusterScope(numDataNodes = 0) +import static org.opensearch.threadpool.ThreadPool.Names.SAME; + public class WorkloadManagementIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase { final static String PUT = "PUT"; final static String MEMORY = "MEMORY"; final static String CPU = "CPU"; final static String ENABLED = "enabled"; final static String DELETE = "DELETE"; - private static final TimeValue TIMEOUT = new TimeValue(10, TimeUnit.SECONDS); + private static final TimeValue TIMEOUT = new TimeValue(1, TimeUnit.SECONDS); public WorkloadManagementIT(Settings nodeSettings) { super(nodeSettings); @@ -207,7 +213,7 @@ public void updateQueryGroupInClusterState(String method, QueryGroup queryGroup) assertEquals(0, listener.getLatch().getCount()); } - public static class TestClusterUpdateRequest extends ActionRequest { + public static class TestClusterUpdateRequest extends ClusterManagerNodeRequest { final private String method; final private QueryGroup queryGroup; @@ -243,22 +249,49 @@ public String getMethod() { } } - public static class TestClusterUpdateTransportAction extends HandledTransportAction { + public static class TestClusterUpdateTransportAction extends TransportClusterManagerNodeAction { public static final ActionType ACTION = new ActionType<>("internal::test_cluster_update_action", TestResponse::new); - private final ClusterService clusterService; @Inject public TestClusterUpdateTransportAction( + ThreadPool threadPool, TransportService transportService, - ClusterService clusterService, - ActionFilters actionFilters + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + ClusterService clusterService ) { - super(ACTION.name(), transportService, actionFilters, TestClusterUpdateRequest::new); - this.clusterService = clusterService; + super( + ACTION.name(), + transportService, + clusterService, + threadPool, + actionFilters, + TestClusterUpdateRequest::new, + indexNameExpressionResolver + ); } @Override - protected void doExecute(Task task, TestClusterUpdateRequest request, ActionListener listener) { + protected String executor() { + return SAME; + } + + @Override + protected TestResponse read(StreamInput in) throws IOException { + return new TestResponse(in); + } + + @Override + protected ClusterBlockException checkBlock(TestClusterUpdateRequest request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + + @Override + protected void clusterManagerOperation( + TestClusterUpdateRequest request, + ClusterState clusterState, + ActionListener listener + ) { clusterService.submitStateUpdateTask("query-group-persistence-service", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { @@ -350,7 +383,6 @@ protected void doExecute(Task task, TestQueryGroupTaskRequest request, ActionLis ((QueryGroupTask) task).setQueryGroupId(threadPool.getThreadContext()); assertEquals(request.getQueryGroupId(), ((QueryGroupTask) task).getQueryGroupId()); long startTime = System.nanoTime(); - while (System.nanoTime() - startTime < TIMEOUT.getNanos()) { doWork(request); if (cancellableTask.isCancelled()) { @@ -379,8 +411,9 @@ private void doWork(TestQueryGroupTaskRequest request) throws InterruptedExcepti } while (i < iterations); break; case "MEMORY": - Byte[] bytes = new Byte[100000]; - int[] ints = new int[1000000]; + int bytesToAllocate = (int) (Runtime.getRuntime().totalMemory() * 0.01); + Byte[] bytes = new Byte[bytesToAllocate]; + int[] ints = new int[bytesToAllocate]; break; } }