Skip to content

Commit

Permalink
add logic for Create QueryGroup API
Browse files Browse the repository at this point in the history
Signed-off-by: Ruirui Zhang <[email protected]>
  • Loading branch information
ruai0511 committed Jul 9, 2024
1 parent 0684342 commit 5e9db9f
Show file tree
Hide file tree
Showing 23 changed files with 1,461 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add batching supported processor base type AbstractBatchingProcessor ([#14554](https://github.com/opensearch-project/OpenSearch/pull/14554))
- Fix race condition while parsing derived fields from search definition ([14445](https://github.com/opensearch-project/OpenSearch/pull/14445))
- Add allowlist setting for ingest-common and search-pipeline-common processors ([#14439](https://github.com/opensearch-project/OpenSearch/issues/14439))
- [Workload Management] Add Create QueryGroup API Logic ([#14680](https://github.com/opensearch-project/OpenSearch/pull/14680))

### Dependencies
- Bump `org.gradle.test-retry` from 1.5.8 to 1.5.9 ([#13442](https://github.com/opensearch-project/OpenSearch/pull/13442))
Expand Down
18 changes: 18 additions & 0 deletions plugins/workload-management/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

opensearchplugin {
description 'OpenSearch Workload Management Plugin.'
classname 'org.opensearch.plugin.wlm.action.WorkloadManagementPlugin'
}

dependencies {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.wlm.action;

import org.opensearch.action.ActionType;

/**
* Transport action to create QueryGroup
*
* @opensearch.api
*/
public class CreateQueryGroupAction extends ActionType<CreateQueryGroupResponse> {

/**
* An instance of CreateQueryGroupAction
*/
public static final CreateQueryGroupAction INSTANCE = new CreateQueryGroupAction();

/**
* Name for CreateQueryGroupAction
*/
public static final String NAME = "cluster:admin/opensearch/query_group/wlm/_create";

/**
* Default constructor
*/
private CreateQueryGroupAction() {
super(NAME, CreateQueryGroupResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.wlm.action;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.cluster.metadata.QueryGroup;
import org.opensearch.cluster.metadata.QueryGroup.ResiliencyMode;

import org.opensearch.common.UUIDs;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.XContentParser;
import org.joda.time.Instant;
import org.opensearch.search.ResourceType;

import java.io.IOException;
import java.util.Map;
import java.util.HashMap;

/**
* A request for create QueryGroup
* User input schema:
* {
* "name": "analytics",
* "resiliency_mode": "enforced",
* "resourceLimits": {
* "cpu" : 0.4,
* "memory" : 0.2
* }
* }
*
* @opensearch.internal
*/
public class CreateQueryGroupRequest extends ActionRequest implements Writeable.Reader<CreateQueryGroupRequest> {
private String name;
private String _id;
private ResiliencyMode resiliencyMode;
private Map<String, Object> resourceLimits;
private long updatedAtInMillis;

/**
* Default constructor for CreateQueryGroupRequest
*/
public CreateQueryGroupRequest() {}

/**
* Constructor for CreateQueryGroupRequest
* @param queryGroup - A {@link QueryGroup} object
*/
public CreateQueryGroupRequest(QueryGroup queryGroup) {
this.name = queryGroup.getName();
this._id = queryGroup.get_id();
Map<ResourceType, Object> resourceTypesMap = queryGroup.getResourceLimits();
Map<String, Object> resourceLimits_ = new HashMap<>();
for (Map.Entry<ResourceType, Object> resource : resourceTypesMap.entrySet()) {
resourceLimits_.put(resource.getKey().getName(), resource.getValue());
}
this.resourceLimits = resourceLimits_;
this.resiliencyMode = queryGroup.getResiliencyMode();
this.updatedAtInMillis = queryGroup.getUpdatedAtInMillis();
}

/**
* Constructor for CreateQueryGroupRequest
* @param name - QueryGroup name for CreateQueryGroupRequest
* @param _id - QueryGroup _id for CreateQueryGroupRequest
* @param mode - QueryGroup mode for CreateQueryGroupRequest
* @param resourceLimits - QueryGroup resourceLimits for CreateQueryGroupRequest
* @param updatedAtInMillis - QueryGroup updated time in millis for CreateQueryGroupRequest
*/
public CreateQueryGroupRequest(
String name,
String _id,
ResiliencyMode mode,
Map<String, Object> resourceLimits,
long updatedAtInMillis
) {
this.name = name;
this._id = _id;
this.resourceLimits = resourceLimits;
this.resiliencyMode = mode;
this.updatedAtInMillis = updatedAtInMillis;
}

/**
* Constructor for CreateQueryGroupRequest
* @param in - A {@link StreamInput} object
*/
public CreateQueryGroupRequest(StreamInput in) throws IOException {
super(in);
name = in.readString();
_id = in.readString();
resiliencyMode = ResiliencyMode.fromName(in.readString());
resourceLimits = in.readMap();
updatedAtInMillis = in.readLong();
}

@Override
public CreateQueryGroupRequest read(StreamInput in) throws IOException {
return new CreateQueryGroupRequest(in);
}

/**
* Generate a CreateQueryGroupRequest from XContent
* @param parser - A {@link XContentParser} object
*/
public static CreateQueryGroupRequest fromXContent(XContentParser parser) throws IOException {

while (parser.currentToken() != XContentParser.Token.START_OBJECT) {
parser.nextToken();
}

if (parser.currentToken() != XContentParser.Token.START_OBJECT) {
throw new IllegalArgumentException("expected start object but got a " + parser.currentToken());
}

XContentParser.Token token;
String fieldName = "";
String name = null;
String _id = UUIDs.randomBase64UUID();
ResiliencyMode mode = null;
long updatedAtInMillis = Instant.now().getMillis();

// Map to hold resources
final Map<String, Object> resourceLimits = new HashMap<>();
while ((token = parser.nextToken()) != null) {
if (token == XContentParser.Token.FIELD_NAME) {
fieldName = parser.currentName();
} else if (token.isValue()) {
if (fieldName.equals("name")) {
name = parser.text();
} else if (fieldName.equals("resiliency_mode")) {
mode = ResiliencyMode.fromName(parser.text());
} else {
throw new IllegalArgumentException("unrecognised [field=" + fieldName + " in QueryGroup");
}
} else if (token == XContentParser.Token.START_OBJECT) {
if (!fieldName.equals("resourceLimits")) {
throw new IllegalArgumentException(
"QueryGroup.resourceLimits is an object and expected token was { " + " but found " + token
);
}
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
fieldName = parser.currentName();
} else {
resourceLimits.put(fieldName, parser.doubleValue());
}
}
}
}
return new CreateQueryGroupRequest(name, _id, mode, resourceLimits, updatedAtInMillis);
}

@Override
public ActionRequestValidationException validate() {
return null;
}

/**
* name getter
*/
public String getName() {
return name;
}

/**
* name setter
* @param name - name to be set
*/
public void setName(String name) {
this.name = name;
}

/**
* resourceLimits getter
*/
public Map<String, Object> getResourceLimits() {
return resourceLimits;
}

/**
* resourceLimits setter
* @param resourceLimits - resourceLimit to be set
*/
public void setResourceLimits(Map<String, Object> resourceLimits) {
this.resourceLimits = resourceLimits;
}

/**
* mode getter
*/
public ResiliencyMode getResiliencyMode() {
return resiliencyMode;
}

/**
* mode setter
* @param resiliencyMode - mode to be set
*/
public void setResiliencyMode(ResiliencyMode resiliencyMode) {
this.resiliencyMode = resiliencyMode;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(name);
out.writeString(_id);
out.writeString(resiliencyMode.getName());
out.writeMap(resourceLimits);
out.writeLong(updatedAtInMillis);
}

/**
* _id getter
*/
public String get_id() {
return _id;
}

/**
* UUID setter
* @param _id - _id to be set
*/
public void set_id(String _id) {
this._id = _id;
}

/**
* updatedAtInMillis getter
*/
public long getUpdatedAtInMillis() {
return updatedAtInMillis;
}

/**
* updatedAtInMillis setter
* @param updatedAtInMillis - updatedAtInMillis to be set
*/
public void setUpdatedAtInMillis(long updatedAtInMillis) {
this.updatedAtInMillis = updatedAtInMillis;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.plugin.wlm.action;

import org.opensearch.cluster.metadata.QueryGroup;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;

/**
* Response for the create API for QueryGroup
*
* @opensearch.internal
*/
public class CreateQueryGroupResponse extends ActionResponse implements ToXContent, ToXContentObject {
private final QueryGroup queryGroup;
private RestStatus restStatus;

/**
* Constructor for CreateQueryGroupResponse
* @param queryGroup - The QueryGroup to be included in the response
* @param restStatus - The restStatus for the response
*/
public CreateQueryGroupResponse(final QueryGroup queryGroup, RestStatus restStatus) {
this.queryGroup = queryGroup;
this.restStatus = restStatus;
}

/**
* Constructor for CreateQueryGroupResponse
* @param in - A {@link StreamInput} object
*/
public CreateQueryGroupResponse(StreamInput in) throws IOException {
queryGroup = new QueryGroup(in);
restStatus = RestStatus.readFrom(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
queryGroup.writeTo(out);
RestStatus.writeTo(out, restStatus);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return QueryGroup.writeToXContent(queryGroup, builder);
}

/**
* queryGroup getter
*/
public QueryGroup getQueryGroup() {
return queryGroup;
}

/**
* restStatus getter
*/
public RestStatus getRestStatus() {
return restStatus;
}
}
Loading

0 comments on commit 5e9db9f

Please sign in to comment.