Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Filter shards for sliced search at coordinator #17025

Merged
merged 2 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Changed
- Indexed IP field supports `terms_query` with more than 1025 IP masks [#16391](https://github.com/opensearch-project/OpenSearch/pull/16391)
- Make entries for dependencies from server/build.gradle to gradle version catalog ([#16707](https://github.com/opensearch-project/OpenSearch/pull/16707))
- Sliced search only fans out to shards matched by the selected slice, reducing open search contexts ([#16771](https://github.com/opensearch-project/OpenSearch/pull/16771))
- Allow extended plugins to be optional ([#16909](https://github.com/opensearch-project/OpenSearch/pull/16909))
- Use the correct type to widen the sort fields when merging top docs ([#16881](https://github.com/opensearch-project/OpenSearch/pull/16881))
- Limit reader writer separation to remote store enabled clusters [#16760](https://github.com/opensearch-project/OpenSearch/pull/16760)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@
"default":"open",
"description":"Whether to expand wildcard expression to concrete indices that are open, closed or both."
}
},
"body":{
"description":"The search source (in order to specify slice parameters)"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
---
"Search shards with slice specified in body":
- skip:
version: " - 2.18.99"
reason: "Added slice body to search_shards in 2.19"
- do:
indices.create:
index: test_index
body:
settings:
index:
number_of_shards: 7
number_of_replicas: 0

- do:
search_shards:
index: test_index
body:
slice:
id: 0
max: 3
- length: { shards: 3 }
- match: { shards.0.0.index: "test_index" }
- match: { shards.0.0.shard: 0 }
- match: { shards.1.0.shard: 3 }
- match: { shards.2.0.shard: 6 }

- do:
search_shards:
index: test_index
body:
slice:
id: 1
max: 3
- length: { shards: 2 }
- match: { shards.0.0.index: "test_index" }
- match: { shards.0.0.shard: 1 }
- match: { shards.1.0.shard: 4 }

- do:
search_shards:
index: test_index
body:
slice:
id: 2
max: 3
- length: { shards: 2 }
- match: { shards.0.0.index: "test_index" }
- match: { shards.0.0.shard: 2 }
- match: { shards.1.0.shard: 5 }


- do:
search_shards:
index: test_index
preference: "_shards:0,2,4,6"
body:
slice:
id: 0
max: 3
- length: { shards: 2 }
- match: { shards.0.0.index: "test_index" }
- match: { shards.0.0.shard: 0 }
- match: { shards.1.0.shard: 6 }

- do:
search_shards:
index: test_index
preference: "_shards:0,2,4,6"
body:
slice:
id: 1
max: 3
- length: { shards: 1 }
- match: { shards.0.0.index: "test_index" }
- match: { shards.0.0.shard: 2 }

- do:
search_shards:
index: test_index
preference: "_shards:0,2,4,6"
body:
slice:
id: 2
max: 3
- length: { shards: 1 }
- match: { shards.0.0.index: "test_index" }
- match: { shards.0.0.shard: 4 }
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.action.admin.cluster.shards;

import org.opensearch.Version;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.IndicesRequest;
import org.opensearch.action.support.IndicesOptions;
Expand All @@ -41,6 +42,7 @@
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.search.slice.SliceBuilder;

import java.io.IOException;
import java.util.Objects;
Expand All @@ -61,6 +63,8 @@
@Nullable
private String preference;
private IndicesOptions indicesOptions = IndicesOptions.lenientExpandOpen();
@Nullable
private SliceBuilder sliceBuilder;

public ClusterSearchShardsRequest() {}

Expand All @@ -76,6 +80,12 @@
preference = in.readOptionalString();

indicesOptions = IndicesOptions.readIndicesOptions(in);
if (in.getVersion().onOrAfter(Version.V_2_19_0)) {
boolean hasSlice = in.readBoolean();
if (hasSlice) {
sliceBuilder = new SliceBuilder(in);

Check warning on line 86 in server/src/main/java/org/opensearch/action/admin/cluster/shards/ClusterSearchShardsRequest.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/shards/ClusterSearchShardsRequest.java#L86

Added line #L86 was not covered by tests
}
}
}

@Override
Expand All @@ -84,8 +94,15 @@
out.writeStringArray(indices);
out.writeOptionalString(routing);
out.writeOptionalString(preference);

indicesOptions.writeIndicesOptions(out);
if (out.getVersion().onOrAfter(Version.V_2_19_0)) {
if (sliceBuilder != null) {
out.writeBoolean(true);
sliceBuilder.writeTo(out);

Check warning on line 101 in server/src/main/java/org/opensearch/action/admin/cluster/shards/ClusterSearchShardsRequest.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/shards/ClusterSearchShardsRequest.java#L100-L101

Added lines #L100 - L101 were not covered by tests
} else {
out.writeBoolean(false);
}
}
}

@Override
Expand Down Expand Up @@ -166,4 +183,13 @@
public String preference() {
return this.preference;
}

public ClusterSearchShardsRequest slice(SliceBuilder sliceBuilder) {
this.sliceBuilder = sliceBuilder;
return this;
}

public SliceBuilder slice() {
return this.sliceBuilder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ protected void clusterManagerOperation(

Set<String> nodeIds = new HashSet<>();
GroupShardsIterator<ShardIterator> groupShardsIterator = clusterService.operationRouting()
.searchShards(clusterState, concreteIndices, routingMap, request.preference());
.searchShards(clusterState, concreteIndices, routingMap, request.preference(), null, null, request.slice());
ShardRouting shard;
ClusterSearchShardsGroup[] groupResponses = new ClusterSearchShardsGroup[groupShardsIterator.size()];
int currentGroup = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,7 @@ private AsyncShardsAction(FieldCapabilitiesIndexRequest request, ActionListener<
throw blockException;
}

shardsIt = clusterService.operationRouting()
.searchShards(clusterService.state(), new String[] { request.index() }, null, null, null, null);
shardsIt = clusterService.operationRouting().searchShards(clusterService.state(), new String[] { request.index() }, null, null);
}

public void start() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import org.opensearch.search.pipeline.SearchPipelineService;
import org.opensearch.search.profile.ProfileShardResult;
import org.opensearch.search.profile.SearchProfileShardResults;
import org.opensearch.search.slice.SliceBuilder;
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskResourceTrackingService;
Expand Down Expand Up @@ -551,6 +552,7 @@ private ActionListener<SearchSourceBuilder> buildRewriteListener(
);
} else {
AtomicInteger skippedClusters = new AtomicInteger(0);
SliceBuilder slice = searchRequest.source() == null ? null : searchRequest.source().slice();
collectSearchShards(
searchRequest.indicesOptions(),
searchRequest.preference(),
Expand All @@ -559,6 +561,7 @@ private ActionListener<SearchSourceBuilder> buildRewriteListener(
remoteClusterIndices,
remoteClusterService,
threadPool,
slice,
ActionListener.wrap(searchShardsResponses -> {
final BiFunction<String, String, DiscoveryNode> clusterNodeLookup = getRemoteClusterNodeLookup(
searchShardsResponses
Expand Down Expand Up @@ -787,6 +790,7 @@ static void collectSearchShards(
Map<String, OriginalIndices> remoteIndicesByCluster,
RemoteClusterService remoteClusterService,
ThreadPool threadPool,
SliceBuilder slice,
ActionListener<Map<String, ClusterSearchShardsResponse>> listener
) {
final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size());
Expand All @@ -800,7 +804,8 @@ static void collectSearchShards(
ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(indices).indicesOptions(indicesOptions)
.local(true)
.preference(preference)
.routing(routing);
.routing(routing)
.slice(slice);
clusterClient.admin()
.cluster()
.searchShards(
Expand Down Expand Up @@ -1042,14 +1047,16 @@ private void executeSearch(
concreteLocalIndices[i] = indices[i].getName();
}
Map<String, Long> nodeSearchCounts = searchTransportService.getPendingSearchRequests();
SliceBuilder slice = searchRequest.source() == null ? null : searchRequest.source().slice();
GroupShardsIterator<ShardIterator> localShardRoutings = clusterService.operationRouting()
.searchShards(
clusterState,
concreteLocalIndices,
routingMap,
searchRequest.preference(),
searchService.getResponseCollectorService(),
nodeSearchCounts
nodeSearchCounts,
slice
);
localShardIterators = StreamSupport.stream(localShardRoutings.spliterator(), false)
.map(it -> new SearchShardIterator(searchRequest.getLocalClusterAlias(), it.shardId(), it.getShardRoutings(), localIndices))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.cluster.routing;

import org.apache.lucene.util.CollectionUtil;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
Expand All @@ -44,14 +45,17 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.common.Strings;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.node.ResponseCollectorService;
import org.opensearch.search.slice.SliceBuilder;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -230,7 +234,7 @@
@Nullable Map<String, Set<String>> routing,
@Nullable String preference
) {
return searchShards(clusterState, concreteIndices, routing, preference, null, null);
return searchShards(clusterState, concreteIndices, routing, preference, null, null, null);
}

public GroupShardsIterator<ShardIterator> searchShards(
reta marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -240,10 +244,24 @@
@Nullable String preference,
@Nullable ResponseCollectorService collectorService,
@Nullable Map<String, Long> nodeCounts
) {
return searchShards(clusterState, concreteIndices, routing, preference, collectorService, nodeCounts, null);

Check warning on line 248 in server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java#L248

Added line #L248 was not covered by tests
}

public GroupShardsIterator<ShardIterator> searchShards(
ClusterState clusterState,
String[] concreteIndices,
@Nullable Map<String, Set<String>> routing,
@Nullable String preference,
@Nullable ResponseCollectorService collectorService,
@Nullable Map<String, Long> nodeCounts,
@Nullable SliceBuilder slice
) {
final Set<IndexShardRoutingTable> shards = computeTargetedShards(clusterState, concreteIndices, routing);
final Set<ShardIterator> set = new HashSet<>(shards.size());

Map<Index, List<ShardIterator>> shardIterators = new HashMap<>();
for (IndexShardRoutingTable shard : shards) {

IndexMetadata indexMetadataForShard = indexMetadata(clusterState, shard.shardId.getIndex().getName());
if (indexMetadataForShard.isRemoteSnapshot() && (preference == null || preference.isEmpty())) {
preference = Preference.PRIMARY.type();
Expand Down Expand Up @@ -274,10 +292,31 @@
clusterState.metadata().weightedRoutingMetadata()
);
if (iterator != null) {
set.add(iterator);
shardIterators.computeIfAbsent(iterator.shardId().getIndex(), k -> new ArrayList<>()).add(iterator);
}
}
return GroupShardsIterator.sortAndCreate(new ArrayList<>(set));
List<ShardIterator> allShardIterators = new ArrayList<>();
if (slice != null) {
for (List<ShardIterator> indexIterators : shardIterators.values()) {
// Filter the returned shards for the given slice
CollectionUtil.timSort(indexIterators);
// We use the ordinal of the iterator in the group (after sorting) rather than the shard id, because
// computeTargetedShards may return a subset of shards for an index, if a routing parameter was
// specified. In that case, the set of routable shards is considered the full universe of available
// shards for each index, when mapping shards to slices. If no routing parameter was specified,
// then ordinals and shard IDs are the same. This mimics the logic in
// org.opensearch.search.slice.SliceBuilder.toFilter.
for (int i = 0; i < indexIterators.size(); i++) {
if (slice.shardMatches(i, indexIterators.size())) {
allShardIterators.add(indexIterators.get(i));
}
}
}
} else {
shardIterators.values().forEach(allShardIterators::addAll);
}

return GroupShardsIterator.sortAndCreate(allShardIterators);
}

public static ShardIterator getShards(ClusterState clusterState, ShardId shardId) {
Expand Down Expand Up @@ -311,6 +350,7 @@
set.add(indexShard);
}
}

}
return set;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.action.RestToXContentListener;
import org.opensearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.util.List;
Expand Down Expand Up @@ -81,6 +82,13 @@
clusterSearchShardsRequest.routing(request.param("routing"));
clusterSearchShardsRequest.preference(request.param("preference"));
clusterSearchShardsRequest.indicesOptions(IndicesOptions.fromRequest(request, clusterSearchShardsRequest.indicesOptions()));
if (request.hasContentOrSourceParam()) {
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.parseXContent(request.contentOrSourceParamParser());

Check warning on line 87 in server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterSearchShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterSearchShardsAction.java#L86-L87

Added lines #L86 - L87 were not covered by tests
if (sourceBuilder.slice() != null) {
clusterSearchShardsRequest.slice(sourceBuilder.slice());

Check warning on line 89 in server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterSearchShardsAction.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterSearchShardsAction.java#L89

Added line #L89 was not covered by tests
}
}
return channel -> client.admin().cluster().searchShards(clusterSearchShardsRequest, new RestToXContentListener<>(channel));
}
}
Loading
Loading