[Backport 2.x] Filter shards for sliced search at coordinator (#17025)
* Filter shards for sliced search at coordinator (#16771)

* Filter shards for sliced search at coordinator

Prior to this commit, a sliced search would fan out to every shard,
then apply a MatchNoDocsQuery filter on shards that don't correspond
to the current slice. This still creates a (useless) search context
on each shard for every slice, though. For a long-running sliced
scroll, this can quickly exhaust the number of available scroll
contexts.

This change avoids fanning out to all the shards by checking at the
coordinator if a shard is matched by the current slice. This should
reduce the number of open scroll contexts to max(numShards, numSlices)
instead of numShards * numSlices.

---------

Signed-off-by: Michael Froh <[email protected]>
(cherry picked from commit f9c239d)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>

* Fix versions and breaking API changes

Signed-off-by: Andriy Redko <[email protected]>

---------

Signed-off-by: Michael Froh <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Signed-off-by: Andriy Redko <[email protected]>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: Andriy Redko <[email protected]>
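The slice-to-shard mapping described in the commit message can be sketched as follows. This sketch is illustrative only and is not part of the diff below: the class, method, and parameter names are hypothetical, and the real logic lives in org.opensearch.search.slice.SliceBuilder.

// Hypothetical sketch of the slice-to-shard mapping; not the actual SliceBuilder implementation.
public final class SliceShardMappingSketch {

    static boolean shardMatches(int sliceId, int maxSlices, int shardOrdinal, int numShards) {
        if (maxSlices <= numShards) {
            // No more slices than shards: slice i owns every maxSlices-th shard.
            return shardOrdinal % maxSlices == sliceId;
        }
        // More slices than shards: each slice targets a single shard, and documents on that
        // shard are further partitioned by a per-slice filter query.
        return sliceId % numShards == shardOrdinal;
    }

    public static void main(String[] args) {
        // With 7 shards and 3 slices, slice 0 maps to shards {0, 3, 6}, matching the
        // expectations in the YAML test added by this change.
        for (int shard = 0; shard < 7; shard++) {
            if (shardMatches(0, 3, shard, 7)) {
                System.out.println("slice 0 -> shard " + shard);
            }
        }
    }
}

With coordinator-side filtering, a shard that fails this check is never sent a request for that slice, so it never opens a search context for it.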
3 people authored Jan 15, 2025
1 parent bf06726 commit 81ee44a
Showing 12 changed files with 229 additions and 45 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -62,6 +62,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Changed
- Indexed IP field supports `terms_query` with more than 1025 IP masks [#16391](https://github.com/opensearch-project/OpenSearch/pull/16391)
- Make entries for dependencies from server/build.gradle to gradle version catalog ([#16707](https://github.com/opensearch-project/OpenSearch/pull/16707))
- Sliced search only fans out to shards matched by the selected slice, reducing open search contexts ([#16771](https://github.com/opensearch-project/OpenSearch/pull/16771))
- Allow extended plugins to be optional ([#16909](https://github.com/opensearch-project/OpenSearch/pull/16909))
- Use the correct type to widen the sort fields when merging top docs ([#16881](https://github.com/opensearch-project/OpenSearch/pull/16881))
- Limit reader writer separation to remote store enabled clusters [#16760](https://github.com/opensearch-project/OpenSearch/pull/16760)
@@ -62,6 +62,9 @@
        "default":"open",
        "description":"Whether to expand wildcard expression to concrete indices that are open, closed or both."
      }
    },
    "body":{
      "description":"The search source (in order to specify slice parameters)"
    }
  }
}
@@ -0,0 +1,88 @@
---
"Search shards with slice specified in body":
  - skip:
      version: " - 2.18.99"
      reason: "Added slice body to search_shards in 2.19"
  - do:
      indices.create:
        index: test_index
        body:
          settings:
            index:
              number_of_shards: 7
              number_of_replicas: 0

  - do:
      search_shards:
        index: test_index
        body:
          slice:
            id: 0
            max: 3
  - length: { shards: 3 }
  - match: { shards.0.0.index: "test_index" }
  - match: { shards.0.0.shard: 0 }
  - match: { shards.1.0.shard: 3 }
  - match: { shards.2.0.shard: 6 }

  - do:
      search_shards:
        index: test_index
        body:
          slice:
            id: 1
            max: 3
  - length: { shards: 2 }
  - match: { shards.0.0.index: "test_index" }
  - match: { shards.0.0.shard: 1 }
  - match: { shards.1.0.shard: 4 }

  - do:
      search_shards:
        index: test_index
        body:
          slice:
            id: 2
            max: 3
  - length: { shards: 2 }
  - match: { shards.0.0.index: "test_index" }
  - match: { shards.0.0.shard: 2 }
  - match: { shards.1.0.shard: 5 }

  - do:
      search_shards:
        index: test_index
        preference: "_shards:0,2,4,6"
        body:
          slice:
            id: 0
            max: 3
  - length: { shards: 2 }
  - match: { shards.0.0.index: "test_index" }
  - match: { shards.0.0.shard: 0 }
  - match: { shards.1.0.shard: 6 }

  - do:
      search_shards:
        index: test_index
        preference: "_shards:0,2,4,6"
        body:
          slice:
            id: 1
            max: 3
  - length: { shards: 1 }
  - match: { shards.0.0.index: "test_index" }
  - match: { shards.0.0.shard: 2 }

  - do:
      search_shards:
        index: test_index
        preference: "_shards:0,2,4,6"
        body:
          slice:
            id: 2
            max: 3
  - length: { shards: 1 }
  - match: { shards.0.0.index: "test_index" }
  - match: { shards.0.0.shard: 4 }
@@ -32,6 +32,7 @@

package org.opensearch.action.admin.cluster.shards;

import org.opensearch.Version;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.IndicesRequest;
import org.opensearch.action.support.IndicesOptions;
@@ -41,6 +42,7 @@
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.search.slice.SliceBuilder;

import java.io.IOException;
import java.util.Objects;
@@ -61,6 +63,8 @@ public class ClusterSearchShardsRequest extends ClusterManagerNodeReadRequest<Cl
    @Nullable
    private String preference;
    private IndicesOptions indicesOptions = IndicesOptions.lenientExpandOpen();
    @Nullable
    private SliceBuilder sliceBuilder;

    public ClusterSearchShardsRequest() {}

@@ -76,6 +80,12 @@ public ClusterSearchShardsRequest(StreamInput in) throws IOException {
        preference = in.readOptionalString();

        indicesOptions = IndicesOptions.readIndicesOptions(in);
        if (in.getVersion().onOrAfter(Version.V_2_19_0)) {
            boolean hasSlice = in.readBoolean();
            if (hasSlice) {
                sliceBuilder = new SliceBuilder(in);
            }
        }
    }

    @Override
@@ -84,8 +94,15 @@ public void writeTo(StreamOutput out) throws IOException {
        out.writeStringArray(indices);
        out.writeOptionalString(routing);
        out.writeOptionalString(preference);

        indicesOptions.writeIndicesOptions(out);
        if (out.getVersion().onOrAfter(Version.V_2_19_0)) {
            if (sliceBuilder != null) {
                out.writeBoolean(true);
                sliceBuilder.writeTo(out);
            } else {
                out.writeBoolean(false);
            }
        }
    }

    @Override
@@ -166,4 +183,13 @@ public ClusterSearchShardsRequest preference(String preference) {
    public String preference() {
        return this.preference;
    }

    public ClusterSearchShardsRequest slice(SliceBuilder sliceBuilder) {
        this.sliceBuilder = sliceBuilder;
        return this;
    }

    public SliceBuilder slice() {
        return this.sliceBuilder;
    }
}
@@ -133,7 +133,7 @@ protected void clusterManagerOperation(

        Set<String> nodeIds = new HashSet<>();
        GroupShardsIterator<ShardIterator> groupShardsIterator = clusterService.operationRouting()
            .searchShards(clusterState, concreteIndices, routingMap, request.preference(), null, null, request.slice());
        ShardRouting shard;
        ClusterSearchShardsGroup[] groupResponses = new ClusterSearchShardsGroup[groupShardsIterator.size()];
        int currentGroup = 0;
@@ -248,8 +248,7 @@ private AsyncShardsAction(FieldCapabilitiesIndexRequest request, ActionListener<
                throw blockException;
            }

            shardsIt = clusterService.operationRouting()
                .searchShards(clusterService.state(), new String[] { request.index() }, null, null, null, null);
            shardsIt = clusterService.operationRouting().searchShards(clusterService.state(), new String[] { request.index() }, null, null);
        }

        public void start() {
@@ -85,6 +85,7 @@
import org.opensearch.search.pipeline.SearchPipelineService;
import org.opensearch.search.profile.ProfileShardResult;
import org.opensearch.search.profile.SearchProfileShardResults;
import org.opensearch.search.slice.SliceBuilder;
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskResourceTrackingService;
@@ -551,6 +552,7 @@ private ActionListener<SearchSourceBuilder> buildRewriteListener(
                );
            } else {
                AtomicInteger skippedClusters = new AtomicInteger(0);
                SliceBuilder slice = searchRequest.source() == null ? null : searchRequest.source().slice();
                collectSearchShards(
                    searchRequest.indicesOptions(),
                    searchRequest.preference(),
@@ -559,6 +561,7 @@
                    remoteClusterIndices,
                    remoteClusterService,
                    threadPool,
                    slice,
                    ActionListener.wrap(searchShardsResponses -> {
                        final BiFunction<String, String, DiscoveryNode> clusterNodeLookup = getRemoteClusterNodeLookup(
                            searchShardsResponses
@@ -787,6 +790,7 @@ static void collectSearchShards(
        Map<String, OriginalIndices> remoteIndicesByCluster,
        RemoteClusterService remoteClusterService,
        ThreadPool threadPool,
        SliceBuilder slice,
        ActionListener<Map<String, ClusterSearchShardsResponse>> listener
    ) {
        final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size());
@@ -800,7 +804,8 @@
            ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(indices).indicesOptions(indicesOptions)
                .local(true)
                .preference(preference)
                .routing(routing);
                .routing(routing)
                .slice(slice);
            clusterClient.admin()
                .cluster()
                .searchShards(
@@ -1042,14 +1047,16 @@ private void executeSearch(
            concreteLocalIndices[i] = indices[i].getName();
        }
        Map<String, Long> nodeSearchCounts = searchTransportService.getPendingSearchRequests();
        SliceBuilder slice = searchRequest.source() == null ? null : searchRequest.source().slice();
        GroupShardsIterator<ShardIterator> localShardRoutings = clusterService.operationRouting()
            .searchShards(
                clusterState,
                concreteLocalIndices,
                routingMap,
                searchRequest.preference(),
                searchService.getResponseCollectorService(),
                nodeSearchCounts
                nodeSearchCounts,
                slice
            );
        localShardIterators = StreamSupport.stream(localShardRoutings.spliterator(), false)
            .map(it -> new SearchShardIterator(searchRequest.getLocalClusterAlias(), it.shardId(), it.getShardRoutings(), localIndices))
@@ -32,6 +32,7 @@

package org.opensearch.cluster.routing;

import org.apache.lucene.util.CollectionUtil;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.WeightedRoutingMetadata;
@@ -44,14 +45,17 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.common.Strings;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.node.ResponseCollectorService;
import org.opensearch.search.slice.SliceBuilder;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -230,7 +234,7 @@ public GroupShardsIterator<ShardIterator> searchShards(
        @Nullable Map<String, Set<String>> routing,
        @Nullable String preference
    ) {
        return searchShards(clusterState, concreteIndices, routing, preference, null, null);
        return searchShards(clusterState, concreteIndices, routing, preference, null, null, null);
    }

    public GroupShardsIterator<ShardIterator> searchShards(
@@ -240,10 +244,24 @@ public GroupShardsIterator<ShardIterator> searchShards(
        @Nullable String preference,
        @Nullable ResponseCollectorService collectorService,
        @Nullable Map<String, Long> nodeCounts
    ) {
        return searchShards(clusterState, concreteIndices, routing, preference, collectorService, nodeCounts, null);
    }

    public GroupShardsIterator<ShardIterator> searchShards(
        ClusterState clusterState,
        String[] concreteIndices,
        @Nullable Map<String, Set<String>> routing,
        @Nullable String preference,
        @Nullable ResponseCollectorService collectorService,
        @Nullable Map<String, Long> nodeCounts,
        @Nullable SliceBuilder slice
    ) {
        final Set<IndexShardRoutingTable> shards = computeTargetedShards(clusterState, concreteIndices, routing);
        final Set<ShardIterator> set = new HashSet<>(shards.size());

        Map<Index, List<ShardIterator>> shardIterators = new HashMap<>();
        for (IndexShardRoutingTable shard : shards) {

            IndexMetadata indexMetadataForShard = indexMetadata(clusterState, shard.shardId.getIndex().getName());
            if (indexMetadataForShard.isRemoteSnapshot() && (preference == null || preference.isEmpty())) {
                preference = Preference.PRIMARY.type();
@@ -274,10 +292,31 @@ public GroupShardsIterator<ShardIterator> searchShards(
                clusterState.metadata().weightedRoutingMetadata()
            );
            if (iterator != null) {
                set.add(iterator);
                shardIterators.computeIfAbsent(iterator.shardId().getIndex(), k -> new ArrayList<>()).add(iterator);
            }
        }
        return GroupShardsIterator.sortAndCreate(new ArrayList<>(set));
        List<ShardIterator> allShardIterators = new ArrayList<>();
        if (slice != null) {
            for (List<ShardIterator> indexIterators : shardIterators.values()) {
                // Filter the returned shards for the given slice
                CollectionUtil.timSort(indexIterators);
                // We use the ordinal of the iterator in the group (after sorting) rather than the shard id, because
                // computeTargetedShards may return a subset of shards for an index, if a routing parameter was
                // specified. In that case, the set of routable shards is considered the full universe of available
                // shards for each index, when mapping shards to slices. If no routing parameter was specified,
                // then ordinals and shard IDs are the same. This mimics the logic in
                // org.opensearch.search.slice.SliceBuilder.toFilter.
                for (int i = 0; i < indexIterators.size(); i++) {
                    if (slice.shardMatches(i, indexIterators.size())) {
                        allShardIterators.add(indexIterators.get(i));
                    }
                }
            }
        } else {
            shardIterators.values().forEach(allShardIterators::addAll);
        }

        return GroupShardsIterator.sortAndCreate(allShardIterators);
    }

    public static ShardIterator getShards(ClusterState clusterState, ShardId shardId) {
@@ -311,6 +350,7 @@ private Set<IndexShardRoutingTable> computeTargetedShards(
                    set.add(indexShard);
                }
            }

        }
        return set;
    }
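The ordinal-based matching explained in the comment above can be walked through with a small, self-contained example. This is a hypothetical illustration of the preference: "_shards:0,2,4,6" case in the YAML test, not code from this change.

import java.util.List;

public class SliceOrdinalSketch {
    public static void main(String[] args) {
        // Only shards {0, 2, 4, 6} are routable (for example because of a preference or routing
        // restriction), so those four shards form the full universe and are sliced by ordinal,
        // not by shard id.
        List<Integer> routableShards = List.of(0, 2, 4, 6); // already sorted, as after timSort
        int sliceId = 0;
        int maxSlices = 3;
        for (int ordinal = 0; ordinal < routableShards.size(); ordinal++) {
            if (ordinal % maxSlices == sliceId) {
                // Ordinals 0 and 3 match slice 0, i.e. shards 0 and 6, as asserted in the YAML test.
                System.out.println("slice " + sliceId + " -> shard " + routableShards.get(ordinal));
            }
        }
    }
}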
@@ -40,6 +40,7 @@
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.action.RestToXContentListener;
import org.opensearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.util.List;
@@ -81,6 +82,13 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
        clusterSearchShardsRequest.routing(request.param("routing"));
        clusterSearchShardsRequest.preference(request.param("preference"));
        clusterSearchShardsRequest.indicesOptions(IndicesOptions.fromRequest(request, clusterSearchShardsRequest.indicesOptions()));
        if (request.hasContentOrSourceParam()) {
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            sourceBuilder.parseXContent(request.contentOrSourceParamParser());
            if (sourceBuilder.slice() != null) {
                clusterSearchShardsRequest.slice(sourceBuilder.slice());
            }
        }
        return channel -> client.admin().cluster().searchShards(clusterSearchShardsRequest, new RestToXContentListener<>(channel));
    }
}
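For illustration, here is a minimal sketch of how the new slice(SliceBuilder) setter on ClusterSearchShardsRequest might be used from the Java client. The index name, slice parameters, and client wiring are assumptions (2.x package locations assumed) and are not part of this change.

import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
import org.opensearch.client.Client;
import org.opensearch.core.action.ActionListener;
import org.opensearch.search.slice.SliceBuilder;

public class SlicedSearchShardsExample {

    /** Asks the cluster which shard groups slice 0 of 3 would touch for a hypothetical index. */
    public static void searchShardsForSlice(Client client) {
        ClusterSearchShardsRequest request = new ClusterSearchShardsRequest("test_index").slice(new SliceBuilder(0, 3));
        client.admin()
            .cluster()
            .searchShards(
                request,
                ActionListener.wrap(
                    // For a 7-shard index, only the groups for shards 0, 3 and 6 come back.
                    response -> System.out.println("Matched shard groups: " + response.getGroups().length),
                    e -> System.err.println("search_shards failed: " + e)
                )
            );
    }
}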