From d849280e17c52bdbe884c6b9fd7339f1be187edd Mon Sep 17 00:00:00 2001 From: Ruirui Zhang Date: Mon, 8 Jul 2024 17:51:30 -0700 Subject: [PATCH] add logic for Create QueryGroup API --- plugins/workload-management/build.gradle | 18 ++ .../wlm/action/CreateQueryGroupAction.java | 36 +++ .../wlm/action/CreateQueryGroupRequest.java | 252 ++++++++++++++++++ .../wlm/action/CreateQueryGroupResponse.java | 74 +++++ .../TransportCreateQueryGroupAction.java | 74 +++++ .../wlm/action/WorkloadManagementPlugin.java | 62 +++++ .../WorkloadManagementPluginModule.java | 32 +++ .../plugin/wlm/action/package-info.java | 12 + .../rest/RestCreateQueryGroupAction.java | 80 ++++++ .../plugin/wlm/action/rest/package-info.java | 12 + .../wlm/action/service/Persistable.java | 25 ++ .../service/QueryGroupPersistenceService.java | 228 ++++++++++++++++ .../wlm/action/service/package-info.java | 12 + .../action/CreateQueryGroupRequestTests.java | 34 +++ .../action/CreateQueryGroupResponseTests.java | 38 +++ .../wlm/action/QueryGroupTestUtils.java | 132 +++++++++ .../QueryGroupPersistenceServiceTests.java | 104 ++++++++ .../opensearch/cluster/metadata/Metadata.java | 6 + .../cluster/metadata/QueryGroup.java | 19 +- .../common/settings/ClusterSettings.java | 8 +- .../QueryGroupServiceSettings.java | 198 ++++++++++++++ .../search/query_group/package-info.java | 12 + 22 files changed, 1460 insertions(+), 8 deletions(-) create mode 100644 plugins/workload-management/build.gradle create mode 100644 plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/CreateQueryGroupAction.java create mode 100644 plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/CreateQueryGroupRequest.java create mode 100644 plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/CreateQueryGroupResponse.java create mode 100644 plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/TransportCreateQueryGroupAction.java create mode 100644 plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/WorkloadManagementPlugin.java create mode 100644 plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/WorkloadManagementPluginModule.java create mode 100644 plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/package-info.java create mode 100644 plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/rest/RestCreateQueryGroupAction.java create mode 100644 plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/rest/package-info.java create mode 100644 plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/service/Persistable.java create mode 100644 plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/service/QueryGroupPersistenceService.java create mode 100644 plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/service/package-info.java create mode 100644 plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/CreateQueryGroupRequestTests.java create mode 100644 plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/CreateQueryGroupResponseTests.java create mode 100644 plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/QueryGroupTestUtils.java create mode 100644 plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/service/QueryGroupPersistenceServiceTests.java create mode 100644 server/src/main/java/org/opensearch/search/query_group/QueryGroupServiceSettings.java create mode 100644 server/src/main/java/org/opensearch/search/query_group/package-info.java diff --git a/plugins/workload-management/build.gradle b/plugins/workload-management/build.gradle new file mode 100644 index 0000000000000..89e13c079795e --- /dev/null +++ b/plugins/workload-management/build.gradle @@ -0,0 +1,18 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +opensearchplugin { + description 'OpenSearch Workload Management Plugin.' + classname 'org.opensearch.plugin.wlm.action.WorkloadManagementPlugin' +} + +dependencies { +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/CreateQueryGroupAction.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/CreateQueryGroupAction.java new file mode 100644 index 0000000000000..003b8b60c4ff9 --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/CreateQueryGroupAction.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action; + +import org.opensearch.action.ActionType; + +/** + * Transport action to create QueryGroup + * + * @opensearch.api + */ +public class CreateQueryGroupAction extends ActionType { + + /** + * An instance of CreateQueryGroupAction + */ + public static final CreateQueryGroupAction INSTANCE = new CreateQueryGroupAction(); + + /** + * Name for CreateQueryGroupAction + */ + public static final String NAME = "cluster:admin/opensearch/query_group/wlm/_create"; + + /** + * Default constructor + */ + private CreateQueryGroupAction() { + super(NAME, CreateQueryGroupResponse::new); + } +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/CreateQueryGroupRequest.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/CreateQueryGroupRequest.java new file mode 100644 index 0000000000000..2ce70ed8ac931 --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/CreateQueryGroupRequest.java @@ -0,0 +1,252 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action; + +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.cluster.metadata.QueryGroup; +import org.opensearch.cluster.metadata.QueryGroup.ResiliencyMode; + +import org.opensearch.common.UUIDs; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.XContentParser; +import org.joda.time.Instant; +import org.opensearch.search.ResourceType; + +import java.io.IOException; +import java.util.Map; +import java.util.HashMap; + +/** + * A request for create QueryGroup + * User input schema: + * { + * "name": "analytics", + * "resiliency_mode": "enforced", + * "resourceLimits": { + * "cpu" : 0.4, + * "memory" : 0.2 + * } + * } + * + * @opensearch.internal + */ +public class CreateQueryGroupRequest extends ActionRequest implements Writeable.Reader { + private String name; + private String _id; + private ResiliencyMode resiliencyMode; + private Map resourceLimits; + private long updatedAtInMillis; + + /** + * Default constructor for CreateQueryGroupRequest + */ + public CreateQueryGroupRequest() {} + + /** + * Constructor for CreateQueryGroupRequest + * @param queryGroup - A {@link QueryGroup} object + */ + public CreateQueryGroupRequest(QueryGroup queryGroup) { + this.name = queryGroup.getName(); + this._id = queryGroup.get_id(); + Map resourceTypesMap = queryGroup.getResourceLimits(); + Map resourceLimits_ = new HashMap<>(); + for (Map.Entry resource : resourceTypesMap.entrySet()) { + resourceLimits_.put(resource.getKey().getName(), resource.getValue()); + } + this.resourceLimits = resourceLimits_; + this.resiliencyMode = queryGroup.getResiliencyMode(); + this.updatedAtInMillis = queryGroup.getUpdatedAtInMillis(); + } + + /** + * Constructor for CreateQueryGroupRequest + * @param name - QueryGroup name for CreateQueryGroupRequest + * @param _id - QueryGroup _id for CreateQueryGroupRequest + * @param mode - QueryGroup mode for CreateQueryGroupRequest + * @param resourceLimits - QueryGroup resourceLimits for CreateQueryGroupRequest + * @param updatedAtInMillis - QueryGroup updated time in millis for CreateQueryGroupRequest + */ + public CreateQueryGroupRequest( + String name, + String _id, + ResiliencyMode mode, + Map resourceLimits, + long updatedAtInMillis + ) { + this.name = name; + this._id = _id; + this.resourceLimits = resourceLimits; + this.resiliencyMode = mode; + this.updatedAtInMillis = updatedAtInMillis; + } + + /** + * Constructor for CreateQueryGroupRequest + * @param in - A {@link StreamInput} object + */ + public CreateQueryGroupRequest(StreamInput in) throws IOException { + super(in); + name = in.readString(); + _id = in.readString(); + resiliencyMode = ResiliencyMode.fromName(in.readString()); + resourceLimits = in.readMap(); + updatedAtInMillis = in.readLong(); + } + + @Override + public CreateQueryGroupRequest read(StreamInput in) throws IOException { + return new CreateQueryGroupRequest(in); + } + + /** + * Generate a CreateQueryGroupRequest from XContent + * @param parser - A {@link XContentParser} object + */ + public static CreateQueryGroupRequest fromXContent(XContentParser parser) throws IOException { + + while (parser.currentToken() != XContentParser.Token.START_OBJECT) { + parser.nextToken(); + } + + if (parser.currentToken() != XContentParser.Token.START_OBJECT) { + throw new IllegalArgumentException("expected start object but got a " + parser.currentToken()); + } + + XContentParser.Token token; + String fieldName = ""; + String name = null; + String _id = UUIDs.randomBase64UUID(); + ResiliencyMode mode = null; + long updatedAtInMillis = Instant.now().getMillis(); + + // Map to hold resources + final Map resourceLimits = new HashMap<>(); + while ((token = parser.nextToken()) != null) { + if (token == XContentParser.Token.FIELD_NAME) { + fieldName = parser.currentName(); + } else if (token.isValue()) { + if (fieldName.equals("name")) { + name = parser.text(); + } else if (fieldName.equals("resiliency_mode")) { + mode = ResiliencyMode.fromName(parser.text()); + } else { + throw new IllegalArgumentException("unrecognised [field=" + fieldName + " in QueryGroup"); + } + } else if (token == XContentParser.Token.START_OBJECT) { + if (!fieldName.equals("resourceLimits")) { + throw new IllegalArgumentException( + "QueryGroup.resourceLimits is an object and expected token was { " + " but found " + token + ); + } + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + fieldName = parser.currentName(); + } else { + resourceLimits.put(fieldName, parser.doubleValue()); + } + } + } + } + return new CreateQueryGroupRequest(name, _id, mode, resourceLimits, updatedAtInMillis); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + /** + * name getter + */ + public String getName() { + return name; + } + + /** + * name setter + * @param name - name to be set + */ + public void setName(String name) { + this.name = name; + } + + /** + * resourceLimits getter + */ + public Map getResourceLimits() { + return resourceLimits; + } + + /** + * resourceLimits setter + * @param resourceLimits - resourceLimit to be set + */ + public void setResourceLimits(Map resourceLimits) { + this.resourceLimits = resourceLimits; + } + + /** + * mode getter + */ + public ResiliencyMode getResiliencyMode() { + return resiliencyMode; + } + + /** + * mode setter + * @param resiliencyMode - mode to be set + */ + public void setResiliencyMode(ResiliencyMode resiliencyMode) { + this.resiliencyMode = resiliencyMode; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(name); + out.writeString(_id); + out.writeString(resiliencyMode.getName()); + out.writeMap(resourceLimits); + out.writeLong(updatedAtInMillis); + } + + /** + * _id getter + */ + public String get_id() { + return _id; + } + + /** + * UUID setter + * @param _id - _id to be set + */ + public void set_id(String _id) { + this._id = _id; + } + + /** + * updatedAtInMillis getter + */ + public long getUpdatedAtInMillis() { + return updatedAtInMillis; + } + + /** + * updatedAtInMillis setter + * @param updatedAtInMillis - updatedAtInMillis to be set + */ + public void setUpdatedAtInMillis(long updatedAtInMillis) { + this.updatedAtInMillis = updatedAtInMillis; + } +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/CreateQueryGroupResponse.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/CreateQueryGroupResponse.java new file mode 100644 index 0000000000000..4a446698495ff --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/CreateQueryGroupResponse.java @@ -0,0 +1,74 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action; + +import org.opensearch.cluster.metadata.QueryGroup; +import org.opensearch.core.action.ActionResponse; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; + +/** + * Response for the create API for QueryGroup + * + * @opensearch.internal + */ +public class CreateQueryGroupResponse extends ActionResponse implements ToXContent, ToXContentObject { + private final QueryGroup queryGroup; + private RestStatus restStatus; + + /** + * Constructor for CreateQueryGroupResponse + * @param queryGroup - The QueryGroup to be included in the response + * @param restStatus - The restStatus for the response + */ + public CreateQueryGroupResponse(final QueryGroup queryGroup, RestStatus restStatus) { + this.queryGroup = queryGroup; + this.restStatus = restStatus; + } + + /** + * Constructor for CreateQueryGroupResponse + * @param in - A {@link StreamInput} object + */ + public CreateQueryGroupResponse(StreamInput in) throws IOException { + queryGroup = new QueryGroup(in); + restStatus = RestStatus.readFrom(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + queryGroup.writeTo(out); + RestStatus.writeTo(out, restStatus); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return QueryGroup.writeToXContent(queryGroup, builder); + } + + /** + * queryGroup getter + */ + public QueryGroup getQueryGroup() { + return queryGroup; + } + + /** + * restStatus getter + */ + public RestStatus getRestStatus() { + return restStatus; + } +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/TransportCreateQueryGroupAction.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/TransportCreateQueryGroupAction.java new file mode 100644 index 0000000000000..245cd7e04c1fd --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/TransportCreateQueryGroupAction.java @@ -0,0 +1,74 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action; + +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.cluster.metadata.QueryGroup; +import org.opensearch.common.inject.Inject; +import org.opensearch.core.action.ActionListener; +import org.opensearch.plugin.wlm.action.service.Persistable; +import org.opensearch.search.ResourceType; +import org.opensearch.tasks.Task; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.util.HashMap; +import java.util.Map; + +import static org.opensearch.cluster.metadata.QueryGroup.builder; + +/** + * Transport action to create QueryGroup + * + * @opensearch.internal + */ +public class TransportCreateQueryGroupAction extends HandledTransportAction { + + private final ThreadPool threadPool; + private final Persistable queryGroupPersistenceService; + + /** + * Constructor for TransportCreateQueryGroupAction + * + * @param actionName - action name + * @param transportService - a {@link TransportService} object + * @param actionFilters - a {@link ActionFilters} object + * @param threadPool - a {@link ThreadPool} object + * @param queryGroupPersistenceService - a {@link Persistable} object + */ + @Inject + public TransportCreateQueryGroupAction( + String actionName, + TransportService transportService, + ActionFilters actionFilters, + ThreadPool threadPool, + Persistable queryGroupPersistenceService + ) { + super(CreateQueryGroupAction.NAME, transportService, actionFilters, CreateQueryGroupRequest::new); + this.threadPool = threadPool; + this.queryGroupPersistenceService = queryGroupPersistenceService; + } + + @Override + protected void doExecute(Task task, CreateQueryGroupRequest request, ActionListener listener) { + Map resourceTypesMap = new HashMap<>(); + Map resourceLimitsStringMap = request.getResourceLimits(); + for (Map.Entry resource : resourceLimitsStringMap.entrySet()) { + resourceTypesMap.put(ResourceType.fromName(resource.getKey()), resource.getValue()); + } + QueryGroup queryGroup = builder().name(request.getName()) + ._id(request.get_id()) + .mode(request.getResiliencyMode().getName()) + .resourceLimits(resourceTypesMap) + .updatedAt(request.getUpdatedAtInMillis()) + .build(); + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> queryGroupPersistenceService.persist(queryGroup, listener)); + } +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/WorkloadManagementPlugin.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/WorkloadManagementPlugin.java new file mode 100644 index 0000000000000..c568d3e6bc2ef --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/WorkloadManagementPlugin.java @@ -0,0 +1,62 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action; + +import org.opensearch.action.ActionRequest; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.common.inject.Module; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.IndexScopedSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.settings.SettingsFilter; +import org.opensearch.core.action.ActionResponse; +import org.opensearch.plugin.wlm.action.rest.RestCreateQueryGroupAction; +import org.opensearch.plugins.ActionPlugin; +import org.opensearch.plugins.Plugin; +import org.opensearch.rest.RestController; +import org.opensearch.rest.RestHandler; + +import java.util.Collection; +import java.util.List; +import java.util.function.Supplier; + +/** + * Plugin class for WorkloadManagement + */ +public class WorkloadManagementPlugin extends Plugin implements ActionPlugin { + + /** + * Default constructor + */ + public WorkloadManagementPlugin() {} + + @Override + public List> getActions() { + return List.of(new ActionPlugin.ActionHandler<>(CreateQueryGroupAction.INSTANCE, TransportCreateQueryGroupAction.class)); + } + + @Override + public List getRestHandlers( + Settings settings, + RestController restController, + ClusterSettings clusterSettings, + IndexScopedSettings indexScopedSettings, + SettingsFilter settingsFilter, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier nodesInCluster + ) { + return List.of(new RestCreateQueryGroupAction()); + } + + @Override + public Collection createGuiceModules() { + return List.of(new WorkloadManagementPluginModule()); + } +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/WorkloadManagementPluginModule.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/WorkloadManagementPluginModule.java new file mode 100644 index 0000000000000..65f92a59a576b --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/WorkloadManagementPluginModule.java @@ -0,0 +1,32 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action; + +import org.opensearch.cluster.metadata.QueryGroup; +import org.opensearch.common.inject.AbstractModule; +import org.opensearch.common.inject.TypeLiteral; +import org.opensearch.plugin.wlm.action.service.Persistable; +import org.opensearch.plugin.wlm.action.service.QueryGroupPersistenceService; + +/** + * Guice Module to manage WorkloadManagement related objects + */ +public class WorkloadManagementPluginModule extends AbstractModule { + + /** + * Constructor for WorkloadManagementPluginModule + */ + public WorkloadManagementPluginModule() {} + + @Override + protected void configure() { + bind(new TypeLiteral>() { + }).to(QueryGroupPersistenceService.class).asEagerSingleton(); + } +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/package-info.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/package-info.java new file mode 100644 index 0000000000000..8f7d2647546f5 --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Base Package of CRUD API of QueryGroup + */ +package org.opensearch.plugin.wlm.action; diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/rest/RestCreateQueryGroupAction.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/rest/RestCreateQueryGroupAction.java new file mode 100644 index 0000000000000..669e928f0391b --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/rest/RestCreateQueryGroupAction.java @@ -0,0 +1,80 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action.rest; + +import org.opensearch.client.node.NodeClient; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.plugin.wlm.action.CreateQueryGroupAction; +import org.opensearch.plugin.wlm.action.CreateQueryGroupRequest; +import org.opensearch.plugin.wlm.action.CreateQueryGroupResponse; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestChannel; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestResponse; +import org.opensearch.rest.action.RestResponseListener; + +import java.io.IOException; +import java.util.List; + +import static org.opensearch.rest.RestRequest.Method.POST; +import static org.opensearch.rest.RestRequest.Method.PUT; + +/** + * Rest action to create a QueryGroup + * + * @opensearch.api + */ +public class RestCreateQueryGroupAction extends BaseRestHandler { + + /** + * Constructor for RestCreateQueryGroupAction + */ + public RestCreateQueryGroupAction() {} + + @Override + public String getName() { + return "create_query_group"; + } + + /** + * The list of {@link Route}s that this RestHandler is responsible for handling. + */ + @Override + public List routes() { + return List.of(new Route(POST, "_query_group/"), new Route(PUT, "_query_group/")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + CreateQueryGroupRequest createQueryGroupRequest = new CreateQueryGroupRequest(); + request.applyContentParser((parser) -> parseRestRequest(createQueryGroupRequest, parser)); + return channel -> client.execute(CreateQueryGroupAction.INSTANCE, createQueryGroupRequest, createQueryGroupResponse(channel)); + } + + private void parseRestRequest(CreateQueryGroupRequest request, XContentParser parser) throws IOException { + final CreateQueryGroupRequest createQueryGroupRequest = CreateQueryGroupRequest.fromXContent(parser); + request.setName(createQueryGroupRequest.getName()); + request.set_id(createQueryGroupRequest.get_id()); + request.setResiliencyMode(createQueryGroupRequest.getResiliencyMode()); + request.setResourceLimits(createQueryGroupRequest.getResourceLimits()); + request.setUpdatedAtInMillis(createQueryGroupRequest.getUpdatedAtInMillis()); + } + + private RestResponseListener createQueryGroupResponse(final RestChannel channel) { + return new RestResponseListener<>(channel) { + @Override + public RestResponse buildResponse(final CreateQueryGroupResponse response) throws Exception { + return new BytesRestResponse(RestStatus.OK, response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS)); + } + }; + } +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/rest/package-info.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/rest/package-info.java new file mode 100644 index 0000000000000..783826608c517 --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/rest/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Package for the rest classes for QueryGroup CRUD operations + */ +package org.opensearch.plugin.wlm.action.rest; diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/service/Persistable.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/service/Persistable.java new file mode 100644 index 0000000000000..4515893ad5458 --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/service/Persistable.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action.service; + +import org.opensearch.core.action.ActionListener; +import org.opensearch.plugin.wlm.action.CreateQueryGroupResponse; + +/** + * This interface defines the key APIs for implementing QueruGroup persistence + */ +public interface Persistable { + + /** + * persists the QueryGroup in a durable storage + * @param queryGroup + * @param listener + */ + void persist(T queryGroup, ActionListener listener); +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/service/QueryGroupPersistenceService.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/service/QueryGroupPersistenceService.java new file mode 100644 index 0000000000000..fb1862d045b7b --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/service/QueryGroupPersistenceService.java @@ -0,0 +1,228 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action.service; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.ClusterStateUpdateTask; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.QueryGroup; +import org.opensearch.cluster.service.ClusterManagerTaskThrottler.ThrottlingKey; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Priority; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.plugin.wlm.action.CreateQueryGroupResponse; +import org.opensearch.search.ResourceType; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.DoubleAdder; + +import static org.opensearch.search.query_group.QueryGroupServiceSettings.MAX_QUERY_GROUP_COUNT; +import static org.opensearch.search.query_group.QueryGroupServiceSettings.QUERY_GROUP_COUNT_SETTING_NAME; + +/** + * This class defines the functions for QueryGroup persistence + */ +public class QueryGroupPersistenceService implements Persistable { + private static final Logger logger = LogManager.getLogger(QueryGroupPersistenceService.class); + private final ClusterService clusterService; + private static final String SOURCE = "query-group-persistence-service"; + private static final String CREATE_QUERY_GROUP_THROTTLING_KEY = "create-query-group"; + private static final String UPDATE_QUERY_GROUP_THROTTLING_KEY = "update-query-group"; + private static final String DELETE_QUERY_GROUP_THROTTLING_KEY = "delete-query-group"; + private final AtomicInteger inflightCreateQueryGroupRequestCount; + private final Map inflightResourceLimitValues; + private volatile int maxQueryGroupCount; + final ThrottlingKey createQueryGroupThrottlingKey; + final ThrottlingKey updateQueryGroupThrottlingKey; + final ThrottlingKey deleteQueryGroupThrottlingKey; + + /** + * Constructor for QueryGroupPersistenceService + * + * @param clusterService {@link ClusterService} - The cluster service to be used by QueryGroupPersistenceService + * @param settings {@link Settings} - The settings to be used by QueryGroupPersistenceService + * @param clusterSettings {@link ClusterSettings} - The cluster settings to be used by QueryGroupPersistenceService + */ + @Inject + public QueryGroupPersistenceService( + final ClusterService clusterService, + final Settings settings, + final ClusterSettings clusterSettings + ) { + this.clusterService = clusterService; + this.createQueryGroupThrottlingKey = clusterService.registerClusterManagerTask(CREATE_QUERY_GROUP_THROTTLING_KEY, true); + this.deleteQueryGroupThrottlingKey = clusterService.registerClusterManagerTask(DELETE_QUERY_GROUP_THROTTLING_KEY, true); + this.updateQueryGroupThrottlingKey = clusterService.registerClusterManagerTask(UPDATE_QUERY_GROUP_THROTTLING_KEY, true); + maxQueryGroupCount = MAX_QUERY_GROUP_COUNT.get(settings); + clusterSettings.addSettingsUpdateConsumer(MAX_QUERY_GROUP_COUNT, this::setMaxQueryGroupCount); + inflightCreateQueryGroupRequestCount = new AtomicInteger(); + inflightResourceLimitValues = new HashMap<>(); + } + + /** + * Set maxQueryGroupCount to be newMaxQueryGroupCount + * @param newMaxQueryGroupCount - the max number of QueryGroup allowed + */ + public void setMaxQueryGroupCount(int newMaxQueryGroupCount) { + if (newMaxQueryGroupCount < 0) { + throw new IllegalArgumentException("node.query_group.max_count can't be negative"); + } + this.maxQueryGroupCount = newMaxQueryGroupCount; + } + + @Override + public void persist(QueryGroup queryGroup, ActionListener listener) { + persistInClusterStateMetadata(queryGroup, listener); + } + + /** + * Update cluster state to include the new QueryGroup + * @param queryGroup {@link QueryGroup} - the QueryGroup we're currently creating + */ + void persistInClusterStateMetadata(QueryGroup queryGroup, ActionListener listener) { + clusterService.submitStateUpdateTask(SOURCE, new ClusterStateUpdateTask(Priority.URGENT) { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + return saveQueryGroupInClusterState(queryGroup, currentState); + } + + @Override + public ThrottlingKey getClusterManagerThrottlingKey() { + return createQueryGroupThrottlingKey; + } + + @Override + public void onFailure(String source, Exception e) { + restoreInflightValues(queryGroup.getResourceLimits()); + inflightCreateQueryGroupRequestCount.decrementAndGet(); + logger.warn("failed to save QueryGroup object due to error: {}, for source: {}", e.getMessage(), source); + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + restoreInflightValues(queryGroup.getResourceLimits()); + inflightCreateQueryGroupRequestCount.decrementAndGet(); + CreateQueryGroupResponse response = new CreateQueryGroupResponse(queryGroup, RestStatus.OK); + listener.onResponse(response); + } + }); + } + + /** + * Get the allocation value for resourceName for the QueryGroup + * @param resourceName - the resourceName we want to get the usage for + * @param resourceLimits - the resource limit from which to get the allocation value for resourceName + */ + private double getResourceLimitValue(String resourceName, final Map resourceLimits) { + for (ResourceType resourceType: resourceLimits.keySet()) { + if (resourceType.getName().equals(resourceName)) { + return (double) resourceLimits.get(resourceType); + } + } + return 0.0; + } + + /** + * This method will be executed before we submit the new cluster state + * @param queryGroup - the QueryGroup we're currently creating + * @param currentClusterState - the cluster state before the update + */ + ClusterState saveQueryGroupInClusterState(final QueryGroup queryGroup, final ClusterState currentClusterState) { + final Metadata metadata = currentClusterState.metadata(); + String groupName = queryGroup.getName(); + final Map previousGroups = metadata.queryGroups(); + + // check if there's any resource allocation that exceed limit of 1.0 + String resourceNameWithThresholdExceeded = ""; + for (ResourceType resourceType : queryGroup.getResourceLimits().keySet()) { + String resourceName = resourceType.getName(); + double existingUsage = calculateExistingUsage(resourceName, previousGroups, groupName); + double newGroupUsage = getResourceLimitValue(resourceName, queryGroup.getResourceLimits()); + inflightResourceLimitValues.computeIfAbsent(resourceName, k -> new DoubleAdder()).add(newGroupUsage); + double totalUsage = existingUsage + inflightResourceLimitValues.get(resourceName).doubleValue(); + if (totalUsage > 1) { + resourceNameWithThresholdExceeded = resourceName; + } + } + // check if group count exceed max + boolean groupCountExceeded = inflightCreateQueryGroupRequestCount.incrementAndGet() + previousGroups.size() > maxQueryGroupCount; + + if (previousGroups.containsKey(groupName)) { + logger.warn("QueryGroup with name {} already exists. Not creating a new one.", groupName); + throw new RuntimeException("QueryGroup with name " + groupName + " already exists. Not creating a new one."); + } + if (!resourceNameWithThresholdExceeded.isEmpty()) { + logger.error("Total resource allocation for {} will go above the max limit of 1.0", resourceNameWithThresholdExceeded); + throw new RuntimeException( + "Total resource allocation for " + resourceNameWithThresholdExceeded + " will go above the max limit of 1.0" + ); + } + if (groupCountExceeded) { + logger.error("{} value exceeded its assigned limit of {}", QUERY_GROUP_COUNT_SETTING_NAME, maxQueryGroupCount); + throw new RuntimeException("Can't create more than " + maxQueryGroupCount + " QueryGroups in the system"); + } + Metadata newData = Metadata.builder(metadata).put(queryGroup).build(); + + return ClusterState.builder(currentClusterState).metadata(newData).build(); + } + + /** + * This method restores the inflight values to be before the QueryGroup is processed + * @param resourceLimits - the resourceLimits we're currently restoring + */ + void restoreInflightValues(Map resourceLimits) { + if (resourceLimits == null || resourceLimits.isEmpty()) { + return; + } + for (ResourceType resourceType : resourceLimits.keySet()) { + String currResourceName = resourceType.getName(); + inflightResourceLimitValues.get(currResourceName).add(-getResourceLimitValue(currResourceName, resourceLimits)); + } + } + + /** + * This method calculates the existing total usage of the resource (except the group that we're updating here) + * @param resourceName - the resource name we're calculating + * @param groupsMap - existing QueryGroups + * @param groupName - the QueryGroup name we're updating + */ + private double calculateExistingUsage(String resourceName, Map groupsMap, String groupName) { + double existingUsage = 0; + for (String currGroupId : groupsMap.keySet()) { + QueryGroup currGroup = groupsMap.get(currGroupId); + if (!currGroup.getName().equals(groupName)) { + existingUsage += getResourceLimitValue(resourceName, currGroup.getResourceLimits()); + } + } + return existingUsage; + } + + /** + * inflightCreateQueryGroupRequestCount getter + */ + public AtomicInteger getInflightCreateQueryGroupRequestCount() { + return inflightCreateQueryGroupRequestCount; + } + + /** + * inflightResourceLimitValues getter + */ + public Map getInflightResourceLimitValues() { + return inflightResourceLimitValues; + } +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/service/package-info.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/service/package-info.java new file mode 100644 index 0000000000000..e70ac3afb81b5 --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/service/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Package for the service classes for QueryGroup CRUD operations + */ +package org.opensearch.plugin.wlm.action.service; diff --git a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/CreateQueryGroupRequestTests.java b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/CreateQueryGroupRequestTests.java new file mode 100644 index 0000000000000..5cd0443fb1387 --- /dev/null +++ b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/CreateQueryGroupRequestTests.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; + +import static org.opensearch.plugin.wlm.action.QueryGroupTestUtils.compareResourceLimits; +import static org.opensearch.plugin.wlm.action.QueryGroupTestUtils.queryGroupOne; + +public class CreateQueryGroupRequestTests extends OpenSearchTestCase { + + public void testSerialization() throws IOException { + CreateQueryGroupRequest request = new CreateQueryGroupRequest(queryGroupOne); + BytesStreamOutput out = new BytesStreamOutput(); + request.writeTo(out); + StreamInput streamInput = out.bytes().streamInput(); + CreateQueryGroupRequest otherRequest = new CreateQueryGroupRequest(streamInput); + assertEquals(request.getName(), otherRequest.getName()); + assertEquals(request.getResourceLimits().size(), otherRequest.getResourceLimits().size()); + assertEquals(request.getResiliencyMode(), otherRequest.getResiliencyMode()); + compareResourceLimits(request.getResourceLimits(), otherRequest.getResourceLimits()); + assertEquals(request.getUpdatedAtInMillis(), otherRequest.getUpdatedAtInMillis()); + } +} diff --git a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/CreateQueryGroupResponseTests.java b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/CreateQueryGroupResponseTests.java new file mode 100644 index 0000000000000..3074f7480e94c --- /dev/null +++ b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/CreateQueryGroupResponseTests.java @@ -0,0 +1,38 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action; + +import org.opensearch.cluster.metadata.QueryGroup; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class CreateQueryGroupResponseTests extends OpenSearchTestCase { + + public void testSerialization() throws IOException { + CreateQueryGroupResponse response = new CreateQueryGroupResponse(QueryGroupTestUtils.queryGroupOne, RestStatus.OK); + BytesStreamOutput out = new BytesStreamOutput(); + response.writeTo(out); + StreamInput streamInput = out.bytes().streamInput(); + CreateQueryGroupResponse otherResponse = new CreateQueryGroupResponse(streamInput); + assertEquals(response.getRestStatus(), otherResponse.getRestStatus()); + QueryGroup responseGroup = response.getQueryGroup(); + QueryGroup otherResponseGroup = otherResponse.getQueryGroup(); + List listOne = new ArrayList<>(); + List listTwo = new ArrayList<>(); + listOne.add(responseGroup); + listTwo.add(otherResponseGroup); + QueryGroupTestUtils.compareQueryGroups(listOne, listTwo); + } +} diff --git a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/QueryGroupTestUtils.java b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/QueryGroupTestUtils.java new file mode 100644 index 0000000000000..fce686d257af4 --- /dev/null +++ b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/QueryGroupTestUtils.java @@ -0,0 +1,132 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action; + +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.QueryGroup; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.plugin.wlm.action.service.QueryGroupPersistenceService; +import org.opensearch.search.ResourceType; +import org.opensearch.threadpool.ThreadPool; + +import java.util.*; +import java.util.concurrent.atomic.DoubleAdder; + +import static org.opensearch.cluster.metadata.QueryGroup.builder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.opensearch.search.ResourceType.fromName; + +public class QueryGroupTestUtils { + public static final String NAME_ONE = "query_group_one"; + public static final String NAME_TWO = "query_group_two"; + public static final String _ID_ONE = "AgfUO5Ja9yfsYlONlYi3TQ=="; + public static final String _ID_TWO = "G5iIqHy4g7eK1qIAAAAIH53=1"; + public static final String NAME_NONE_EXISTED = "query_group_none_existed"; + public static final String MEMORY_STRING = "memory"; + public static final String MONITOR_STRING = "monitor"; + public static final long TIMESTAMP_ONE = 4513232413L; + public static final long TIMESTAMP_TWO = 4513232415L; + public static final QueryGroup queryGroupOne = builder().name(NAME_ONE) + ._id(_ID_ONE) + .mode(MONITOR_STRING) + .resourceLimits(Map.of(fromName(MEMORY_STRING), 0.3)) + .updatedAt(TIMESTAMP_ONE) + .build(); + + public static final QueryGroup queryGroupTwo = builder().name(NAME_TWO) + ._id(_ID_TWO) + .mode(MONITOR_STRING) + .resourceLimits(Map.of(fromName(MEMORY_STRING), 0.6)) + .updatedAt(TIMESTAMP_TWO) + .build(); + + public static final Map queryGroupMap = Map.of(NAME_ONE, queryGroupOne, NAME_TWO, queryGroupTwo); + + public static List queryGroupList() { + List list = new ArrayList<>(); + list.add(queryGroupOne); + list.add(queryGroupTwo); + return list; + } + + public static ClusterState clusterState() { + final Metadata metadata = Metadata.builder().queryGroups(Map.of(NAME_ONE, queryGroupOne, NAME_TWO, queryGroupTwo)).build(); + return ClusterState.builder(new ClusterName("_name")).metadata(metadata).build(); + } + + public static Settings settings() { + return Settings.builder().build(); + } + + public static ClusterSettings clusterSettings() { + return new ClusterSettings(settings(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + } + + public static QueryGroupPersistenceService queryGroupPersistenceService() { + ClusterService clusterService = new ClusterService(settings(), clusterSettings(), mock(ThreadPool.class)); + return new QueryGroupPersistenceService(clusterService, settings(), clusterSettings()); + } + + public static List prepareSandboxPersistenceService(Map queryGroups) { + Metadata metadata = Metadata.builder().queryGroups(queryGroups).build(); + Settings settings = Settings.builder().build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterService clusterService = new ClusterService(settings, clusterSettings, mock(ThreadPool.class)); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).metadata(metadata).build(); + QueryGroupPersistenceService queryGroupPersistenceService = new QueryGroupPersistenceService( + clusterService, + settings, + clusterSettings + ); + return List.of(queryGroupPersistenceService, clusterState); + } + + public static void compareResourceTypes(Map resourceLimitMapOne, Map resourceLimitMapTwo) { + assertTrue(resourceLimitMapOne.keySet().containsAll(resourceLimitMapTwo.keySet())); + assertTrue(resourceLimitMapOne.values().containsAll(resourceLimitMapTwo.values())); +// for (Map.Entry entryOne : resourceLimitMapOne.entrySet()) { +// String resourceName = entryOne.getKey().getName(); +// Optional> entryTwo = resourceLimitMapTwo.entrySet().stream() +// .filter(e -> e.getKey().getName().equals(resourceName)) +// .findFirst(); +// assertTrue(entryTwo.isPresent()); +// assertEquals(entryOne.getValue(), entryTwo.get().getValue()); +// } + } + + public static void compareResourceLimits(Map resourceLimitMapOne, Map resourceLimitMapTwo) { + assertTrue(resourceLimitMapOne.keySet().containsAll(resourceLimitMapTwo.keySet())); + assertTrue(resourceLimitMapOne.values().containsAll(resourceLimitMapTwo.values())); + } + + public static void compareQueryGroups(List listOne, List listTwo) { + assertEquals(listOne.size(), listTwo.size()); + listOne.sort(Comparator.comparing(QueryGroup::getName)); + listTwo.sort(Comparator.comparing(QueryGroup::getName)); + for (int i = 0; i < listOne.size(); i++) { + assertTrue(listOne.get(i).equals(listTwo.get(i))); + } + } + + public static void assertInflightValuesAreZero(QueryGroupPersistenceService queryGroupPersistenceService) { + assertEquals(0, queryGroupPersistenceService.getInflightCreateQueryGroupRequestCount().get()); + Map inflightResourceMap = queryGroupPersistenceService.getInflightResourceLimitValues(); + if (inflightResourceMap != null) { + for (String resourceName : inflightResourceMap.keySet()) { + assertEquals(0, inflightResourceMap.get(resourceName).intValue()); + } + } + } +} diff --git a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/service/QueryGroupPersistenceServiceTests.java b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/service/QueryGroupPersistenceServiceTests.java new file mode 100644 index 0000000000000..c37da96b7ebad --- /dev/null +++ b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/service/QueryGroupPersistenceServiceTests.java @@ -0,0 +1,104 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action.service; + +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.QueryGroup; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.search.ResourceType; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.ThreadPool; + +import java.util.*; + +import static org.opensearch.cluster.metadata.QueryGroup.builder; +import static org.mockito.Mockito.mock; +import static org.opensearch.plugin.wlm.action.QueryGroupTestUtils.*; +import static org.opensearch.search.query_group.QueryGroupServiceSettings.QUERY_GROUP_COUNT_SETTING_NAME; + +public class QueryGroupPersistenceServiceTests extends OpenSearchTestCase { + + public void testCreateQueryGroup() { + List setup = prepareSandboxPersistenceService(new HashMap<>()); + QueryGroupPersistenceService queryGroupPersistenceService1 = (QueryGroupPersistenceService) setup.get(0); + ClusterState clusterState = (ClusterState) setup.get(1); + ClusterState newClusterState = queryGroupPersistenceService1.saveQueryGroupInClusterState(queryGroupOne, clusterState); + Map updatedGroupsMap = newClusterState.getMetadata().queryGroups(); + assertEquals(1, updatedGroupsMap.size()); + assertTrue(updatedGroupsMap.containsKey(_ID_ONE)); + List listOne = new ArrayList<>(); + List listTwo = new ArrayList<>(); + listOne.add(queryGroupOne); + listTwo.add(updatedGroupsMap.get(_ID_ONE)); + compareQueryGroups(listOne, listTwo); + assertInflightValuesAreZero(queryGroupPersistenceService()); + } + + public void testCreateAnotherQueryGroup() { + List setup = prepareSandboxPersistenceService(Map.of(NAME_ONE, queryGroupOne)); + QueryGroupPersistenceService queryGroupPersistenceService1 = (QueryGroupPersistenceService) setup.get(0); + ClusterState clusterState = (ClusterState) setup.get(1); + ClusterState newClusterState = queryGroupPersistenceService1.saveQueryGroupInClusterState(queryGroupTwo, clusterState); + Map updatedGroups = newClusterState.getMetadata().queryGroups(); + assertEquals(2, updatedGroups.size()); + Collection values = updatedGroups.values(); + compareQueryGroups(queryGroupList(), new ArrayList<>(values)); + assertInflightValuesAreZero(queryGroupPersistenceService()); + } + + public void testCreateQueryGroupDuplicateName() { + List setup = prepareSandboxPersistenceService(Map.of(NAME_ONE, queryGroupOne)); + QueryGroupPersistenceService queryGroupPersistenceService1 = (QueryGroupPersistenceService) setup.get(0); + ClusterState clusterState = (ClusterState) setup.get(1); + QueryGroup toCreate = builder().name(NAME_ONE) + ._id("W5iIqHyhgi4K1qIAAAAIHw==") + .mode(MONITOR_STRING) + .resourceLimits(Map.of(ResourceType.fromName(MEMORY_STRING), 0.3)) + .updatedAt(1690934400000L) + .build(); + assertThrows(RuntimeException.class, () -> queryGroupPersistenceService1.saveQueryGroupInClusterState(toCreate, clusterState)); + } + + public void testCreateQueryGroupOverflowAllocation() { + List setup = prepareSandboxPersistenceService(Map.of(NAME_TWO, queryGroupOne)); + QueryGroup toCreate = builder().name(NAME_TWO) + ._id("W5iIqHyhgi4K1qIAAAAIHw==") + .mode(MONITOR_STRING) + .resourceLimits(Map.of(ResourceType.fromName(MEMORY_STRING), 0.5)) + .updatedAt(1690934400000L) + .build(); + QueryGroupPersistenceService queryGroupPersistenceService1 = (QueryGroupPersistenceService) setup.get(0); + ClusterState clusterState = (ClusterState) setup.get(1); + assertThrows(RuntimeException.class, () -> queryGroupPersistenceService1.saveQueryGroupInClusterState(toCreate, clusterState)); + } + + public void testCreateQueryGroupOverflowCount() { + QueryGroup toCreate = builder().name(NAME_NONE_EXISTED) + ._id("W5iIqHyhgi4K1qIAAAAIHw==") + .mode(MONITOR_STRING) + .resourceLimits(Map.of(ResourceType.fromName(MEMORY_STRING), 0.5)) + .updatedAt(1690934400000L) + .build(); + Metadata metadata = Metadata.builder().queryGroups(Map.of(NAME_ONE, queryGroupOne, NAME_TWO, queryGroupTwo)).build(); + Settings settings = Settings.builder().put(QUERY_GROUP_COUNT_SETTING_NAME, 2).build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + ClusterService clusterService = new ClusterService(settings, clusterSettings, mock(ThreadPool.class)); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).metadata(metadata).build(); + QueryGroupPersistenceService queryGroupPersistenceService1 = new QueryGroupPersistenceService( + clusterService, + settings, + clusterSettings + ); + assertThrows(RuntimeException.class, () -> queryGroupPersistenceService1.saveQueryGroupInClusterState(toCreate, clusterState)); + } +} diff --git a/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java b/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java index 2a54f6444ffda..d1589bce0cdc8 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java @@ -844,6 +844,12 @@ public Map views() { return Optional.ofNullable((ViewMetadata) this.custom(ViewMetadata.TYPE)).map(ViewMetadata::views).orElse(Collections.emptyMap()); } + public Map queryGroups() { + return Optional.ofNullable((QueryGroupMetadata) this.custom(QueryGroupMetadata.TYPE)) + .map(QueryGroupMetadata::queryGroups) + .orElse(Collections.emptyMap()); + } + public DecommissionAttributeMetadata decommissionAttributeMetadata() { return custom(DecommissionAttributeMetadata.TYPE); } diff --git a/server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java b/server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java index beaab198073df..8677e28d6b257 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java @@ -30,7 +30,7 @@ * { * "_id": "fafjafjkaf9ag8a9ga9g7ag0aagaga", * "resourceLimits": { - * "jvm": 0.4 + * "memory": 0.4 * }, * "resiliency_mode": "enforced", * "name": "analytics", @@ -112,21 +112,26 @@ private void validateResourceLimits(Map resourceLimits) { Objects.requireNonNull(resource.getKey(), "resourceName can't be null"); Objects.requireNonNull(threshold, "resource limit threshold for" + resource.getKey().getName() + " : can't be null"); - if (Double.compare(threshold, 1.0) > 0) { - throw new IllegalArgumentException("resource value should be less than 1.0"); + if (Double.compare(threshold, 0.0) <= 0 || Double.compare(threshold, 1.0) > 0) { + throw new IllegalArgumentException("resource value should be greater than 0 and less or equal to 1.0"); } } } @Override public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + return writeToXContent(this, builder); + } + + public static XContentBuilder writeToXContent(QueryGroup queryGroup, final XContentBuilder builder) throws IOException { builder.startObject(); - builder.field("_id", _id); - builder.field("name", name); - builder.field("resiliency_mode", resiliencyMode.getName()); - builder.field("updatedAt", updatedAtInMillis); + builder.field("_id", queryGroup.get_id()); + builder.field("name", queryGroup.getName()); + builder.field("resiliency_mode", queryGroup.getResiliencyMode().getName()); + builder.field("updatedAt", queryGroup.getUpdatedAtInMillis()); // write resource limits builder.startObject("resourceLimits"); + Map resourceLimits = queryGroup.getResourceLimits(); for (ResourceType resourceType : ResourceType.values()) { if (resourceLimits.containsKey(resourceType)) { builder.field(resourceType.getName(), resourceLimits.get(resourceType)); diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 5dcf23ae52294..a40e33e632d58 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -157,6 +157,7 @@ import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; import org.opensearch.search.backpressure.settings.SearchShardTaskSettings; import org.opensearch.search.backpressure.settings.SearchTaskSettings; +import org.opensearch.search.query_group.QueryGroupServiceSettings; import org.opensearch.search.fetch.subphase.highlight.FastVectorHighlighter; import org.opensearch.snapshots.InternalSnapshotsInfoService; import org.opensearch.snapshots.SnapshotsService; @@ -758,7 +759,12 @@ public void apply(Settings value, Settings current, Settings previous) { SearchService.CLUSTER_ALLOW_DERIVED_FIELD_SETTING, // Composite index settings - CompositeIndexSettings.STAR_TREE_INDEX_ENABLED_SETTING + CompositeIndexSettings.STAR_TREE_INDEX_ENABLED_SETTING, + + // QueryGroup settings + QueryGroupServiceSettings.MAX_QUERY_GROUP_COUNT, + QueryGroupServiceSettings.NODE_LEVEL_REJECTION_THRESHOLD, + QueryGroupServiceSettings.NODE_LEVEL_CANCELLATION_THRESHOLD ) ) ); diff --git a/server/src/main/java/org/opensearch/search/query_group/QueryGroupServiceSettings.java b/server/src/main/java/org/opensearch/search/query_group/QueryGroupServiceSettings.java new file mode 100644 index 0000000000000..bfde3e1daf426 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/query_group/QueryGroupServiceSettings.java @@ -0,0 +1,198 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.query_group; + +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; + +/** + * Main class to declare the QueryGroup feature related settings + */ +public class QueryGroupServiceSettings { + private static final Long DEFAULT_RUN_INTERVAL_MILLIS = 1000l; + private static final Double DEFAULT_NODE_LEVEL_REJECTION_THRESHOLD = 0.8; + private static final Double DEFAULT_NODE_LEVEL_CANCELLATION_THRESHOLD = 0.9; + /** + * default max queryGroup count on any node at any given point in time + */ + public static final int DEFAULT_MAX_QUERY_GROUP_COUNT_VALUE = 100; + + public static final String QUERY_GROUP_COUNT_SETTING_NAME = "node.query_group.max_count"; + public static final double NODE_LEVEL_CANCELLATION_THRESHOLD_MAX_VALUE = 0.95; + public static final double NODE_LEVEL_REJECTION_THRESHOLD_MAX_VALUE = 0.90; + + private TimeValue runIntervalMillis; + private Double nodeLevelJvmCancellationThreshold; + private Double nodeLevelJvmRejectionThreshold; + private volatile int maxQueryGroupCount; + /** + * max QueryGroup count setting + */ + public static final Setting MAX_QUERY_GROUP_COUNT = Setting.intSetting( + QUERY_GROUP_COUNT_SETTING_NAME, + DEFAULT_MAX_QUERY_GROUP_COUNT_VALUE, + 0, + (newVal) -> { + if (newVal > 100 || newVal < 1) throw new IllegalArgumentException( + QUERY_GROUP_COUNT_SETTING_NAME + " should be in range [1-100]" + ); + }, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + /** + * Setting name for default QueryGroup count + */ + public static final String SERVICE_RUN_INTERVAL_MILLIS_SETTING_NAME = "query_group.service.run_interval_millis"; + /** + * Setting to control the run interval of QSB service + */ + private static final Setting QUERY_GROUP_RUN_INTERVAL_SETTING = Setting.longSetting( + SERVICE_RUN_INTERVAL_MILLIS_SETTING_NAME, + DEFAULT_RUN_INTERVAL_MILLIS, + 1, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Setting name for node level rejection threshold for QSB + */ + public static final String NODE_REJECTION_THRESHOLD_SETTING_NAME = "query_group.node.rejection_threshold"; + /** + * Setting to control the rejection threshold + */ + public static final Setting NODE_LEVEL_REJECTION_THRESHOLD = Setting.doubleSetting( + NODE_REJECTION_THRESHOLD_SETTING_NAME, + DEFAULT_NODE_LEVEL_REJECTION_THRESHOLD, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + /** + * Setting name for node level cancellation threshold + */ + public static final String NODE_CANCELLATION_THRESHOLD_SETTING_NAME = "query_group.node.cancellation_threshold"; + /** + * Setting name for node level cancellation threshold + */ + public static final Setting NODE_LEVEL_CANCELLATION_THRESHOLD = Setting.doubleSetting( + NODE_CANCELLATION_THRESHOLD_SETTING_NAME, + DEFAULT_NODE_LEVEL_CANCELLATION_THRESHOLD, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * QueryGroup service settings constructor + * @param settings + * @param clusterSettings + */ + public QueryGroupServiceSettings(Settings settings, ClusterSettings clusterSettings) { + runIntervalMillis = new TimeValue(QUERY_GROUP_RUN_INTERVAL_SETTING.get(settings)); + nodeLevelJvmCancellationThreshold = NODE_LEVEL_CANCELLATION_THRESHOLD.get(settings); + nodeLevelJvmRejectionThreshold = NODE_LEVEL_REJECTION_THRESHOLD.get(settings); + maxQueryGroupCount = MAX_QUERY_GROUP_COUNT.get(settings); + + ensureRejectionThresholdIsLessThanCancellation(nodeLevelJvmRejectionThreshold, nodeLevelJvmCancellationThreshold); + + clusterSettings.addSettingsUpdateConsumer(MAX_QUERY_GROUP_COUNT, this::setMaxQueryGroupCount); + clusterSettings.addSettingsUpdateConsumer(NODE_LEVEL_CANCELLATION_THRESHOLD, this::setNodeLevelJvmCancellationThreshold); + clusterSettings.addSettingsUpdateConsumer(NODE_LEVEL_REJECTION_THRESHOLD, this::setNodeLevelJvmRejectionThreshold); + } + + /** + * Method to get runInterval for QSB + * @return runInterval in milliseconds for QSB Service + */ + public TimeValue getRunIntervalMillis() { + return runIntervalMillis; + } + + /** + * Method to set the new QueryGroup count + * @param newMaxQueryGroupCount is the new maxQueryGroupCount per node + */ + public void setMaxQueryGroupCount(int newMaxQueryGroupCount) { + if (newMaxQueryGroupCount < 0) { + throw new IllegalArgumentException("node.node.query_group.max_count can't be negative"); + } + this.maxQueryGroupCount = newMaxQueryGroupCount; + } + + /** + * Method to get the node level cancellation threshold + * @return current node level cancellation threshold + */ + public Double getNodeLevelJvmCancellationThreshold() { + return nodeLevelJvmCancellationThreshold; + } + + /** + * Method to set the node level cancellation threshold + * @param nodeLevelJvmCancellationThreshold sets the new node level cancellation threshold + * @throws IllegalArgumentException if the value is > 0.95 and cancellation < rejection threshold + */ + public void setNodeLevelJvmCancellationThreshold(Double nodeLevelJvmCancellationThreshold) { + if (Double.compare(nodeLevelJvmCancellationThreshold, NODE_LEVEL_CANCELLATION_THRESHOLD_MAX_VALUE) > 0) { + throw new IllegalArgumentException( + NODE_CANCELLATION_THRESHOLD_SETTING_NAME + " value should not be greater than 0.95 as it pose a threat of node drop" + ); + } + + ensureRejectionThresholdIsLessThanCancellation(nodeLevelJvmRejectionThreshold, nodeLevelJvmCancellationThreshold); + + this.nodeLevelJvmCancellationThreshold = nodeLevelJvmCancellationThreshold; + } + + /** + * Method to get the node level rejection threshold + * @return the current node level rejection threshold + */ + public Double getNodeLevelJvmRejectionThreshold() { + return nodeLevelJvmRejectionThreshold; + } + + /** + * Method to set the node level rejection threshold + * @param nodeLevelJvmRejectionThreshold sets the new rejection threshold + * @throws IllegalArgumentException if rejection > 0.90 and rejection < cancellation threshold + */ + public void setNodeLevelJvmRejectionThreshold(Double nodeLevelJvmRejectionThreshold) { + if (Double.compare(nodeLevelJvmRejectionThreshold, NODE_LEVEL_REJECTION_THRESHOLD_MAX_VALUE) > 0) { + throw new IllegalArgumentException( + NODE_REJECTION_THRESHOLD_SETTING_NAME + " value not be greater than 0.90 as it pose a threat of node drop" + ); + } + + ensureRejectionThresholdIsLessThanCancellation(nodeLevelJvmRejectionThreshold, nodeLevelJvmCancellationThreshold); + + this.nodeLevelJvmRejectionThreshold = nodeLevelJvmRejectionThreshold; + } + + private void ensureRejectionThresholdIsLessThanCancellation( + Double nodeLevelJvmRejectionThreshold, + Double nodeLevelJvmCancellationThreshold + ) { + if (Double.compare(nodeLevelJvmCancellationThreshold, nodeLevelJvmRejectionThreshold) < 0) { + throw new IllegalArgumentException( + NODE_CANCELLATION_THRESHOLD_SETTING_NAME + " value should not be less than " + NODE_REJECTION_THRESHOLD_SETTING_NAME + ); + } + } + + /** + * Method to get the current QueryGroup count + * @return the current max QueryGroup count + */ + public int getMaxQueryGroupCount() { + return maxQueryGroupCount; + } +} diff --git a/server/src/main/java/org/opensearch/search/query_group/package-info.java b/server/src/main/java/org/opensearch/search/query_group/package-info.java new file mode 100644 index 0000000000000..e7b8443799e83 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/query_group/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Query Sandboxing related artifacts + */ +package org.opensearch.search.query_group;