Skip to content

Commit

Permalink
Off heap changes for star tree
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <[email protected]>
  • Loading branch information
bharath-techie committed Jul 24, 2024
1 parent e749424 commit 37803ce
Show file tree
Hide file tree
Showing 9 changed files with 1,056 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

package org.opensearch.index.codec.composite;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.DocValuesConsumer;
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.index.DocValues;
Expand Down Expand Up @@ -50,9 +48,9 @@ public class Composite99DocValuesWriter extends DocValuesConsumer {
private final Set<CompositeMappedFieldType> compositeMappedFieldTypes;
private final Set<String> compositeFieldSet;
private final Set<String> segmentFieldSet;
private final boolean segmentHasCompositeFields;

private final Map<String, DocValuesProducer> fieldProducerMap = new HashMap<>();
private static final Logger logger = LogManager.getLogger(Composite99DocValuesWriter.class);

public Composite99DocValuesWriter(DocValuesConsumer delegate, SegmentWriteState segmentWriteState, MapperService mapperService) {

Expand All @@ -70,6 +68,8 @@ public Composite99DocValuesWriter(DocValuesConsumer delegate, SegmentWriteState
for (CompositeMappedFieldType type : compositeMappedFieldTypes) {
compositeFieldSet.addAll(type.fields());
}
// check if there are any composite fields which are part of the segment
segmentHasCompositeFields = !Collections.disjoint(segmentFieldSet, compositeFieldSet);
}

@Override
Expand All @@ -91,7 +91,7 @@ public void addSortedField(FieldInfo field, DocValuesProducer valuesProducer) th
public void addSortedNumericField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException {
delegate.addSortedNumericField(field, valuesProducer);
// Perform this only during flush flow
if (mergeState.get() == null) {
if (mergeState.get() == null && segmentHasCompositeFields) {
createCompositeIndicesIfPossible(valuesProducer, field);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public abstract class BaseStarTreeBuilder implements StarTreeBuilder {

protected final TreeNode rootNode = getNewNode();

private final StarTreeField starTreeField;
protected final StarTreeField starTreeField;
private final MapperService mapperService;
private final SegmentWriteState state;
static String NUM_SEGMENT_DOCS = "numSegmentDocs";
Expand Down Expand Up @@ -163,7 +163,7 @@ public List<MetricAggregatorInfo> generateMetricAggregatorInfos(MapperService ma
*
* @return Star tree documents
*/
public abstract List<StarTreeDocument> getStarTreeDocuments();
public abstract List<StarTreeDocument> getStarTreeDocuments() throws IOException;

/**
* Returns the value of the dimension for the given dimension id and document in the star-tree.
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ StarTreeDocument[] getSegmentsStarTreeDocuments(List<StarTreeValues> starTreeVal
metricReaders.add(new SequentialDocValuesIterator(metricDocValuesEntry.getValue()));
}

boolean endOfDoc = false;
int currentDocId = 0;
int numSegmentDocs = Integer.parseInt(
starTreeValues.getAttributes().getOrDefault(NUM_SEGMENT_DOCS, String.valueOf(DocIdSetIterator.NO_MORE_DOCS))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@ public void buildDuringMerge(final Map<String, List<StarTreeValues>> starTreeVal
continue;
}
StarTreeField starTreeField = starTreeValuesList.get(0).getStarTreeField();
StarTreeBuilder builder = getSingleTreeBuilder(starTreeField, state, mapperService);
builder.build(starTreeValuesList);
builder.close();
try (StarTreeBuilder builder = getSingleTreeBuilder(starTreeField, state, mapperService)) {
builder.build(starTreeValuesList);
}
}
logger.debug(
"Took {} ms to merge {} star-trees with star-tree fields",
Expand All @@ -122,8 +122,7 @@ StarTreeBuilder getSingleTreeBuilder(StarTreeField starTreeField, SegmentWriteSt
case ON_HEAP:
return new OnHeapStarTreeBuilder(starTreeField, state, mapperService);
case OFF_HEAP:
// TODO
// return new OffHeapStarTreeBuilder(starTreeField, state, mapperService);
return new OffHeapStarTreeBuilder(starTreeField, state, mapperService);
default:
throw new IllegalArgumentException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ public static class Builder extends ParametrizedFieldMapper.Builder {
List.of(XContentMapValues.nodeStringArrayValue(paramMap.getOrDefault(SKIP_STAR_NODE_IN_DIMS, new ArrayList<String>())))
);
paramMap.remove(SKIP_STAR_NODE_IN_DIMS);
// TODO : change this to off heap once off heap gets implemented
StarTreeFieldConfiguration.StarTreeBuildMode buildMode = StarTreeFieldConfiguration.StarTreeBuildMode.ON_HEAP;
StarTreeFieldConfiguration.StarTreeBuildMode buildMode = StarTreeFieldConfiguration.StarTreeBuildMode.OFF_HEAP;

List<Dimension> dimensions = buildDimensions(name, paramMap, context);
paramMap.remove(ORDERED_DIMENSIONS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,9 @@ public void test_sortAndAggregateStarTreeDocuments_nullDimensionsAndNullMetrics(
starTreeDocuments[3] = new StarTreeDocument(new Long[] { null, null, null, null }, new Double[] { null, null, null });
starTreeDocuments[4] = new StarTreeDocument(new Long[] { null, null, null, null }, new Double[] { null, null, null });

List<StarTreeDocument> inorderStarTreeDocuments = List.of();
List<StarTreeDocument> inorderStarTreeDocuments = List.of(
new StarTreeDocument(new Long[] { null, null, null, null }, new Object[] { 0.0, 0.0, 5L })
);
Iterator<StarTreeDocument> expectedStarTreeDocumentIterator = inorderStarTreeDocuments.iterator();

StarTreeDocument[] segmentStarTreeDocuments = new StarTreeDocument[noOfStarTreeDocuments];
Expand Down Expand Up @@ -1055,7 +1057,7 @@ public void testFlushFlow() throws IOException {
SortedNumericDocValues m1sndv = getSortedNumericMock(metricsList, metricsWithField);
SortedNumericDocValues m2sndv = getSortedNumericMock(metricsList, metricsWithField);

OnHeapStarTreeBuilder builder = new OnHeapStarTreeBuilder(sf, getWriteState(6), mapperService);
builder = getStarTreeBuilder(sf, getWriteState(6), mapperService);
SequentialDocValuesIterator[] dimDvs = { new SequentialDocValuesIterator(d1sndv), new SequentialDocValuesIterator(d2sndv) };
Iterator<StarTreeDocument> starTreeDocumentIterator = builder.sortAndAggregateSegmentDocuments(
dimDvs,
Expand Down Expand Up @@ -1120,7 +1122,7 @@ public void testFlushFlowBuild() throws IOException {
SortedNumericDocValues d2sndv = getSortedNumericMock(dimList2, docsWithField2);
SortedNumericDocValues m1sndv = getSortedNumericMock(metricsList, metricsWithField);

BaseStarTreeBuilder builder = getStarTreeBuilder(sf, getWriteState(100), mapperService);
builder = getStarTreeBuilder(sf, getWriteState(100), mapperService);

DocValuesProducer d1vp = getDocValuesProducer(d1sndv);
DocValuesProducer d2vp = getDocValuesProducer(d2sndv);
Expand Down Expand Up @@ -1209,7 +1211,7 @@ public void testMergeFlowWithSum() throws IOException {
sf,
"6"
);
OnHeapStarTreeBuilder builder = new OnHeapStarTreeBuilder(sf, getWriteState(6), mapperService);
builder = getStarTreeBuilder(sf, getWriteState(6), mapperService);
Iterator<StarTreeDocument> starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2));
/**
* Asserting following dim / metrics [ dim1, dim2 / Sum [ metric] ]
Expand Down Expand Up @@ -1259,7 +1261,7 @@ public void testMergeFlowWithCount() throws IOException {
sf,
"6"
);
OnHeapStarTreeBuilder builder = new OnHeapStarTreeBuilder(sf, getWriteState(6), mapperService);
builder = getStarTreeBuilder(sf, getWriteState(6), mapperService);
Iterator<StarTreeDocument> starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2));
/**
* Asserting following dim / metrics [ dim1, dim2 / Count [ metric] ]
Expand Down Expand Up @@ -1336,7 +1338,7 @@ public void testMergeFlowWithDifferentDocsFromSegments() throws IOException {
sf,
"4"
);
OnHeapStarTreeBuilder builder = new OnHeapStarTreeBuilder(sf, getWriteState(4), mapperService);
builder = getStarTreeBuilder(sf, getWriteState(4), mapperService);
Iterator<StarTreeDocument> starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2));
/**
* Asserting following dim / metrics [ dim1, dim2 / Count [ metric] ]
Expand All @@ -1363,6 +1365,66 @@ public void testMergeFlowWithDifferentDocsFromSegments() throws IOException {
assertEquals(9, count);
}

/**
 * Merges the star-tree values of two segments under a {@code COUNT} metric and verifies the
 * merged documents.
 *
 * <p>The first segment supplies ten entries per field (the last three are {@code -1}) and is
 * registered with {@code "6"}; the second supplies five entries per field and is registered with
 * {@code "4"}. The assertions require nine merged documents in total, where the document whose
 * first dimension is {@code 5} carries a metric of twice that value and every other document
 * carries a metric equal to its second dimension.
 *
 * <p>NOTE(review): the trailing string passed to {@code getStarTreeValues} is presumably the
 * per-segment document count; confirm against that helper. The expected metric of {@code 6} for
 * dimension value {@code 6} only holds if the first segment's entry for value 6 is not merged.
 *
 * @throws IOException if building or merging the star-tree values fails
 */
public void testMergeFlowNumSegmentsDocs() throws IOException {
    // First segment: two dimension fields and one metric field, ten entries each.
    List<Long> seg1Dim1Values = List.of(0L, 1L, 2L, 3L, 4L, 5L, 6L, -1L, -1L, -1L);
    List<Integer> seg1Dim1Docs = List.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
    List<Long> seg1Dim2Values = List.of(0L, 1L, 2L, 3L, 4L, 5L, 6L, -1L, -1L, -1L);
    List<Integer> seg1Dim2Docs = List.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
    List<Long> seg1MetricValues = List.of(0L, 1L, 2L, 3L, 4L, 5L, 6L, -1L, -1L, -1L);
    List<Integer> seg1MetricDocs = List.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);

    // Second segment: same layout, five entries each.
    List<Long> seg2Dim1Values = List.of(5L, 6L, 7L, 8L, -1L);
    List<Integer> seg2Dim1Docs = List.of(0, 1, 2, 3, 4);
    List<Long> seg2Dim2Values = List.of(5L, 6L, 7L, 8L, -1L);
    List<Integer> seg2Dim2Docs = List.of(0, 1, 2, 3, 4);
    List<Long> seg2MetricValues = List.of(5L, 6L, 7L, 8L, 9L);
    List<Integer> seg2MetricDocs = List.of(0, 1, 2, 3, 4);

    StarTreeField countField = getStarTreeField(MetricStat.COUNT);

    StarTreeValues seg1StarTreeValues = getStarTreeValues(
        getSortedNumericMock(seg1Dim1Values, seg1Dim1Docs),
        getSortedNumericMock(seg1Dim2Values, seg1Dim2Docs),
        getSortedNumericMock(seg1MetricValues, seg1MetricDocs),
        countField,
        "6"
    );
    StarTreeValues seg2StarTreeValues = getStarTreeValues(
        getSortedNumericMock(seg2Dim1Values, seg2Dim1Docs),
        getSortedNumericMock(seg2Dim2Values, seg2Dim2Docs),
        getSortedNumericMock(seg2MetricValues, seg2MetricDocs),
        countField,
        "4"
    );

    builder = getStarTreeBuilder(countField, getWriteState(4), mapperService);
    Iterator<StarTreeDocument> mergedDocs = builder.mergeStarTrees(List.of(seg1StarTreeValues, seg2StarTreeValues));

    // Expected merged rows, written as [dim1, dim2] | [count]:
    //   [0, 0] | [0]
    //   [1, 1] | [1]
    //   [2, 2] | [2]
    //   [3, 3] | [3]
    //   [4, 4] | [4]
    //   [5, 5] | [10]
    //   [6, 6] | [6]
    //   [7, 7] | [7]
    //   [8, 8] | [8]
    int mergedDocCount = 0;
    while (mergedDocs.hasNext()) {
        StarTreeDocument mergedDoc = mergedDocs.next();
        mergedDocCount++;
        if (!Objects.equals(mergedDoc.dimensions[0], 5L)) {
            // Every row other than dimension value 5 keeps a metric equal to its second dimension.
            assertEquals(mergedDoc.dimensions[1], mergedDoc.metrics[0]);
        } else {
            // Dimension value 5 is the row expected to carry the doubled metric.
            assertEquals(mergedDoc.dimensions[0] * 2, mergedDoc.metrics[0]);
        }
    }
    assertEquals(9, mergedDocCount);
}

public void testMergeFlowWithMissingDocs() throws IOException {
List<Long> dimList = List.of(0L, 1L, 2L, 3L, 4L, 6L);
List<Integer> docsWithField = List.of(0, 1, 2, 3, 4, 6);
Expand Down Expand Up @@ -1396,7 +1458,7 @@ public void testMergeFlowWithMissingDocs() throws IOException {
sf,
"4"
);
OnHeapStarTreeBuilder builder = new OnHeapStarTreeBuilder(sf, getWriteState(4), mapperService);
builder = getStarTreeBuilder(sf, getWriteState(4), mapperService);
Iterator<StarTreeDocument> starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2));
/**
* Asserting following dim / metrics [ dim1, dim2 / Count [ metric] ]
Expand Down Expand Up @@ -1456,7 +1518,7 @@ public void testMergeFlowWithMissingDocsInSecondDim() throws IOException {
sf,
"4"
);
OnHeapStarTreeBuilder builder = new OnHeapStarTreeBuilder(sf, getWriteState(4), mapperService);
builder = getStarTreeBuilder(sf, getWriteState(4), mapperService);
Iterator<StarTreeDocument> starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2));
/**
* Asserting following dim / metrics [ dim1, dim2 / Count [ metric] ]
Expand Down Expand Up @@ -1517,7 +1579,7 @@ public void testMergeFlowWithDocsMissingAtTheEnd() throws IOException {
sf,
"4"
);
OnHeapStarTreeBuilder builder = new OnHeapStarTreeBuilder(sf, writeState, mapperService);
builder = getStarTreeBuilder(sf, writeState, mapperService);
Iterator<StarTreeDocument> starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2));
/**
* Asserting following dim / metrics [ dim1, dim2 / Count [ metric] ]
Expand Down Expand Up @@ -1569,7 +1631,7 @@ public void testMergeFlowWithEmptyFieldsInOneSegment() throws IOException {
sf,
"0"
);
OnHeapStarTreeBuilder builder = new OnHeapStarTreeBuilder(sf, getWriteState(0), mapperService);
builder = getStarTreeBuilder(sf, getWriteState(0), mapperService);
Iterator<StarTreeDocument> starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2));
/**
* Asserting following dim / metrics [ dim1, dim2 / Count [ metric] ]
Expand Down Expand Up @@ -1664,8 +1726,8 @@ public void testMergeFlowWithDuplicateDimensionValues() throws IOException {
metricsWithField,
sf
);
OnHeapStarTreeBuilder builder = new OnHeapStarTreeBuilder(sf, writeState, mapperService);
builder.build(List.of(starTreeValues, starTreeValues2));
builder = getStarTreeBuilder(sf, writeState, mapperService);
builder.build(builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2)));
List<StarTreeDocument> starTreeDocuments = builder.getStarTreeDocuments();
assertEquals(401, starTreeDocuments.size());
int count = 0;
Expand Down Expand Up @@ -1774,8 +1836,8 @@ public void testMergeFlowWithMaxLeafDocs() throws IOException {
sf
);

OnHeapStarTreeBuilder builder = new OnHeapStarTreeBuilder(sf, writeState, mapperService);
builder.build(List.of(starTreeValues, starTreeValues2));
builder = getStarTreeBuilder(sf, writeState, mapperService);
builder.build(builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2)));
List<StarTreeDocument> starTreeDocuments = builder.getStarTreeDocuments();
/**
635 docs get generated
Expand Down Expand Up @@ -1892,8 +1954,8 @@ public void testMergeFlowWithDuplicateDimensionValueWithMaxLeafDocs() throws IOE
metricsWithField,
sf
);
OnHeapStarTreeBuilder builder = new OnHeapStarTreeBuilder(sf, writeState, mapperService);
builder.build(List.of(starTreeValues, starTreeValues2));
builder = getStarTreeBuilder(sf, writeState, mapperService);
builder.build(builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2)));
List<StarTreeDocument> starTreeDocuments = builder.getStarTreeDocuments();
assertEquals(401, starTreeDocuments.size());
builder.close();
Expand Down Expand Up @@ -1991,8 +2053,8 @@ public void testMergeFlowWithMaxLeafDocsAndStarTreeNodesAssertion() throws IOExc
metricsWithField,
sf
);
OnHeapStarTreeBuilder builder = new OnHeapStarTreeBuilder(sf, writeState, mapperService);
builder.build(List.of(starTreeValues, starTreeValues2));
builder = getStarTreeBuilder(sf, writeState, mapperService);
builder.build(builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2)));
List<StarTreeDocument> starTreeDocuments = builder.getStarTreeDocuments();
Map<Integer, Map<Long, Integer>> dimValueToDocIdMap = new HashMap<>();
traverseStarTree(builder.rootNode, dimValueToDocIdMap, true);
Expand Down Expand Up @@ -2151,7 +2213,7 @@ public void testMergeFlow() throws IOException {
getAttributes(1000)
);

BaseStarTreeBuilder builder = getStarTreeBuilder(sf, writeState, mapperService);
builder = getStarTreeBuilder(sf, writeState, mapperService);
Iterator<StarTreeDocument> starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2));
/**
[0, 0, 0, 0] | [0.0]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.compositeindex.datacube.startree.builder;

import org.apache.lucene.index.SegmentWriteState;
import org.opensearch.index.compositeindex.datacube.startree.StarTreeField;
import org.opensearch.index.mapper.MapperService;

import java.io.IOException;

/**
 * Concrete variant of {@link AbstractStarTreeBuilderTests} whose builder factory returns an
 * {@link OffHeapStarTreeBuilder}.
 */
public class OffHeapStarTreeBuilderTests extends AbstractStarTreeBuilderTests {

    /**
     * Creates the off-heap star-tree builder used by the inherited tests.
     *
     * @param starTreeField     star-tree field the builder is created for
     * @param segmentWriteState write state of the segment being built
     * @param mapperService     mapper service handed to the builder
     * @return a new {@link OffHeapStarTreeBuilder}
     * @throws IOException if the builder cannot be created
     */
    @Override
    public BaseStarTreeBuilder getStarTreeBuilder(
        StarTreeField starTreeField,
        SegmentWriteState segmentWriteState,
        MapperService mapperService
    ) throws IOException {
        // Fourth constructor argument is drawn from randomIntBetween(2, 6) on each call.
        // NOTE(review): its meaning is not visible from this file; confirm against OffHeapStarTreeBuilder.
        final int randomizedBuilderArg = randomIntBetween(2, 6);
        return new OffHeapStarTreeBuilder(starTreeField, segmentWriteState, mapperService, randomizedBuilderArg);
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,6 @@ public void test_getStarTreeBuilder() throws IOException {
assertTrue(starTreeBuilder instanceof OnHeapStarTreeBuilder);
}

/**
 * Expects {@code StarTreesBuilder#getSingleTreeBuilder} to throw {@link IllegalArgumentException}
 * when the star-tree field's configuration uses {@code StarTreeBuildMode.OFF_HEAP}.
 *
 * NOTE(review): this expectation only holds while OFF_HEAP has no builder. If
 * {@code getSingleTreeBuilder} returns a builder for OFF_HEAP, no exception is thrown and this
 * test becomes stale; confirm whether it should be removed or rewritten.
 */
public void test_getStarTreeBuilder_illegalArgument() {
// The mapper service is stubbed to report a single composite (star-tree) field type.
when(mapperService.getCompositeFieldTypes()).thenReturn(Set.of(starTreeFieldType));
// Field configuration under test: build mode OFF_HEAP, with no dimensions or metrics on the field.
StarTreeFieldConfiguration starTreeFieldConfiguration = new StarTreeFieldConfiguration(1, new HashSet<>(), StarTreeFieldConfiguration.StarTreeBuildMode.OFF_HEAP);
StarTreeField starTreeField = new StarTreeField("star_tree", new ArrayList<>(), new ArrayList<>(), starTreeFieldConfiguration);
StarTreesBuilder starTreesBuilder = new StarTreesBuilder(segmentWriteState, mapperService);
assertThrows(IllegalArgumentException.class, () -> starTreesBuilder.getSingleTreeBuilder(starTreeField, segmentWriteState, mapperService));
}

public void test_closeWithNoStarTreeFields() throws IOException {
StarTreeFieldConfiguration starTreeFieldConfiguration = new StarTreeFieldConfiguration(
1,
Expand Down

0 comments on commit 37803ce

Please sign in to comment.