
Commit

keyword, numeric terms aggregation
Signed-off-by: Sandesh Kumar <[email protected]>
sandeshkr419 committed Jan 29, 2025
1 parent b9ddef9 commit 1420e07
Showing 11 changed files with 995 additions and 8 deletions.
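For context, the change targets requests that wrap a metric aggregation inside a keyword or numeric terms bucket aggregation, letting the affected aggregators pre-compute buckets from a star-tree index instead of iterating per-document values. A minimal sketch of that request shape using the OpenSearch AggregationBuilders API; the field names ("status" as a keyword dimension, "bytes" as a numeric metric) are illustrative assumptions, not taken from the commit.

import org.opensearch.search.aggregations.AggregationBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;

public class TermsWithMetricExample {
    public static void main(String[] args) {
        // size(0) skips hit collection so only the aggregations are computed.
        SearchSourceBuilder source = new SearchSourceBuilder().size(0)
            .aggregation(
                AggregationBuilders.terms("by_status")
                    .field("status")
                    .subAggregation(AggregationBuilders.sum("total_bytes").field("bytes"))
            );
        System.out.println(source);
    }
}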
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -41,6 +41,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Propagate the sourceIncludes and excludes fields from fetchSourceContext to FieldsVisitor. ([#17080](https://github.com/opensearch-project/OpenSearch/pull/17080))
- [Star Tree] [Search] Resolving Date histogram with metric aggregation using star-tree ([#16674](https://github.com/opensearch-project/OpenSearch/pull/16674))
- [Star Tree] [Search] Extensible design to support different query and field types ([#17137](https://github.com/opensearch-project/OpenSearch/pull/17137))
- [Star Tree] [Search] Resolving keyword & numeric bucket aggregation with metric aggregation using star-tree ([#17165](https://github.com/opensearch-project/OpenSearch/pull/17165))

### Dependencies
- Bump `com.google.cloud:google-cloud-core-http` from 2.23.0 to 2.47.0 ([#16504](https://github.com/opensearch-project/OpenSearch/pull/16504))
@@ -67,5 +67,4 @@ public TermsEnum termsEnum() throws IOException {
public TermsEnum intersect(CompiledAutomaton automaton) throws IOException {
return ((SortedSetDocValues) docIdSetIterator).intersect(automaton);
}

}
@@ -108,7 +108,7 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAggregator
private boolean starTreeDateRoundingRequired = true;

private final FilterRewriteOptimizationContext filterRewriteOptimizationContext;
public final String STARTREE_TIMESTAMP_FIELD = "@timestamp";
public final String fieldName;

DateHistogramAggregator(
String name,
@@ -137,6 +137,9 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAggregator
this.hardBounds = hardBounds;
// TODO: Stop using null here
this.valuesSource = valuesSourceConfig.hasValues() ? (ValuesSource.Numeric) valuesSourceConfig.getValuesSource() : null;
this.fieldName = valuesSourceConfig.hasValues()
? ((ValuesSource.Numeric.FieldData) valuesSourceConfig.getValuesSource()).getIndexFieldName()
: null;
this.formatter = valuesSourceConfig.format();

bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), cardinality);
@@ -244,13 +247,13 @@ private String fetchStarTreeCalendarUnit() {
.next();
DateDimension starTreeDateDimension = (DateDimension) compositeMappedFieldType.getDimensions()
.stream()
.filter(dim -> dim.getField().equals(STARTREE_TIMESTAMP_FIELD))
.filter(dim -> dim.getField().equals(fieldName))
.findFirst() // Get the first matching time dimension
.orElseThrow(() -> new AssertionError(String.format(Locale.ROOT, "Date dimension '%s' not found", STARTREE_TIMESTAMP_FIELD)));
.orElseThrow(() -> new AssertionError(String.format(Locale.ROOT, "Date dimension '%s' not found", fieldName)));

DateTimeUnitAdapter dateTimeUnitRounding = new DateTimeUnitAdapter(this.rounding.unit());
DateTimeUnitRounding rounding = starTreeDateDimension.findClosestValidInterval(dateTimeUnitRounding);
String dimensionName = STARTREE_TIMESTAMP_FIELD + "_" + rounding.shortName();
String dimensionName = fieldName + "_" + rounding.shortName();
if (rounding.shortName().equals(this.rounding.unit().shortName())) {
this.starTreeDateRoundingRequired = false;
}
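The hunks above drop the hard-coded STARTREE_TIMESTAMP_FIELD ("@timestamp") and resolve the star-tree date dimension by the aggregation's own field name, so the rounding-specific sub-dimension becomes "<field>_<interval>" for whatever date field the request uses. A hedged sketch of such a request; the field names ("event_time", "bytes") and the monthly interval are assumptions for illustration only.

import org.opensearch.search.aggregations.AggregationBuilders;
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.opensearch.search.builder.SearchSourceBuilder;

public class DateHistogramExample {
    public static void main(String[] args) {
        // "event_time" stands in for a star-tree date dimension that is not named "@timestamp".
        SearchSourceBuilder source = new SearchSourceBuilder().size(0)
            .aggregation(
                AggregationBuilders.dateHistogram("per_month")
                    .field("event_time")
                    .calendarInterval(DateHistogramInterval.MONTH)
                    .subAggregation(AggregationBuilders.max("peak_bytes").field("bytes"))
            );
        System.out.println(source);
    }
}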
@@ -41,9 +41,11 @@
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.PriorityQueue;
import org.opensearch.common.SetOnce;
import org.opensearch.common.lease.Releasable;
@@ -52,6 +54,12 @@
import org.opensearch.common.util.LongHash;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
import org.opensearch.index.compositeindex.datacube.MetricStat;
import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues;
import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeUtils;
import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedNumericStarTreeValuesIterator;
import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedSetStarTreeValuesIterator;
import org.opensearch.index.mapper.DocCountFieldMapper;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.aggregations.AggregationExecutionException;
@@ -64,14 +72,20 @@
import org.opensearch.search.aggregations.InternalOrder;
import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.search.aggregations.LeafBucketCollectorBase;
import org.opensearch.search.aggregations.StarTreeBucketCollector;
import org.opensearch.search.aggregations.StarTreePreComputeCollector;
import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds;
import org.opensearch.search.aggregations.bucket.terms.SignificanceLookup.BackgroundFrequencyForBytes;
import org.opensearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic;
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.startree.StarTreeQueryHelper;
import org.opensearch.search.startree.StarTreeTraversalUtil;
import org.opensearch.search.startree.filter.DimensionFilter;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;
@@ -86,7 +100,7 @@
*
* @opensearch.internal
*/
public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggregator {
public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggregator implements StarTreePreComputeCollector {
protected final ResultStrategy<?, ?, ?> resultStrategy;
protected final ValuesSource.Bytes.WithOrdinals valuesSource;

@@ -98,6 +112,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggregator
private final SetOnce<SortedSetDocValues> dvs = new SetOnce<>();
protected int segmentsWithSingleValuedOrds = 0;
protected int segmentsWithMultiValuedOrds = 0;
LongUnaryOperator globalOperator;

/**
* Lookup global ordinals
@@ -229,6 +244,14 @@ public void collect(int doc, long owningBucketOrd) throws IOException {
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
SortedSetDocValues globalOrds = valuesSource.globalOrdinalsValues(ctx);
collectionStrategy.globalOrdsReady(globalOrds);
globalOperator = valuesSource.globalOrdinalsMapping(ctx);

CompositeIndexFieldInfo supportedStarTree = StarTreeQueryHelper.getSupportedStarTree(this.context.getQueryShardContext());
if (supportedStarTree != null) {
if (preComputeWithStarTree(ctx, supportedStarTree) == true) {
throw new CollectionTerminatedException();
}
}

if (collectionStrategy instanceof DenseGlobalOrds
&& this.resultStrategy instanceof StandardTermsResults
@@ -314,6 +337,87 @@ public void collect(int doc, long owningBucketOrd) throws IOException {
});
}

public StarTreeBucketCollector getStarTreeBucketCollector(
LeafReaderContext ctx,
CompositeIndexFieldInfo starTree,
StarTreeBucketCollector parent
) throws IOException {
assert parent == null;
StarTreeValues starTreeValues = StarTreeQueryHelper.getStarTreeValues(ctx, starTree);
return new StarTreeBucketCollector(
starTreeValues,
StarTreeTraversalUtil.getStarTreeResult(
starTreeValues,
StarTreeQueryHelper.mergeDimensionFilterIfNotExists(
context.getQueryShardContext().getStarTreeQueryContext().getBaseQueryStarTreeFilter(),
fieldName,
List.of(DimensionFilter.MATCH_ALL_DEFAULT)
),
context
)
) {
@Override
public void setSubCollectors() throws IOException {
for (Aggregator aggregator : subAggregators) {
this.subCollectors.add(((StarTreePreComputeCollector) aggregator).getStarTreeBucketCollector(ctx, starTree, this));
}
}

SortedSetStarTreeValuesIterator valuesIterator = (SortedSetStarTreeValuesIterator) starTreeValues.getDimensionValuesIterator(
fieldName
);

String metricName = StarTreeUtils.fullyQualifiedFieldNameForStarTreeMetricsDocValues(
starTree.getField(),
"_doc_count",
MetricStat.DOC_COUNT.getTypeName()
);
SortedNumericStarTreeValuesIterator docCountsIterator = (SortedNumericStarTreeValuesIterator) starTreeValues
.getMetricValuesIterator(metricName);

@Override
public void collectStarTreeEntry(int starTreeEntry, long owningBucketOrd) throws IOException {

if (valuesIterator.advanceExact(starTreeEntry) == false) {
return;
}

for (int i = 0, count = valuesIterator.docValueCount(); i < count; i++) {
long dimensionValue = valuesIterator.nextOrd();
long ord = globalOperator.applyAsLong(dimensionValue);

if (docCountsIterator.advanceExact(starTreeEntry)) {
long metricValue = docCountsIterator.nextValue();

long bucketOrd = collectionStrategy.globalOrdToBucketOrd(0, ord);
if (bucketOrd < 0) {
bucketOrd = -1 - bucketOrd;
collectStarTreeBucket(this, metricValue, bucketOrd, starTreeEntry);
} else {
grow(bucketOrd + 1);
collectStarTreeBucket(this, metricValue, bucketOrd, starTreeEntry);
}
}
}
}
};
}

private boolean preComputeWithStarTree(LeafReaderContext ctx, CompositeIndexFieldInfo supportedStarTree) throws IOException {
StarTreeBucketCollector starTreeBucketCollector = getStarTreeBucketCollector(ctx, supportedStarTree, null);
FixedBitSet matchingDocsBitSet = starTreeBucketCollector.getMatchingDocsBitSet();

int numBits = matchingDocsBitSet.length();
if (numBits > 0) {
for (int bit = matchingDocsBitSet.nextSetBit(0); bit != DocIdSetIterator.NO_MORE_DOCS; bit = (bit + 1 < numBits)
? matchingDocsBitSet.nextSetBit(bit + 1)
: DocIdSetIterator.NO_MORE_DOCS) {
starTreeBucketCollector.collectStarTreeEntry(bit, 0);
}
}
return true;
}

@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
return resultStrategy.buildAggregations(owningBucketOrds);
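The preComputeWithStarTree method added above walks the collector's matching-docs bit set and hands each set bit to collectStarTreeEntry. A small self-contained sketch of that iteration idiom with made-up bit-set contents: FixedBitSet.nextSetBit returns DocIdSetIterator.NO_MORE_DOCS when no further bit is set and must not be called with an index at or beyond length(), which is why the loop guards the advance with a bounds check.

import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.FixedBitSet;

public class BitSetWalkExample {
    public static void main(String[] args) {
        FixedBitSet matchingDocsBitSet = new FixedBitSet(8);   // pretend star-tree entries 0..7
        matchingDocsBitSet.set(2);
        matchingDocsBitSet.set(5);

        int numBits = matchingDocsBitSet.length();
        for (int bit = matchingDocsBitSet.nextSetBit(0); bit != DocIdSetIterator.NO_MORE_DOCS; bit = (bit + 1 < numBits)
            ? matchingDocsBitSet.nextSetBit(bit + 1)
            : DocIdSetIterator.NO_MORE_DOCS) {
            // in the aggregator, each set bit is a star-tree entry passed to collectStarTreeEntry(bit, 0)
            System.out.println("collect star-tree entry " + bit);
        }
    }
}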
@@ -34,14 +34,23 @@
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.NumericUtils;
import org.apache.lucene.util.PriorityQueue;
import org.opensearch.common.Numbers;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.util.LongArray;
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
import org.opensearch.index.compositeindex.datacube.MetricStat;
import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues;
import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeUtils;
import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedNumericStarTreeValuesIterator;
import org.opensearch.index.fielddata.FieldData;
import org.opensearch.index.mapper.NumberFieldMapper;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.aggregations.Aggregator;
import org.opensearch.search.aggregations.AggregatorFactories;
@@ -52,6 +61,8 @@
import org.opensearch.search.aggregations.InternalOrder;
import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.search.aggregations.LeafBucketCollectorBase;
import org.opensearch.search.aggregations.StarTreeBucketCollector;
import org.opensearch.search.aggregations.StarTreePreComputeCollector;
import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds;
import org.opensearch.search.aggregations.bucket.terms.IncludeExclude.LongFilter;
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds.BucketOrdsEnum;
Expand All @@ -60,6 +71,9 @@
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.internal.ContextIndexSearcher;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.startree.StarTreeQueryHelper;
import org.opensearch.search.startree.StarTreeTraversalUtil;
import org.opensearch.search.startree.filter.DimensionFilter;

import java.io.IOException;
import java.math.BigInteger;
@@ -72,17 +86,19 @@

import static java.util.Collections.emptyList;
import static org.opensearch.search.aggregations.InternalOrder.isKeyOrder;
import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;

/**
* Aggregate all docs that contain numeric terms
*
* @opensearch.internal
*/
public class NumericTermsAggregator extends TermsAggregator {
public class NumericTermsAggregator extends TermsAggregator implements StarTreePreComputeCollector {
private final ResultStrategy<?, ?> resultStrategy;
private final ValuesSource.Numeric valuesSource;
private final LongKeyedBucketOrds bucketOrds;
private final LongFilter longFilter;
private final String fieldName;

public NumericTermsAggregator(
String name,
@@ -104,6 +120,9 @@ public NumericTermsAggregator(
this.valuesSource = valuesSource;
this.longFilter = longFilter;
bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), cardinality);
this.fieldName = (this.valuesSource instanceof ValuesSource.Numeric.FieldData)
? ((ValuesSource.Numeric.FieldData) valuesSource).getIndexFieldName()
: null;
}

@Override
@@ -116,6 +135,13 @@ public ScoreMode scoreMode() {

@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
CompositeIndexFieldInfo supportedStarTree = StarTreeQueryHelper.getSupportedStarTree(this.context.getQueryShardContext());
if (supportedStarTree != null) {
if (preComputeWithStarTree(ctx, supportedStarTree) == true) {
throw new CollectionTerminatedException();
}
}

SortedNumericDocValues values = resultStrategy.getValues(ctx);
return resultStrategy.wrapCollector(new LeafBucketCollectorBase(sub, values) {
@Override
@@ -145,6 +171,93 @@ public void collect(int doc, long owningBucketOrd) throws IOException {
});
}

public StarTreeBucketCollector getStarTreeBucketCollector(
LeafReaderContext ctx,
CompositeIndexFieldInfo starTree,
StarTreeBucketCollector parent
) throws IOException {
assert parent == null;
StarTreeValues starTreeValues = StarTreeQueryHelper.getStarTreeValues(ctx, starTree);
return new StarTreeBucketCollector(
starTreeValues,
StarTreeTraversalUtil.getStarTreeResult(
starTreeValues,
StarTreeQueryHelper.mergeDimensionFilterIfNotExists(
context.getQueryShardContext().getStarTreeQueryContext().getBaseQueryStarTreeFilter(),
fieldName,
List.of(DimensionFilter.MATCH_ALL_DEFAULT)
),
context
)
) {
@Override
public void setSubCollectors() throws IOException {
for (Aggregator aggregator : subAggregators) {
this.subCollectors.add(((StarTreePreComputeCollector) aggregator).getStarTreeBucketCollector(ctx, starTree, this));
}
}

SortedNumericStarTreeValuesIterator valuesIterator = (SortedNumericStarTreeValuesIterator) starTreeValues
.getDimensionValuesIterator(fieldName);

String metricName = StarTreeUtils.fullyQualifiedFieldNameForStarTreeMetricsDocValues(
starTree.getField(),
"_doc_count",
MetricStat.DOC_COUNT.getTypeName()
);
SortedNumericStarTreeValuesIterator docCountsIterator = (SortedNumericStarTreeValuesIterator) starTreeValues
.getMetricValuesIterator(metricName);

@Override
public void collectStarTreeEntry(int starTreeEntry, long owningBucketOrd) throws IOException {
if (valuesIterator.advance(starTreeEntry) == NO_MORE_DOCS) {
return;
}
long dimensionValue = valuesIterator.nextValue();
// Only numeric & floating points are supported as of now in star-tree
// TODO: Add support for isBigInteger() when it gets supported in star-tree
if (valuesSource.isFloatingPoint()) {
double doubleValue = ((NumberFieldMapper.NumberFieldType) context.mapperService().fieldType(fieldName)).toDoubleValue(
dimensionValue
);
dimensionValue = NumericUtils.doubleToSortableLong(doubleValue);
}

for (int i = 0, count = valuesIterator.entryValueCount(); i < count; i++) {

if (docCountsIterator.advanceExact(starTreeEntry)) {
long metricValue = docCountsIterator.nextValue();
long bucketOrd = bucketOrds.add(owningBucketOrd, dimensionValue);

if (bucketOrd < 0) {
bucketOrd = -1 - bucketOrd;
collectStarTreeBucket(this, metricValue, bucketOrd, starTreeEntry);
} else {
grow(bucketOrd + 1);
collectStarTreeBucket(this, metricValue, bucketOrd, starTreeEntry);
}

}
}
}
};
}

private boolean preComputeWithStarTree(LeafReaderContext ctx, CompositeIndexFieldInfo supportedStarTree) throws IOException {
StarTreeBucketCollector starTreeBucketCollector = getStarTreeBucketCollector(ctx, supportedStarTree, null);
FixedBitSet matchingDocsBitSet = starTreeBucketCollector.getMatchingDocsBitSet();

int numBits = matchingDocsBitSet.length();
if (numBits > 0) {
for (int bit = matchingDocsBitSet.nextSetBit(0); bit != DocIdSetIterator.NO_MORE_DOCS; bit = (bit + 1 < numBits)
? matchingDocsBitSet.nextSetBit(bit + 1)
: DocIdSetIterator.NO_MORE_DOCS) {
starTreeBucketCollector.collectStarTreeEntry(bit, 0);
}
}
return true;
}

@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
return resultStrategy.buildAggregations(owningBucketOrds);
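collectStarTreeEntry in NumericTermsAggregator above relies on the LongKeyedBucketOrds convention: add() returns a fresh non-negative ordinal for a new key and (-1 - existingOrd) when the key is already bucketed, which is why negative results are rewritten before collecting and grow() is only called for genuinely new buckets; the global-ordinals variant applies the same check to the result of collectionStrategy.globalOrdToBucketOrd. A minimal sketch of that convention, assuming a throwaway non-recycling BigArrays instance (the aggregators themselves use context.bigArrays()).

import org.opensearch.common.util.BigArrays;
import org.opensearch.search.aggregations.CardinalityUpperBound;
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;

public class BucketOrdsExample {
    public static void main(String[] args) {
        try (LongKeyedBucketOrds bucketOrds = LongKeyedBucketOrds.build(BigArrays.NON_RECYCLING_INSTANCE, CardinalityUpperBound.MANY)) {
            long first = bucketOrds.add(0, 42L);   // new key: non-negative bucket ordinal
            long again = bucketOrds.add(0, 42L);   // existing key: returns -1 - first
            System.out.println(first + " vs " + again + ", decoded: " + (-1 - again));
        }
    }
}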