Skip to content

Commit

Permalink
ESQL: Make it a little easier to test LOOKUP (elastic#120540) (elasti…
Browse files Browse the repository at this point in the history
…c#120555)

* ESQL: Make it a little easier to test LOOKUP

This changes the internals of LOOKUP so they don't rely directly on
`SearchContext`, instead relying on their own `LookupShardContext` which
is easy to build from a `SearchContext`. The advantage is that it's
easier to build it *without* a `SearchContext` which makes unit testing
a ton easier.

* JAVADOC
  • Loading branch information
nik9000 authored Jan 21, 2025
1 parent c5a623a commit 7539fb7
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,10 @@
* the same number of rows that it was sent no matter how many documents match.
* </p>
*/
abstract class AbstractLookupService<R extends AbstractLookupService.Request, T extends AbstractLookupService.TransportRequest> {
public abstract class AbstractLookupService<R extends AbstractLookupService.Request, T extends AbstractLookupService.TransportRequest> {
private final String actionName;
private final ClusterService clusterService;
private final SearchService searchService;
private final CreateShardContext createShardContext;
private final TransportService transportService;
private final Executor executor;
private final BigArrays bigArrays;
Expand All @@ -151,7 +151,7 @@ abstract class AbstractLookupService<R extends AbstractLookupService.Request, T
AbstractLookupService(
String actionName,
ClusterService clusterService,
SearchService searchService,
CreateShardContext createShardContext,
TransportService transportService,
BigArrays bigArrays,
BlockFactory blockFactory,
Expand All @@ -160,7 +160,7 @@ abstract class AbstractLookupService<R extends AbstractLookupService.Request, T
) {
this.actionName = actionName;
this.clusterService = clusterService;
this.searchService = searchService;
this.createShardContext = createShardContext;
this.transportService = transportService;
this.executor = transportService.getThreadPool().executor(ThreadPool.Names.SEARCH);
this.bigArrays = bigArrays;
Expand Down Expand Up @@ -326,9 +326,8 @@ private void doLookup(T request, CancellableTask task, ActionListener<List<Page>
final List<Releasable> releasables = new ArrayList<>(6);
boolean started = false;
try {
final ShardSearchRequest shardSearchRequest = new ShardSearchRequest(request.shardId, 0, AliasFilter.EMPTY);
final SearchContext searchContext = searchService.createSearchContext(shardSearchRequest, SearchService.NO_TIMEOUT);
releasables.add(searchContext);
LookupShardContext shardContext = createShardContext.create(request.shardId);
releasables.add(shardContext.release);
final LocalCircuitBreaker localBreaker = new LocalCircuitBreaker(
blockFactory.breaker(),
localBreakerSettings.overReservedBytes(),
Expand Down Expand Up @@ -366,8 +365,7 @@ private void doLookup(T request, CancellableTask task, ActionListener<List<Page>
}
}
releasables.add(finishPages);
SearchExecutionContext searchExecutionContext = searchContext.getSearchExecutionContext();
QueryList queryList = queryList(request, searchExecutionContext, inputBlock, request.inputDataType);
QueryList queryList = queryList(request, shardContext.executionContext, inputBlock, request.inputDataType);
var warnings = Warnings.createWarnings(
DriverContext.WarningsMode.COLLECT,
request.source.source().getLineNumber(),
Expand All @@ -378,11 +376,11 @@ private void doLookup(T request, CancellableTask task, ActionListener<List<Page>
driverContext.blockFactory(),
EnrichQuerySourceOperator.DEFAULT_MAX_PAGE_SIZE,
queryList,
searchExecutionContext.getIndexReader(),
shardContext.context.searcher().getIndexReader(),
warnings
);
releasables.add(queryOperator);
var extractFieldsOperator = extractFieldsOperator(searchContext, driverContext, request.extractFields);
var extractFieldsOperator = extractFieldsOperator(shardContext.context, driverContext, request.extractFields);
releasables.add(extractFieldsOperator);

/*
Expand All @@ -405,7 +403,7 @@ private void doLookup(T request, CancellableTask task, ActionListener<List<Page>
List.of(extractFieldsOperator, finishPages),
outputOperator,
Driver.DEFAULT_STATUS_INTERVAL,
Releasables.wrap(searchContext, localBreaker)
Releasables.wrap(shardContext.release, localBreaker)
);
task.addListener(() -> {
String reason = Objects.requireNonNullElse(task.getReasonCancelled(), "task was cancelled");
Expand All @@ -430,15 +428,10 @@ private void doLookup(T request, CancellableTask task, ActionListener<List<Page>
}

private static Operator extractFieldsOperator(
SearchContext searchContext,
EsPhysicalOperationProviders.ShardContext shardContext,
DriverContext driverContext,
List<NamedExpression> extractFields
) {
EsPhysicalOperationProviders.ShardContext shardContext = new EsPhysicalOperationProviders.DefaultShardContext(
0,
searchContext.getSearchExecutionContext(),
searchContext.request().getAliasFilter()
);
List<ValuesSourceReaderOperator.FieldInfo> fields = new ArrayList<>(extractFields.size());
for (NamedExpression extractField : extractFields) {
BlockLoader loader = shardContext.blockLoader(
Expand All @@ -462,7 +455,7 @@ private static Operator extractFieldsOperator(
return new ValuesSourceReaderOperator(
driverContext.blockFactory(),
fields,
List.of(new ValuesSourceReaderOperator.ShardContext(searchContext.searcher().getIndexReader(), searchContext::newSourceLoader)),
List.of(new ValuesSourceReaderOperator.ShardContext(shardContext.searcher().getIndexReader(), shardContext::newSourceLoader)),
0
);
}
Expand Down Expand Up @@ -670,4 +663,42 @@ public boolean hasReferences() {
return refs.hasReferences();
}
}

/**
* Create a {@link LookupShardContext} for a locally allocated {@link ShardId}.
*/
public interface CreateShardContext {
LookupShardContext create(ShardId shardId) throws IOException;

static CreateShardContext fromSearchService(SearchService searchService) {
return shardId -> {
ShardSearchRequest shardSearchRequest = new ShardSearchRequest(shardId, 0, AliasFilter.EMPTY);
return LookupShardContext.fromSearchContext(
searchService.createSearchContext(shardSearchRequest, SearchService.NO_TIMEOUT)
);
};
}
}

/**
* {@link AbstractLookupService} uses this to power the queries and field loading that
* it needs to perform to actually do the lookup.
*/
public record LookupShardContext(
EsPhysicalOperationProviders.ShardContext context,
SearchExecutionContext executionContext,
Releasable release
) {
public static LookupShardContext fromSearchContext(SearchContext context) {
return new LookupShardContext(
new EsPhysicalOperationProviders.DefaultShardContext(
0,
context.getSearchExecutionContext(),
context.request().getAliasFilter()
),
context.getSearchExecutionContext(),
context
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.elasticsearch.index.mapper.RangeType;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.security.authz.privilege.ClusterPrivilegeResolver;
Expand All @@ -48,15 +47,15 @@ public class EnrichLookupService extends AbstractLookupService<EnrichLookupServi

public EnrichLookupService(
ClusterService clusterService,
SearchService searchService,
CreateShardContext createShardContext,
TransportService transportService,
BigArrays bigArrays,
BlockFactory blockFactory
) {
super(
LOOKUP_ACTION_NAME,
clusterService,
searchService,
createShardContext,
transportService,
bigArrays,
blockFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
Expand All @@ -47,15 +46,15 @@ public class LookupFromIndexService extends AbstractLookupService<LookupFromInde

public LookupFromIndexService(
ClusterService clusterService,
SearchService searchService,
CreateShardContext createShardContext,
TransportService transportService,
BigArrays bigArrays,
BlockFactory blockFactory
) {
super(
LOOKUP_ACTION_NAME,
clusterService,
searchService,
createShardContext,
transportService,
bigArrays,
blockFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.elasticsearch.xpack.esql.action.EsqlQueryTask;
import org.elasticsearch.xpack.esql.core.async.AsyncTaskManagementService;
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
import org.elasticsearch.xpack.esql.enrich.AbstractLookupService;
import org.elasticsearch.xpack.esql.enrich.EnrichLookupService;
import org.elasticsearch.xpack.esql.enrich.EnrichPolicyResolver;
import org.elasticsearch.xpack.esql.enrich.LookupFromIndexService;
Expand Down Expand Up @@ -107,8 +108,23 @@ public TransportEsqlQueryAction(
exchangeService.registerTransportHandler(transportService);
this.exchangeService = exchangeService;
this.enrichPolicyResolver = new EnrichPolicyResolver(clusterService, transportService, planExecutor.indexResolver());
this.enrichLookupService = new EnrichLookupService(clusterService, searchService, transportService, bigArrays, blockFactory);
this.lookupFromIndexService = new LookupFromIndexService(clusterService, searchService, transportService, bigArrays, blockFactory);
AbstractLookupService.CreateShardContext lookupCreateShardContext = AbstractLookupService.CreateShardContext.fromSearchService(
searchService
);
this.enrichLookupService = new EnrichLookupService(
clusterService,
lookupCreateShardContext,
transportService,
bigArrays,
blockFactory
);
this.lookupFromIndexService = new LookupFromIndexService(
clusterService,
lookupCreateShardContext,
transportService,
bigArrays,
blockFactory
);
this.computeService = new ComputeService(
searchService,
transportService,
Expand Down

0 comments on commit 7539fb7

Please sign in to comment.