Refactor based on PR comments and add JavaDocs
Signed-off-by: expani <[email protected]>
expani committed Aug 1, 2024
1 parent fb84412 commit a09dd02
Showing 1 changed file with 55 additions and 70 deletions.
@@ -261,8 +261,8 @@ private void collectZeroDocEntriesIfNeeded(long owningBucketOrd) throws IOException
}
// we need to fill-in the blanks
for (LeafReaderContext ctx : context.searcher().getTopReaderContext().leaves()) {
MultiTermsValuesSourceCollector collector = multiTermsValue.getValues(ctx, bucketOrds, null, null);
// brute force
MultiTermsValuesSourceCollector collector = multiTermsValue.getValues(ctx, bucketOrds, null, null);
for (int docId = 0; docId < ctx.reader().maxDoc(); ++docId) {
collector.apply(docId, owningBucketOrd);
}
@@ -363,81 +363,65 @@ public MultiTermsValuesSourceCollector getValues(
for (InternalValuesSource valuesSource : valuesSources) {
collectors.add(valuesSource.apply(ctx));
}
return new MultiValuesSourceCollectorImpl(collectors, scratch, bucketOrds, aggregator, sub);
}

@Override
public void close() {
scratch.close();
}
}

static class MultiValuesSourceCollectorImpl implements MultiTermsValuesSourceCollector {

private final List<InternalValuesSourceCollector> collectors;
private final BytesStreamOutput scratch;
private final BytesKeyedBucketOrds bucketOrds;
private final BucketsAggregator aggregator;
private final LeafBucketCollector sub;

private final boolean collectViaAggregator;

public MultiValuesSourceCollectorImpl(
List<InternalValuesSourceCollector> collectors,
BytesStreamOutput scratch,
BytesKeyedBucketOrds bucketOrds,
BucketsAggregator aggregator,
LeafBucketCollector sub
) {
this.collectors = collectors;
this.scratch = scratch;
this.bucketOrds = bucketOrds;
this.aggregator = aggregator;
this.sub = sub;
this.collectViaAggregator = aggregator != null && sub != null;
}
boolean collectBucketOrds = aggregator != null && sub != null;
return new MultiTermsValuesSourceCollector() {
@Override
public void apply(int doc, long owningBucketOrd) throws IOException {
// TODO: Avoid creating a new list for every doc.
List<List<TermValue<?>>> collectedValues = new ArrayList<>();
for (InternalValuesSourceCollector collector : collectors) {
collectedValues.add(collector.apply(doc));
}
scratch.seek(0);
scratch.writeVInt(collectors.size()); // number of fields per composite key
generateAndCollectCompositeKeys(collectedValues, 0, owningBucketOrd, doc);
}

@Override
public void apply(int doc, long owningBucketOrd) throws IOException {
List<List<TermValue<?>>> collectedValues = new ArrayList<>();
for (InternalValuesSourceCollector collector : collectors) {
collectedValues.add(collector.apply(doc));
}
scratch.seek(0);
scratch.writeVInt(collectors.size()); // number of fields per composite key
cartesianProductRecursive(collectedValues, 0, owningBucketOrd, doc);
}
/**
* Generates and collects all composite keys into their buckets by recursively performing a cartesian product
* of the values of all fields for the given doc.
* @param collectedValues values of all fields present in the aggregation for the given doc
* @param index index of the field currently being appended to the composite key
* @param owningBucketOrd ordinal of the owning bucket
* @param doc id of the document whose values are being collected
*/
private void generateAndCollectCompositeKeys(
List<List<TermValue<?>>> collectedValues,
int index,
long owningBucketOrd,
int doc
) throws IOException {
if (collectedValues.size() == index) {
// Avoid performing a deep copy of the composite key by inlining
long bucketOrd = bucketOrds.add(owningBucketOrd, scratch.bytes().toBytesRef());
if (collectBucketOrds) {
if (bucketOrd < 0) {
bucketOrd = -1 - bucketOrd;
aggregator.collectExistingBucket(sub, doc, bucketOrd);
} else {
aggregator.collectBucket(sub, doc, bucketOrd);
}
}
return;
}

/**
* Cartesian product using depth first search.
*/
private void cartesianProductRecursive(List<List<TermValue<?>>> collectedValues, int index, long owningBucketOrd, int doc)
throws IOException {
if (collectedValues.size() == index) {
// Avoid performing a deep copy of the composite key
long bucketOrd = bucketOrds.add(owningBucketOrd, scratch.bytes().toBytesRef());
if (collectViaAggregator) {
if (bucketOrd < 0) {
bucketOrd = -1 - bucketOrd;
aggregator.collectExistingBucket(sub, doc, bucketOrd);
} else {
aggregator.collectBucket(sub, doc, bucketOrd);
long position = scratch.position();
List<TermValue<?>> values = collectedValues.get(index);
int numIterations = values.size();
// An indexed loop is used instead of a for-each loop to avoid allocating an Iterator object
// for every field of every doc.
for (int i = 0; i < numIterations; i++) {
TermValue<?> value = values.get(i);
value.writeTo(scratch); // encode the value
generateAndCollectCompositeKeys(collectedValues, index + 1, owningBucketOrd, doc); // dfs
scratch.seek(position); // backtrack
}
}
return;
}

long position = scratch.position();
List<TermValue<?>> values = collectedValues.get(index);
int numIterations = values.size();
for (int i = 0; i < numIterations; i++) {
TermValue<?> value = values.get(i);
value.writeTo(scratch); // encode the value
cartesianProductRecursive(collectedValues, index + 1, owningBucketOrd, doc); // dfs
scratch.seek(position); // backtrack
}
};
}

@Override
public void close() {
scratch.close();
}
}
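
To make the new collection strategy easier to follow outside the diff, here is a minimal, self-contained sketch (illustrative only, not the OpenSearch implementation; every name in it is made up) of the two ideas the anonymous collector combines: a depth-first cartesian product that backtracks a shared scratch buffer instead of copying partial composite keys, and the negative-ordinal convention in which an add() return value below zero means the key already has ordinal -1 - returnValue. A plain Map with String keys stands in for BytesKeyedBucketOrds and the byte-encoded composite keys.

// --- begin illustrative sketch (not part of the commit) ---
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CompositeKeySketch {

    // Stand-in for BytesKeyedBucketOrds: add() returns a fresh ordinal for a new key,
    // or (-1 - existingOrd) when the key has already been seen.
    static final Map<String, Long> ORDS = new HashMap<>();

    static long add(String key) {
        Long existing = ORDS.get(key);
        if (existing != null) {
            return -1 - existing;
        }
        long ord = ORDS.size();
        ORDS.put(key, ord);
        return ord;
    }

    // Depth-first cartesian product over the per-field values of one document.
    // The shared StringBuilder plays the role of the scratch buffer: append a value,
    // recurse, then truncate back (backtrack) instead of copying the partial key.
    static void generateKeys(List<List<String>> valuesPerField, int index, StringBuilder scratch) {
        if (index == valuesPerField.size()) {
            long ord = add(scratch.toString());
            if (ord < 0) {
                System.out.println("existing bucket " + (-1 - ord) + " for key " + scratch);
            } else {
                System.out.println("new bucket " + ord + " for key " + scratch);
            }
            return;
        }
        int position = scratch.length();
        for (String value : valuesPerField.get(index)) {
            scratch.append(value).append('|');
            generateKeys(valuesPerField, index + 1, scratch);
            scratch.setLength(position); // backtrack
        }
    }

    public static void main(String[] args) {
        // One document with two multi-valued fields produces 2 x 2 composite keys.
        generateKeys(List.of(List.of("error", "warn"), List.of("server-1", "server-2")), 0, new StringBuilder());
        // A second document that repeats a combination hits the existing-bucket branch.
        generateKeys(List.of(List.of("error"), List.of("server-1")), 0, new StringBuilder());
    }
}
// --- end illustrative sketch ---

In the actual change, scratch.seek(position) plays the role of the setLength-based backtracking above, and the sign of the value returned by bucketOrds.add decides between collectExistingBucket and collectBucket.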

/**
@@ -467,6 +451,7 @@ static InternalValuesSource bytesValuesSource(ValuesSource valuesSource, Include
if (i > 0 && bytes.equals(previous)) {
continue;
}
// Performing a deep copy is not required for a field containing only one value.
if (valuesCount > 1) {
BytesRef copy = BytesRef.deepCopyOf(bytes);
termValues.add(TermValue.of(copy));
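
As context for the comment above: Lucene doc-values iterators such as SortedBinaryDocValues typically reuse the BytesRef they return across nextValue() calls, so retaining more than one value from the same document requires BytesRef.deepCopyOf, while a single retained value can be consumed before the iterator advances. The sketch below is illustrative only; ReusingValues is a made-up stand-in for such an iterator, and only the "copy when there is more than one value" guard mirrors the logic in this diff.

// --- begin illustrative sketch (not part of the commit) ---
import org.apache.lucene.util.BytesRef;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// A toy "values iterator" that reuses one backing buffer, the way Lucene doc-values
// iterators are allowed to reuse the BytesRef they hand out.
class ReusingValues {
    private final byte[] buffer = new byte[16];
    private final BytesRef shared = new BytesRef(buffer);
    private final List<String> values;
    private int next = 0;

    ReusingValues(List<String> values) {
        this.values = values;
    }

    int count() {
        return values.size();
    }

    // Overwrites the shared buffer and returns the same BytesRef instance every time.
    BytesRef nextValue() {
        byte[] bytes = values.get(next++).getBytes(StandardCharsets.UTF_8);
        System.arraycopy(bytes, 0, buffer, 0, bytes.length);
        shared.length = bytes.length;
        return shared;
    }
}

public class DeepCopySketch {
    public static void main(String[] args) {
        ReusingValues docValues = new ReusingValues(List.of("alpha", "beta"));
        List<BytesRef> retained = new ArrayList<>();
        for (int i = 0; i < docValues.count(); i++) {
            BytesRef bytes = docValues.nextValue();
            // Without deepCopyOf, both retained entries would alias the shared buffer
            // and both would read back "beta" after the second nextValue() call.
            retained.add(docValues.count() > 1 ? BytesRef.deepCopyOf(bytes) : bytes);
        }
        retained.forEach(b -> System.out.println(b.utf8ToString())); // prints alpha, then beta
    }
}
// --- end illustrative sketch ---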
