From 7539fb7da6148e3455b61e9ede14470fcabcf5fd Mon Sep 17 00:00:00 2001
From: Nik Everett
Date: Tue, 21 Jan 2025 16:21:01 -0500
Subject: [PATCH] ESQL: Make it a little easier to test LOOKUP (#120540)
(#120555)
* ESQL: Make it a little easier to test LOOKUP
This changes the internals of LOOKUP so they don't rely directly on
`SearchContext`, instead relying on their own `LookupShardContext` which
is easy to build from a `SearchContext`. The advantage is that it's
easier to build it *without* a `SearchContext` which makes unit testing
a ton easier.
* JAVADOC
---
.../esql/enrich/AbstractLookupService.java | 69 ++++++++++++++-----
.../esql/enrich/EnrichLookupService.java | 5 +-
.../esql/enrich/LookupFromIndexService.java | 5 +-
.../esql/plugin/TransportEsqlQueryAction.java | 20 +++++-
4 files changed, 72 insertions(+), 27 deletions(-)
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java
index 13f0325d48d6b..6de9e5a32c539 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java
@@ -129,10 +129,10 @@
* the same number of rows that it was sent no matter how many documents match.
*
*/
-abstract class AbstractLookupService {
+public abstract class AbstractLookupService {
private final String actionName;
private final ClusterService clusterService;
- private final SearchService searchService;
+ private final CreateShardContext createShardContext;
private final TransportService transportService;
private final Executor executor;
private final BigArrays bigArrays;
@@ -151,7 +151,7 @@ abstract class AbstractLookupService
final List releasables = new ArrayList<>(6);
boolean started = false;
try {
- final ShardSearchRequest shardSearchRequest = new ShardSearchRequest(request.shardId, 0, AliasFilter.EMPTY);
- final SearchContext searchContext = searchService.createSearchContext(shardSearchRequest, SearchService.NO_TIMEOUT);
- releasables.add(searchContext);
+ LookupShardContext shardContext = createShardContext.create(request.shardId);
+ releasables.add(shardContext.release);
final LocalCircuitBreaker localBreaker = new LocalCircuitBreaker(
blockFactory.breaker(),
localBreakerSettings.overReservedBytes(),
@@ -366,8 +365,7 @@ private void doLookup(T request, CancellableTask task, ActionListener
}
}
releasables.add(finishPages);
- SearchExecutionContext searchExecutionContext = searchContext.getSearchExecutionContext();
- QueryList queryList = queryList(request, searchExecutionContext, inputBlock, request.inputDataType);
+ QueryList queryList = queryList(request, shardContext.executionContext, inputBlock, request.inputDataType);
var warnings = Warnings.createWarnings(
DriverContext.WarningsMode.COLLECT,
request.source.source().getLineNumber(),
@@ -378,11 +376,11 @@ private void doLookup(T request, CancellableTask task, ActionListener
driverContext.blockFactory(),
EnrichQuerySourceOperator.DEFAULT_MAX_PAGE_SIZE,
queryList,
- searchExecutionContext.getIndexReader(),
+ shardContext.context.searcher().getIndexReader(),
warnings
);
releasables.add(queryOperator);
- var extractFieldsOperator = extractFieldsOperator(searchContext, driverContext, request.extractFields);
+ var extractFieldsOperator = extractFieldsOperator(shardContext.context, driverContext, request.extractFields);
releasables.add(extractFieldsOperator);
/*
@@ -405,7 +403,7 @@ private void doLookup(T request, CancellableTask task, ActionListener
List.of(extractFieldsOperator, finishPages),
outputOperator,
Driver.DEFAULT_STATUS_INTERVAL,
- Releasables.wrap(searchContext, localBreaker)
+ Releasables.wrap(shardContext.release, localBreaker)
);
task.addListener(() -> {
String reason = Objects.requireNonNullElse(task.getReasonCancelled(), "task was cancelled");
@@ -430,15 +428,10 @@ private void doLookup(T request, CancellableTask task, ActionListener
}
private static Operator extractFieldsOperator(
- SearchContext searchContext,
+ EsPhysicalOperationProviders.ShardContext shardContext,
DriverContext driverContext,
List extractFields
) {
- EsPhysicalOperationProviders.ShardContext shardContext = new EsPhysicalOperationProviders.DefaultShardContext(
- 0,
- searchContext.getSearchExecutionContext(),
- searchContext.request().getAliasFilter()
- );
List fields = new ArrayList<>(extractFields.size());
for (NamedExpression extractField : extractFields) {
BlockLoader loader = shardContext.blockLoader(
@@ -462,7 +455,7 @@ private static Operator extractFieldsOperator(
return new ValuesSourceReaderOperator(
driverContext.blockFactory(),
fields,
- List.of(new ValuesSourceReaderOperator.ShardContext(searchContext.searcher().getIndexReader(), searchContext::newSourceLoader)),
+ List.of(new ValuesSourceReaderOperator.ShardContext(shardContext.searcher().getIndexReader(), shardContext::newSourceLoader)),
0
);
}
@@ -670,4 +663,42 @@ public boolean hasReferences() {
return refs.hasReferences();
}
}
+
+ /**
+ * Create a {@link LookupShardContext} for a locally allocated {@link ShardId}.
+ */
+ public interface CreateShardContext {
+ LookupShardContext create(ShardId shardId) throws IOException;
+
+ static CreateShardContext fromSearchService(SearchService searchService) {
+ return shardId -> {
+ ShardSearchRequest shardSearchRequest = new ShardSearchRequest(shardId, 0, AliasFilter.EMPTY);
+ return LookupShardContext.fromSearchContext(
+ searchService.createSearchContext(shardSearchRequest, SearchService.NO_TIMEOUT)
+ );
+ };
+ }
+ }
+
+ /**
+ * {@link AbstractLookupService} uses this to power the queries and field loading that
+ * it needs to perform to actually do the lookup.
+ */
+ public record LookupShardContext(
+ EsPhysicalOperationProviders.ShardContext context,
+ SearchExecutionContext executionContext,
+ Releasable release
+ ) {
+ public static LookupShardContext fromSearchContext(SearchContext context) {
+ return new LookupShardContext(
+ new EsPhysicalOperationProviders.DefaultShardContext(
+ 0,
+ context.getSearchExecutionContext(),
+ context.request().getAliasFilter()
+ ),
+ context.getSearchExecutionContext(),
+ context
+ );
+ }
+ }
}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java
index a343e368375cd..3f67f13edd55d 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java
@@ -23,7 +23,6 @@
import org.elasticsearch.index.mapper.RangeType;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.search.SearchService;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.security.authz.privilege.ClusterPrivilegeResolver;
@@ -48,7 +47,7 @@ public class EnrichLookupService extends AbstractLookupService