diff --git a/server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesWriter.java b/server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesWriter.java index 3859d3c998573..b44ea2ae6cb52 100644 --- a/server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesWriter.java +++ b/server/src/main/java/org/opensearch/index/codec/composite/Composite99DocValuesWriter.java @@ -8,8 +8,6 @@ package org.opensearch.index.codec.composite; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.apache.lucene.codecs.DocValuesConsumer; import org.apache.lucene.codecs.DocValuesProducer; import org.apache.lucene.index.DocValues; @@ -50,9 +48,9 @@ public class Composite99DocValuesWriter extends DocValuesConsumer { private final Set compositeMappedFieldTypes; private final Set compositeFieldSet; private final Set segmentFieldSet; + private final boolean segmentHasCompositeFields; private final Map fieldProducerMap = new HashMap<>(); - private static final Logger logger = LogManager.getLogger(Composite99DocValuesWriter.class); public Composite99DocValuesWriter(DocValuesConsumer delegate, SegmentWriteState segmentWriteState, MapperService mapperService) { @@ -70,6 +68,8 @@ public Composite99DocValuesWriter(DocValuesConsumer delegate, SegmentWriteState for (CompositeMappedFieldType type : compositeMappedFieldTypes) { compositeFieldSet.addAll(type.fields()); } + // check if there are any composite fields which are part of the segment + segmentHasCompositeFields = !Collections.disjoint(segmentFieldSet, compositeFieldSet); } @Override @@ -91,7 +91,7 @@ public void addSortedField(FieldInfo field, DocValuesProducer valuesProducer) th public void addSortedNumericField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException { delegate.addSortedNumericField(field, valuesProducer); // Perform this only during flush flow - if (mergeState.get() == null) { + if (mergeState.get() == null && segmentHasCompositeFields) { createCompositeIndicesIfPossible(valuesProducer, field); } } diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java index 7187fade882ea..3da70b9b3bce5 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/BaseStarTreeBuilder.java @@ -72,7 +72,7 @@ public abstract class BaseStarTreeBuilder implements StarTreeBuilder { protected final TreeNode rootNode = getNewNode(); - private final StarTreeField starTreeField; + protected final StarTreeField starTreeField; private final MapperService mapperService; private final SegmentWriteState state; static String NUM_SEGMENT_DOCS = "numSegmentDocs"; @@ -163,7 +163,7 @@ public List generateMetricAggregatorInfos(MapperService ma * * @return Star tree documents */ - public abstract List getStarTreeDocuments(); + public abstract List getStarTreeDocuments() throws IOException; /** * Returns the value of the dimension for the given dimension id and document in the star-tree. diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OffHeapStarTreeBuilder.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OffHeapStarTreeBuilder.java new file mode 100644 index 0000000000000..66d975b2bcf24 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OffHeapStarTreeBuilder.java @@ -0,0 +1,937 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.compositeindex.datacube.startree.builder; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.index.SegmentWriteState; +import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.RandomAccessInput; +import org.apache.lucene.store.TrackingDirectoryWrapper; +import org.apache.lucene.util.IntroSorter; +import org.apache.lucene.util.NumericUtils; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.util.io.IOUtils; +import org.opensearch.index.codec.composite.datacube.startree.StarTreeValues; +import org.opensearch.index.compositeindex.datacube.Dimension; +import org.opensearch.index.compositeindex.datacube.Metric; +import org.opensearch.index.compositeindex.datacube.startree.StarTreeDocument; +import org.opensearch.index.compositeindex.datacube.startree.StarTreeField; +import org.opensearch.index.compositeindex.datacube.startree.aggregators.numerictype.StarTreeNumericTypeConverters; +import org.opensearch.index.compositeindex.datacube.startree.utils.SequentialDocValuesIterator; +import org.opensearch.index.mapper.MapperService; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * Off-heap implementation of the star tree builder. + * + *

+ * Segment documents are stored in a single file named 'segment.documents' for sorting and aggregation. A document ID array is created, + * and the document IDs in the array are swapped during sorting based on the actual segment document values in the file. + *

+ * Star tree documents are stored in multiple 'star-tree.documents' files. The algorithm works as follows: + *

    + *
  1. Initially, aggregated documents are created based on the segment documents.
  2. + *
  3. Further, star tree documents are generated (e.g., in the {@code generateStarTreeDocumentsForStarNode} method) by reading the current + * aggregated documents and creating new aggregated star tree documents, which are appended to the 'star-tree.documents' files.
  4. + *
  5. This process is repeated until all combinations of star tree documents are generated.
  6. + *
+ *

In cases where previously written star tree documents need to be read from the 'star-tree.documents' files, the current + * 'star-tree.documents' file is closed, and the values are read. Then, the derived values gets appended to a new 'star-tree.documents' file. + * This is necessary because Lucene maintains immutability of data, and an {@code IndexOutput} cannot be kept open while creating an + * {@code IndexInput} on the same file, as all file contents may not be visible in the reader. Therefore, the {@code IndexOutput} must be + * closed to ensure all data can be read before creating an {@code IndexInput}. Additionally, an {@code IndexOutput} cannot be reopened, + * so a new file is created for the new star tree documents. + *

The set of 'star-tree.documents' files is maintained, and a tracker array is used to keep track of the start document ID for each file. + * Once the number of files reaches a set threshold, the files are merged. + + @opensearch.experimental + **/ + +@ExperimentalApi +public class OffHeapStarTreeBuilder extends BaseStarTreeBuilder { + private static final Logger logger = LogManager.getLogger(OffHeapStarTreeBuilder.class); + private static final String SEGMENT_DOC_FILE_NAME = "segment.documents"; + private static final String STAR_TREE_DOC_FILE_NAME = "star-tree.documents"; + // TODO : Should this be via settings ? + private static final int DEFAULT_FILE_COUNT_MERGE_THRESHOLD = 5; + private final int fileCountMergeThreshold; + private final List starTreeDocumentOffsets; + private int numReadableStarTreeDocuments; + final IndexOutput segmentDocsFileOutput; + IndexOutput starTreeDocsFileOutput; + IndexInput starTreeDocsFileInput; + IndexInput segmentDocsFileInput; + RandomAccessInput segmentRandomInput; + private RandomAccessInput starTreeDocsFileRandomInput; + SegmentWriteState state; + Map fileToByteSizeMap; + int starTreeFileCount = -1; + int prevStartDocId = Integer.MAX_VALUE; + int currBytes = 0; + int docSizeInBytes = -1; + TrackingDirectoryWrapper tmpDirectory; + + /** + * Builds star tree based on star tree field configuration consisting of dimensions, metrics and star tree index + * specific configuration. + * + * @param starTreeField holds the configuration for the star tree + * @param state stores the segment write state + * @param mapperService helps to find the original type of the field + */ + protected OffHeapStarTreeBuilder(StarTreeField starTreeField, SegmentWriteState state, MapperService mapperService) throws IOException { + this(starTreeField, state, mapperService, DEFAULT_FILE_COUNT_MERGE_THRESHOLD); + } + + /** + * Builds star tree based on star tree field configuration consisting of dimensions, metrics and star tree index + * specific configuration. + * + * @param starTreeField holds the configuration for the star tree + * @param state stores the segment write state + * @param mapperService helps to find the original type of the field + * @param fileThreshold threshold for number of files after which we merge the files + */ + protected OffHeapStarTreeBuilder(StarTreeField starTreeField, SegmentWriteState state, MapperService mapperService, int fileThreshold) + throws IOException { + super(starTreeField, state, mapperService); + this.fileCountMergeThreshold = fileThreshold; + this.state = state; + this.tmpDirectory = new TrackingDirectoryWrapper(state.directory); + fileToByteSizeMap = new LinkedHashMap<>(); // maintain order + starTreeFileCount++; + try { + starTreeDocsFileOutput = tmpDirectory.createTempOutput( + STAR_TREE_DOC_FILE_NAME + starTreeFileCount, + state.segmentSuffix, + state.context + ); + segmentDocsFileOutput = tmpDirectory.createTempOutput(SEGMENT_DOC_FILE_NAME, state.segmentSuffix, state.context); + } catch (IOException e) { + IOUtils.closeWhileHandlingException(starTreeDocsFileOutput); + IOUtils.close(this); + throw e; + } + starTreeDocumentOffsets = new ArrayList<>(); + } + + /** + * Creates a new star tree document temporary file to store star tree documents. + */ + IndexOutput createStarTreeDocumentsFileOutput() throws IOException { + starTreeFileCount++; + return tmpDirectory.createTempOutput(STAR_TREE_DOC_FILE_NAME + starTreeFileCount, state.segmentSuffix, state.context); + } + + @Override + public void appendStarTreeDocument(StarTreeDocument starTreeDocument) throws IOException { + int bytes = writeStarTreeDocument(starTreeDocument, starTreeDocsFileOutput); + if (docSizeInBytes == -1) { + docSizeInBytes = bytes; + } + assert docSizeInBytes == bytes; + starTreeDocumentOffsets.add(currBytes); + currBytes += bytes; + } + + /** + * Builds star tree based on the star tree values from multiple segments + * + * @param starTreeValuesSubs contains the star tree values from multiple segments + */ + @Override + public void build(List starTreeValuesSubs) throws IOException { + try { + build(mergeStarTrees(starTreeValuesSubs)); + } finally { + try { + for (String file : tmpDirectory.getCreatedFiles()) { + tmpDirectory.deleteFile(file); + } + } catch (final IOException ignored) {} + } + } + + /** + * Sorts and aggregates the star-tree documents from multiple segments and builds star tree based on the newly + * aggregated star-tree documents + * + * @param starTreeValuesSubs StarTreeValues from multiple segments + * @return iterator of star tree documents + */ + Iterator mergeStarTrees(List starTreeValuesSubs) throws IOException { + int docBytesLength = 0; + int numDocs = 0; + int[] sortedDocIds; + try { + for (StarTreeValues starTreeValues : starTreeValuesSubs) { + List dimensionsSplitOrder = starTreeValues.getStarTreeField().getDimensionsOrder(); + SequentialDocValuesIterator[] dimensionReaders = new SequentialDocValuesIterator[starTreeValues.getStarTreeField() + .getDimensionsOrder() + .size()]; + for (int i = 0; i < dimensionsSplitOrder.size(); i++) { + String dimension = dimensionsSplitOrder.get(i).getField(); + dimensionReaders[i] = new SequentialDocValuesIterator(starTreeValues.getDimensionDocValuesIteratorMap().get(dimension)); + } + List metricReaders = new ArrayList<>(); + for (Map.Entry metricDocValuesEntry : starTreeValues.getMetricDocValuesIteratorMap().entrySet()) { + metricReaders.add(new SequentialDocValuesIterator(metricDocValuesEntry.getValue())); + } + int currentDocId = 0; + int numSegmentDocs = Integer.parseInt( + starTreeValues.getAttributes().getOrDefault(NUM_SEGMENT_DOCS, String.valueOf(DocIdSetIterator.NO_MORE_DOCS)) + ); + while (currentDocId < numSegmentDocs) { + Long[] dims = new Long[starTreeValues.getStarTreeField().getDimensionsOrder().size()]; + int i = 0; + for (SequentialDocValuesIterator dimensionDocValueIterator : dimensionReaders) { + dimensionDocValueIterator.nextDoc(currentDocId); + Long val = dimensionDocValueIterator.value(currentDocId); + dims[i] = val; + i++; + } + i = 0; + Object[] metrics = new Object[metricReaders.size()]; + for (SequentialDocValuesIterator metricDocValuesIterator : metricReaders) { + metricDocValuesIterator.nextDoc(currentDocId); + // As part of merge, we traverse the star tree doc values + // The type of data stored in metric fields is different from the + // actual indexing field they're based on + metrics[i] = metricAggregatorInfos.get(i) + .getValueAggregators() + .toStarTreeNumericTypeValue(metricDocValuesIterator.value(currentDocId)); + i++; + } + StarTreeDocument starTreeDocument = new StarTreeDocument(dims, metrics); + int bytes = writeStarTreeDocument(starTreeDocument, segmentDocsFileOutput); + numDocs++; + docBytesLength = bytes; + currentDocId++; + } + } + sortedDocIds = new int[numDocs]; + for (int i = 0; i < numDocs; i++) { + sortedDocIds[i] = i; + } + } finally { + segmentDocsFileOutput.close(); + } + + if (numDocs == 0) { + return Collections.emptyIterator(); + } + + return sortAndReduceDocuments(sortedDocIds, numDocs, docBytesLength, true); + } + + /** + * Sorts and reduces the star tree documents based on the dimensions during flush flow + */ + private Iterator sortAndReduceDocuments(int[] sortedDocIds, int numDocs, int docBytesLength) throws IOException { + return sortAndReduceDocuments(sortedDocIds, numDocs, docBytesLength, false); + } + + /** + * Sorts and reduces the star tree documents based on the dimensions + */ + private Iterator sortAndReduceDocuments(int[] sortedDocIds, int numDocs, int docBytesLength, boolean isMerge) + throws IOException { + try { + segmentDocsFileInput = tmpDirectory.openInput(segmentDocsFileOutput.getName(), state.context); + final long documentBytes = docBytesLength; + segmentRandomInput = segmentDocsFileInput.randomAccessSlice(0, segmentDocsFileInput.length()); + if (sortedDocIds == null || sortedDocIds.length == 0) { + logger.debug("Sorted doc ids array is null"); + return Collections.emptyIterator(); + } + new IntroSorter() { + private long[] dimensions; + + @Override + protected void swap(int i, int j) { + int temp = sortedDocIds[i]; + sortedDocIds[i] = sortedDocIds[j]; + sortedDocIds[j] = temp; + } + + @Override + protected void setPivot(int i) { + long offset = (long) sortedDocIds[i] * documentBytes; + dimensions = new long[starTreeField.getDimensionsOrder().size()]; + try { + for (int j = 0; j < dimensions.length; j++) { + dimensions[j] = segmentRandomInput.readLong(offset + (long) j * Long.BYTES); + } + } catch (IOException e) { + throw new RuntimeException("Sort documents failed ", e); + } + } + + @Override + protected int comparePivot(int j) { + long offset = (long) sortedDocIds[j] * documentBytes; + try { + for (int i = 0; i < dimensions.length; i++) { + long dimension = segmentRandomInput.readLong(offset + (long) i * Long.BYTES); + if (dimensions[i] != dimension) { + return Long.compare(dimensions[i], dimension); + } + } + } catch (IOException e) { + throw new RuntimeException("Sort documents failed ", e); + } + return 0; + } + }.sort(0, numDocs); + + // Create an iterator for aggregated documents + IndexInput finalSegmentDocsFileInput = segmentDocsFileInput; + return new Iterator() { + boolean _hasNext = true; + StarTreeDocument currentDocument; + + { + currentDocument = getSegmentStarTreeDocument(sortedDocIds[0], documentBytes, isMerge); + } + + int _docId = 1; + + @Override + public boolean hasNext() { + return _hasNext; + } + + @Override + public StarTreeDocument next() { + StarTreeDocument next = reduceSegmentStarTreeDocuments(null, currentDocument, isMerge); + while (_docId < numDocs) { + StarTreeDocument doc; + try { + doc = getSegmentStarTreeDocument(sortedDocIds[_docId++], documentBytes, isMerge); + } catch (IOException e) { + throw new RuntimeException("Reducing documents failed ", e); + } + if (!Arrays.equals(doc.dimensions, next.dimensions)) { + currentDocument = doc; + return next; + } else { + next = reduceSegmentStarTreeDocuments(next, doc, isMerge); + } + } + _hasNext = false; + IOUtils.closeWhileHandlingException(finalSegmentDocsFileInput); + try { + tmpDirectory.deleteFile(segmentDocsFileOutput.getName()); + } catch (final IOException ignored) {} + return next; + } + }; + } catch (IOException ex) { + IOUtils.closeWhileHandlingException(segmentDocsFileInput); + throw ex; + } + } + + /** + * Get segment star tree document from the segment.documents file + */ + public StarTreeDocument getSegmentStarTreeDocument(int docID, long documentBytes, boolean isMerge) throws IOException { + return readSegmentStarTreeDocument(segmentRandomInput, docID * documentBytes, isMerge); + } + + /** + * Get star tree document for the given docId from the star-tree documents file + */ + @Override + public StarTreeDocument getStarTreeDocument(int docId) throws IOException { + ensureDocumentReadable(docId); + return readStarTreeDocument(starTreeDocsFileRandomInput, starTreeDocumentOffsets.get(docId)); + } + + // This should be only used for testing + @Override + public List getStarTreeDocuments() throws IOException { + List starTreeDocuments = new ArrayList<>(); + for (int i = 0; i < numStarTreeDocs; i++) { + starTreeDocuments.add(getStarTreeDocument(i)); + } + return starTreeDocuments; + } + + @Override + public Long getDimensionValue(int docId, int dimensionId) throws IOException { + ensureDocumentReadable(docId); + return starTreeDocsFileRandomInput.readLong((starTreeDocumentOffsets.get(docId) + ((long) dimensionId * Long.BYTES))); + } + + /** + * Sorts and aggregates all the documents of the segment based on dimension and metrics configuration + * + * @param dimensionReaders List of docValues readers to read dimensions from the segment + * @param metricReaders List of docValues readers to read metrics from the segment + * @return Iterator of star-tree documents + */ + @Override + public Iterator sortAndAggregateSegmentDocuments( + SequentialDocValuesIterator[] dimensionReaders, + List metricReaders + ) throws IOException { + // Write all dimensions for segment documents into the buffer, and sort all documents using an int + // array + int documentBytesLength = 0; + int[] sortedDocIds = new int[totalSegmentDocs]; + for (int i = 0; i < totalSegmentDocs; i++) { + sortedDocIds[i] = i; + } + + try { + for (int i = 0; i < totalSegmentDocs; i++) { + StarTreeDocument document = getSegmentStarTreeDocument(i, dimensionReaders, metricReaders); + documentBytesLength = writeSegmentStarTreeDocument(document, segmentDocsFileOutput); + } + } finally { + segmentDocsFileOutput.close(); + } + + // Create an iterator for aggregated documents + return sortAndReduceDocuments(sortedDocIds, totalSegmentDocs, documentBytesLength); + } + + /** + * Generates a star-tree for a given star-node + * + * @param startDocId Start document id in the star-tree + * @param endDocId End document id (exclusive) in the star-tree + * @param dimensionId Dimension id of the star-node + * @return iterator for star-tree documents of star-node + * @throws IOException throws when unable to generate star-tree for star-node + */ + @Override + public Iterator generateStarTreeDocumentsForStarNode(int startDocId, int endDocId, int dimensionId) + throws IOException { + // End doc id is not inclusive but start doc is inclusive + // Hence we need to check if buffer is readable till endDocId - 1 + ensureDocumentReadable(endDocId - 1); + + // Sort all documents using an int array + int numDocs = endDocId - startDocId; + int[] sortedDocIds = new int[numDocs]; + for (int i = 0; i < numDocs; i++) { + sortedDocIds[i] = startDocId + i; + } + new IntroSorter() { + private long[] dimensions; + + @Override + protected void swap(int i, int j) { + int temp = sortedDocIds[i]; + sortedDocIds[i] = sortedDocIds[j]; + sortedDocIds[j] = temp; + } + + @Override + protected void setPivot(int i) { + long offset = starTreeDocumentOffsets.get(sortedDocIds[i]); + dimensions = new long[starTreeField.getDimensionsOrder().size()]; + try { + for (int j = dimensionId + 1; j < dimensions.length; j++) { + dimensions[j] = starTreeDocsFileRandomInput.readLong(offset + (long) j * Long.BYTES); + } + } catch (IOException e) { + throw new RuntimeException("Sort documents failed ", e); + } + } + + @Override + protected int comparePivot(int j) { + long offset = starTreeDocumentOffsets.get(sortedDocIds[j]); + try { + for (int i = dimensionId + 1; i < dimensions.length; i++) { + long dimension = starTreeDocsFileRandomInput.readLong(offset + (long) i * Long.BYTES); + if (dimensions[i] != dimension) { + return Long.compare(dimensions[i], dimension); + } + } + } catch (IOException e) { + throw new RuntimeException("Sort documents failed ", e); + } + return 0; + } + }.sort(0, numDocs); + + // Create an iterator for aggregated documents + return new Iterator() { + boolean _hasNext = true; + StarTreeDocument _currentdocument = getStarTreeDocument(sortedDocIds[0]); + int _docId = 1; + + private boolean hasSameDimensions(StarTreeDocument document1, StarTreeDocument document2) { + for (int i = dimensionId + 1; i < starTreeField.getDimensionsOrder().size(); i++) { + if (!Objects.equals(document1.dimensions[i], document2.dimensions[i])) { + return false; + } + } + return true; + } + + @Override + public boolean hasNext() { + return _hasNext; + } + + @Override + public StarTreeDocument next() { + StarTreeDocument next = reduceStarTreeDocuments(null, _currentdocument); + next.dimensions[dimensionId] = STAR_IN_DOC_VALUES_INDEX; + while (_docId < numDocs) { + StarTreeDocument document; + try { + document = getStarTreeDocument(sortedDocIds[_docId++]); + } catch (IOException e) { + throw new RuntimeException(e); + } + if (!hasSameDimensions(document, _currentdocument)) { + _currentdocument = document; + return next; + } else { + next = reduceStarTreeDocuments(next, document); + } + } + _hasNext = false; + return next; + } + }; + } + + /** + * Write the star tree document to file from various fields of the segment associated with dimensions and metrics + */ + private int writeSegmentStarTreeDocument(StarTreeDocument starTreeDocument, IndexOutput output) throws IOException { + int numBytes = writeDimensions(starTreeDocument, output); + // upto 32 metrics + int metricsNullBitSet = 0; + for (int i = 0; i < starTreeDocument.metrics.length; i++) { + switch (metricAggregatorInfos.get(i).getValueAggregators().getAggregatedValueType()) { + case LONG: + case DOUBLE: + if (starTreeDocument.metrics[i] == null) { + starTreeDocument.metrics[i] = 0L; + metricsNullBitSet |= (1 << i); + } + output.writeLong((Long) starTreeDocument.metrics[i]); + numBytes += Long.BYTES; + break; + case INT: + case FLOAT: + default: + throw new IllegalStateException(); + } + } + output.writeInt(metricsNullBitSet); + numBytes += Integer.BYTES; + return numBytes; + } + + private static int writeDimensions(StarTreeDocument starTreeDocument, IndexOutput output) throws IOException { + int numBytes = 0; + short dimensionNullBitSet = 0; // Initialize a short to store dimension flags + + for (int i = 0; i < starTreeDocument.dimensions.length; i++) { + if (starTreeDocument.dimensions[i] == null) { + // Set the corresponding bit in dimensionNullBitSet to 1 (present) + dimensionNullBitSet |= (short) (1 << i); + starTreeDocument.dimensions[i] = 0L; + } + output.writeLong(starTreeDocument.dimensions[i]); + numBytes += Long.BYTES; + } + // Write the dimensionNullBitSet after writing all dimensions + output.writeShort(dimensionNullBitSet); + numBytes += Short.BYTES; + return numBytes; + } + + /** + * Write the aggregated star tree document to file + */ + private int writeStarTreeDocument(StarTreeDocument starTreeDocument, IndexOutput output) throws IOException { + int numBytes = writeDimensions(starTreeDocument, output); + // upto 32 metrics + int metricNullBitSet = 0; + for (int i = 0; i < starTreeDocument.metrics.length; i++) { + switch (metricAggregatorInfos.get(i).getValueAggregators().getAggregatedValueType()) { + case LONG: + if (starTreeDocument.metrics[i] == null) { + starTreeDocument.metrics[i] = 0L; + metricNullBitSet |= (1 << i); + } + output.writeLong((Long) starTreeDocument.metrics[i]); + numBytes += Long.BYTES; + break; + case DOUBLE: + if (starTreeDocument.metrics[i] == null) { + starTreeDocument.metrics[i] = 0.0; + metricNullBitSet |= (1 << i); + } + long val = NumericUtils.doubleToSortableLong((Double) starTreeDocument.metrics[i]); + output.writeLong(val); + numBytes += Long.BYTES; + break; + case INT: + case FLOAT: + default: + throw new IllegalStateException(); + } + } + output.writeInt(metricNullBitSet); + numBytes += Integer.BYTES; + return numBytes; + } + + /** + * Reads the segment star tree document from given offset + * + * @param input RandomAccessInput + * @param offset Offset in the file + * @return StarTreeDocument + * @throws IOException IOException in case of I/O errors + */ + private StarTreeDocument readSegmentStarTreeDocument(RandomAccessInput input, long offset, boolean isMerge) throws IOException { + int dimSize = starTreeField.getDimensionsOrder().size(); + Long[] dimensions = new Long[dimSize]; + for (int i = 0; i < dimSize; i++) { + try { + dimensions[i] = input.readLong(offset); + } catch (Exception e) { + logger.debug( + "Error reading dimension value at offset {} for dimension {} : _numReadableStarTreedocuments = {}", + offset, + i, + numReadableStarTreeDocuments + ); + throw e; + } + offset += Long.BYTES; + } + setNullValuesForDimensions(input, offset, dimensions); + offset += Short.BYTES; + int numMetrics = 0; + for (Metric metric : starTreeField.getMetrics()) { + numMetrics += metric.getMetrics().size(); + } + Object[] metrics = new Object[numMetrics]; + for (int i = 0; i < numMetrics; i++) { + switch (metricAggregatorInfos.get(i).getValueAggregators().getAggregatedValueType()) { + case LONG: + metrics[i] = input.readLong(offset); + offset += Long.BYTES; + break; + case DOUBLE: + long val = input.readLong(offset); + if (isMerge) { + metrics[i] = StarTreeNumericTypeConverters.sortableLongtoDouble(val); + } else { + metrics[i] = val; + } + offset += Long.BYTES; + break; + case FLOAT: + case INT: + default: + throw new IllegalStateException(); + } + } + setNullForMetrics(input, offset, numMetrics, metrics); + offset += Integer.BYTES; + return new StarTreeDocument(dimensions, metrics); + } + + /** + * Reads the star tree document from given offset + * + * @param input RandomAccessInput + * @param offset Offset in the file + * @return StarTreeDocument + * @throws IOException IOException in case of I/O errors + */ + private StarTreeDocument readStarTreeDocument(RandomAccessInput input, long offset) throws IOException { + int dimSize = starTreeField.getDimensionsOrder().size(); + Long[] dimensions = new Long[dimSize]; + for (int i = 0; i < dimSize; i++) { + try { + dimensions[i] = input.readLong(offset); + } catch (Exception e) { + logger.error( + "Error reading dimension value at offset {} for dimension {} : _numReadableStarTreedocuments = {}", + offset, + i, + numReadableStarTreeDocuments + ); + throw e; + } + offset += Long.BYTES; + } + setNullValuesForDimensions(input, offset, dimensions); + offset += Short.BYTES; + int numMetrics = 0; + for (Metric metric : starTreeField.getMetrics()) { + numMetrics += metric.getMetrics().size(); + } + Object[] metrics = new Object[numMetrics]; + for (int i = 0; i < numMetrics; i++) { + switch (metricAggregatorInfos.get(i).getValueAggregators().getAggregatedValueType()) { + case LONG: + metrics[i] = input.readLong(offset); + offset += Long.BYTES; + break; + case DOUBLE: + long val = input.readLong(offset); + offset += Long.BYTES; + metrics[i] = StarTreeNumericTypeConverters.sortableLongtoDouble(val); + break; + + case FLOAT: + case INT: + default: + throw new IllegalStateException(); + } + } + setNullForMetrics(input, offset, numMetrics, metrics); + offset += Integer.BYTES; + return new StarTreeDocument(dimensions, metrics); + } + + /** + * Sets null/identity equivalent for the metrics when applicable based on the bitset + */ + private void setNullForMetrics(RandomAccessInput input, long offset, int numMetrics, Object[] metrics) throws IOException { + int nullMetricsBitSet = input.readInt(offset); + for (int i = 0; i < numMetrics; i++) { + boolean isMetricNull; + switch (metricAggregatorInfos.get(i).getValueAggregators().getAggregatedValueType()) { + case LONG: + // TODO : get the identity value of metrics here + isMetricNull = (nullMetricsBitSet & (1 << i)) != 0; + if (isMetricNull) { + metrics[i] = 0L; + } + break; + case DOUBLE: + // TODO : get the identity value of metrics here + isMetricNull = (nullMetricsBitSet & (1 << i)) != 0; + if (isMetricNull) { + metrics[i] = 0; + } + break; + + case FLOAT: + case INT: + default: + throw new IllegalStateException(); + } + } + } + + /** + * Set null values for dimension based on the bitset + */ + private void setNullValuesForDimensions(RandomAccessInput input, long offset, Long[] dimensions) throws IOException { + short nullDimensionsBitSet = input.readShort(offset); + for (int i = 0; i < dimensions.length; i++) { + boolean isDimensionNull = (nullDimensionsBitSet & (1 << i)) != 0; + if (isDimensionNull) { + dimensions[i] = null; + } + } + } + + /** + * Load the correct StarTreeDocuments file based on the docId + */ + private void ensureDocumentReadable(int docId) throws IOException { + ensureDocumentReadable(docId, true); + } + + /** + * Load the correct StarTreeDocuments file based on the docId + * + * @param docId requested doc id + * @param shouldCreateFileOutput this flag is used to indicate whether to create a new file output which is not needed during file format write operation + */ + private void ensureDocumentReadable(int docId, boolean shouldCreateFileOutput) throws IOException { + if (docId >= prevStartDocId && docId < numReadableStarTreeDocuments) { + return; + } + IOUtils.closeWhileHandlingException(starTreeDocsFileInput); + starTreeDocsFileInput = null; + /* + * If docId is less then the _numDocs , then we need to find a previous file associated with doc id + * The fileToByteSizeMap is in the following format + * file1 -> 521 + * file2 -> 780 + * + * which represents that file1 contains all docs till "520". + * "prevStartDocId" essentially tracks the "start doc id" of the range in the present file + * "_numReadableStarTreedocuments" tracks the "end doc id + 1" of the range in the present file + * + * IMPORTANT : This is case where the requested file is not the file which is being currently written to + */ + try { + if (docId < numStarTreeDocs) { + int prevStartDocId = 0; + for (Map.Entry entry : fileToByteSizeMap.entrySet()) { + if (docId < entry.getValue()) { + starTreeDocsFileInput = tmpDirectory.openInput(entry.getKey(), state.context); + starTreeDocsFileRandomInput = starTreeDocsFileInput.randomAccessSlice( + starTreeDocsFileInput.getFilePointer(), + starTreeDocsFileInput.length() - starTreeDocsFileInput.getFilePointer() + ); + numReadableStarTreeDocuments = entry.getValue(); + break; + } + prevStartDocId = entry.getValue(); + } + this.prevStartDocId = prevStartDocId; + } + + if (starTreeDocsFileInput != null) { + return; + } + } catch (IOException ex) { + if (starTreeDocsFileOutput != null) { + IOUtils.closeWhileHandlingException(starTreeDocsFileOutput); + } + } + if (starTreeDocsFileOutput != null) { + IOUtils.closeWhileHandlingException(starTreeDocsFileOutput); + } + + currBytes = 0; + if (starTreeDocsFileOutput != null) { + fileToByteSizeMap.put(starTreeDocsFileOutput.getName(), numStarTreeDocs); + } + + if (shouldCreateFileOutput) { + starTreeDocsFileOutput = createStarTreeDocumentsFileOutput(); + } + + // Check if we need to merge files + if (fileToByteSizeMap.size() >= fileCountMergeThreshold) { + mergeFiles(); + } + + if (starTreeDocsFileRandomInput != null) { + starTreeDocsFileRandomInput = null; + } + try { + int prevStartDocId = 0; + for (Map.Entry entry : fileToByteSizeMap.entrySet()) { + if (docId <= entry.getValue() - 1) { + starTreeDocsFileInput = tmpDirectory.openInput(entry.getKey(), state.context); + starTreeDocsFileRandomInput = starTreeDocsFileInput.randomAccessSlice( + starTreeDocsFileInput.getFilePointer(), + starTreeDocsFileInput.length() - starTreeDocsFileInput.getFilePointer() + ); + numReadableStarTreeDocuments = entry.getValue(); + break; + } + prevStartDocId = entry.getValue(); + } + this.prevStartDocId = prevStartDocId; + } catch (IOException e) { + IOUtils.close(this); + throw e; + } + + } + + /** + * Merge temporary star tree files once the number of files reach threshold + */ + private void mergeFiles() throws IOException { + try (IndexOutput mergedOutput = createStarTreeDocumentsFileOutput()) { + long st = System.currentTimeMillis(); + + long mergeBytes = 0L; + for (Map.Entry entry : fileToByteSizeMap.entrySet()) { + IndexInput input = tmpDirectory.openInput(entry.getKey(), state.context); + mergedOutput.copyBytes(input, input.length()); + mergeBytes += input.length(); + input.close(); + } + logger.debug( + "Created merge file : {} in : {} ms with size of : {} KB", + starTreeDocsFileOutput.getName(), + System.currentTimeMillis() - st, + mergeBytes / 1024 + ); + // Delete the old files + for (String fileName : fileToByteSizeMap.keySet()) { + tmpDirectory.deleteFile(fileName); + } + // Clear the fileToByteSizeMap and add the merged file + fileToByteSizeMap.clear(); + fileToByteSizeMap.put(mergedOutput.getName(), numStarTreeDocs); + } + + int curr = 0; + for (int i = 0; i < starTreeDocumentOffsets.size(); i++) { + starTreeDocumentOffsets.set(i, curr); + curr += docSizeInBytes; + } + + } + + /** + * Close the open segment files, star tree document files and associated data in/outputs. + * Delete all the temporary segment files and star tree document files + * + * @throws IOException IOException in case of I/O errors + */ + @Override + public void close() throws IOException { + try { + if (starTreeDocsFileOutput != null) { + IOUtils.closeWhileHandlingException(starTreeDocsFileOutput); + try { + tmpDirectory.deleteFile(starTreeDocsFileOutput.getName()); + } catch (IOException ignored) {} + } + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + IOUtils.closeWhileHandlingException(starTreeDocsFileInput, segmentDocsFileInput, starTreeDocsFileOutput, segmentDocsFileOutput); + } + try { + if (this.segmentDocsFileOutput != null) { + // Delete all temporary segment document files + tmpDirectory.deleteFile(segmentDocsFileOutput.getName()); + } + } catch (IOException ignored) {} + // Delete all temporary star tree document files + for (String file : fileToByteSizeMap.keySet()) { + try { + tmpDirectory.deleteFile(file); + } catch (IOException ignored) {} + } + super.close(); + } +} diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilder.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilder.java index 1599be2e76a56..2c6230774a0d9 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilder.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/OnHeapStarTreeBuilder.java @@ -127,7 +127,6 @@ StarTreeDocument[] getSegmentsStarTreeDocuments(List starTreeVal metricReaders.add(new SequentialDocValuesIterator(metricDocValuesEntry.getValue())); } - boolean endOfDoc = false; int currentDocId = 0; int numSegmentDocs = Integer.parseInt( starTreeValues.getAttributes().getOrDefault(NUM_SEGMENT_DOCS, String.valueOf(DocIdSetIterator.NO_MORE_DOCS)) diff --git a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreesBuilder.java b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreesBuilder.java index 6c3d476aa3a55..a744443417105 100644 --- a/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreesBuilder.java +++ b/server/src/main/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreesBuilder.java @@ -102,9 +102,9 @@ public void buildDuringMerge(final Map> starTreeVal continue; } StarTreeField starTreeField = starTreeValuesList.get(0).getStarTreeField(); - StarTreeBuilder builder = getSingleTreeBuilder(starTreeField, state, mapperService); - builder.build(starTreeValuesList); - builder.close(); + try (StarTreeBuilder builder = getSingleTreeBuilder(starTreeField, state, mapperService)) { + builder.build(starTreeValuesList); + } } logger.debug( "Took {} ms to merge {} star-trees with star-tree fields", @@ -122,8 +122,7 @@ StarTreeBuilder getSingleTreeBuilder(StarTreeField starTreeField, SegmentWriteSt case ON_HEAP: return new OnHeapStarTreeBuilder(starTreeField, state, mapperService); case OFF_HEAP: - // TODO - // return new OffHeapStarTreeBuilder(starTreeField, state, mapperService); + return new OffHeapStarTreeBuilder(starTreeField, state, mapperService); default: throw new IllegalArgumentException( String.format( diff --git a/server/src/main/java/org/opensearch/index/mapper/StarTreeMapper.java b/server/src/main/java/org/opensearch/index/mapper/StarTreeMapper.java index d2debe762e9be..d9539f9dc0c82 100644 --- a/server/src/main/java/org/opensearch/index/mapper/StarTreeMapper.java +++ b/server/src/main/java/org/opensearch/index/mapper/StarTreeMapper.java @@ -84,8 +84,7 @@ public static class Builder extends ParametrizedFieldMapper.Builder { List.of(XContentMapValues.nodeStringArrayValue(paramMap.getOrDefault(SKIP_STAR_NODE_IN_DIMS, new ArrayList()))) ); paramMap.remove(SKIP_STAR_NODE_IN_DIMS); - // TODO : change this to off heap once off heap gets implemented - StarTreeFieldConfiguration.StarTreeBuildMode buildMode = StarTreeFieldConfiguration.StarTreeBuildMode.ON_HEAP; + StarTreeFieldConfiguration.StarTreeBuildMode buildMode = StarTreeFieldConfiguration.StarTreeBuildMode.OFF_HEAP; List dimensions = buildDimensions(name, paramMap, context); paramMap.remove(ORDERED_DIMENSIONS); diff --git a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/AbstractStarTreeBuilderTests.java b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/AbstractStarTreeBuilderTests.java index 76a7875919a8b..bde688fc6afea 100644 --- a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/AbstractStarTreeBuilderTests.java +++ b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/AbstractStarTreeBuilderTests.java @@ -411,7 +411,9 @@ public void test_sortAndAggregateStarTreeDocuments_nullDimensionsAndNullMetrics( starTreeDocuments[3] = new StarTreeDocument(new Long[] { null, null, null, null }, new Double[] { null, null, null }); starTreeDocuments[4] = new StarTreeDocument(new Long[] { null, null, null, null }, new Double[] { null, null, null }); - List inorderStarTreeDocuments = List.of(); + List inorderStarTreeDocuments = List.of( + new StarTreeDocument(new Long[] { null, null, null, null }, new Object[] { 0.0, 0.0, 5L }) + ); Iterator expectedStarTreeDocumentIterator = inorderStarTreeDocuments.iterator(); StarTreeDocument[] segmentStarTreeDocuments = new StarTreeDocument[noOfStarTreeDocuments]; @@ -1055,7 +1057,7 @@ public void testFlushFlow() throws IOException { SortedNumericDocValues m1sndv = getSortedNumericMock(metricsList, metricsWithField); SortedNumericDocValues m2sndv = getSortedNumericMock(metricsList, metricsWithField); - OnHeapStarTreeBuilder builder = new OnHeapStarTreeBuilder(sf, getWriteState(6), mapperService); + builder = getStarTreeBuilder(sf, getWriteState(6), mapperService); SequentialDocValuesIterator[] dimDvs = { new SequentialDocValuesIterator(d1sndv), new SequentialDocValuesIterator(d2sndv) }; Iterator starTreeDocumentIterator = builder.sortAndAggregateSegmentDocuments( dimDvs, @@ -1120,7 +1122,7 @@ public void testFlushFlowBuild() throws IOException { SortedNumericDocValues d2sndv = getSortedNumericMock(dimList2, docsWithField2); SortedNumericDocValues m1sndv = getSortedNumericMock(metricsList, metricsWithField); - BaseStarTreeBuilder builder = getStarTreeBuilder(sf, getWriteState(100), mapperService); + builder = getStarTreeBuilder(sf, getWriteState(100), mapperService); DocValuesProducer d1vp = getDocValuesProducer(d1sndv); DocValuesProducer d2vp = getDocValuesProducer(d2sndv); @@ -1209,7 +1211,7 @@ public void testMergeFlowWithSum() throws IOException { sf, "6" ); - OnHeapStarTreeBuilder builder = new OnHeapStarTreeBuilder(sf, getWriteState(6), mapperService); + builder = getStarTreeBuilder(sf, getWriteState(6), mapperService); Iterator starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2)); /** * Asserting following dim / metrics [ dim1, dim2 / Sum [ metric] ] @@ -1259,7 +1261,7 @@ public void testMergeFlowWithCount() throws IOException { sf, "6" ); - OnHeapStarTreeBuilder builder = new OnHeapStarTreeBuilder(sf, getWriteState(6), mapperService); + builder = getStarTreeBuilder(sf, getWriteState(6), mapperService); Iterator starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2)); /** * Asserting following dim / metrics [ dim1, dim2 / Count [ metric] ] @@ -1336,7 +1338,7 @@ public void testMergeFlowWithDifferentDocsFromSegments() throws IOException { sf, "4" ); - OnHeapStarTreeBuilder builder = new OnHeapStarTreeBuilder(sf, getWriteState(4), mapperService); + builder = getStarTreeBuilder(sf, getWriteState(4), mapperService); Iterator starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2)); /** * Asserting following dim / metrics [ dim1, dim2 / Count [ metric] ] @@ -1363,6 +1365,66 @@ public void testMergeFlowWithDifferentDocsFromSegments() throws IOException { assertEquals(9, count); } + public void testMergeFlowNumSegmentsDocs() throws IOException { + List dimList = List.of(0L, 1L, 2L, 3L, 4L, 5L, 6L, -1L, -1L, -1L); + List docsWithField = List.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + List dimList2 = List.of(0L, 1L, 2L, 3L, 4L, 5L, 6L, -1L, -1L, -1L); + List docsWithField2 = List.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + + List metricsList = List.of(0L, 1L, 2L, 3L, 4L, 5L, 6L, -1L, -1L, -1L); + List metricsWithField = List.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + + List dimList3 = List.of(5L, 6L, 7L, 8L, -1L); + List docsWithField3 = List.of(0, 1, 2, 3, 4); + List dimList4 = List.of(5L, 6L, 7L, 8L, -1L); + List docsWithField4 = List.of(0, 1, 2, 3, 4); + + List metricsList2 = List.of(5L, 6L, 7L, 8L, 9L); + List metricsWithField2 = List.of(0, 1, 2, 3, 4); + + StarTreeField sf = getStarTreeField(MetricStat.COUNT); + StarTreeValues starTreeValues = getStarTreeValues( + getSortedNumericMock(dimList, docsWithField), + getSortedNumericMock(dimList2, docsWithField2), + getSortedNumericMock(metricsList, metricsWithField), + sf, + "6" + ); + + StarTreeValues starTreeValues2 = getStarTreeValues( + getSortedNumericMock(dimList3, docsWithField3), + getSortedNumericMock(dimList4, docsWithField4), + getSortedNumericMock(metricsList2, metricsWithField2), + sf, + "4" + ); + builder = getStarTreeBuilder(sf, getWriteState(4), mapperService); + Iterator starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2)); + /** + * Asserting following dim / metrics [ dim1, dim2 / Count [ metric] ] + [0, 0] | [0] + [1, 1] | [1] + [2, 2] | [2] + [3, 3] | [3] + [4, 4] | [4] + [5, 5] | [10] + [6, 6] | [6] + [7, 7] | [7] + [8, 8] | [8] + */ + int count = 0; + while (starTreeDocumentIterator.hasNext()) { + count++; + StarTreeDocument starTreeDocument = starTreeDocumentIterator.next(); + if (Objects.equals(starTreeDocument.dimensions[0], 5L)) { + assertEquals(starTreeDocument.dimensions[0] * 2, starTreeDocument.metrics[0]); + } else { + assertEquals(starTreeDocument.dimensions[1], starTreeDocument.metrics[0]); + } + } + assertEquals(9, count); + } + public void testMergeFlowWithMissingDocs() throws IOException { List dimList = List.of(0L, 1L, 2L, 3L, 4L, 6L); List docsWithField = List.of(0, 1, 2, 3, 4, 6); @@ -1396,7 +1458,7 @@ public void testMergeFlowWithMissingDocs() throws IOException { sf, "4" ); - OnHeapStarTreeBuilder builder = new OnHeapStarTreeBuilder(sf, getWriteState(4), mapperService); + builder = getStarTreeBuilder(sf, getWriteState(4), mapperService); Iterator starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2)); /** * Asserting following dim / metrics [ dim1, dim2 / Count [ metric] ] @@ -1456,7 +1518,7 @@ public void testMergeFlowWithMissingDocsInSecondDim() throws IOException { sf, "4" ); - OnHeapStarTreeBuilder builder = new OnHeapStarTreeBuilder(sf, getWriteState(4), mapperService); + builder = getStarTreeBuilder(sf, getWriteState(4), mapperService); Iterator starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2)); /** * Asserting following dim / metrics [ dim1, dim2 / Count [ metric] ] @@ -1517,7 +1579,7 @@ public void testMergeFlowWithDocsMissingAtTheEnd() throws IOException { sf, "4" ); - OnHeapStarTreeBuilder builder = new OnHeapStarTreeBuilder(sf, writeState, mapperService); + builder = getStarTreeBuilder(sf, writeState, mapperService); Iterator starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2)); /** * Asserting following dim / metrics [ dim1, dim2 / Count [ metric] ] @@ -1569,7 +1631,7 @@ public void testMergeFlowWithEmptyFieldsInOneSegment() throws IOException { sf, "0" ); - OnHeapStarTreeBuilder builder = new OnHeapStarTreeBuilder(sf, getWriteState(0), mapperService); + builder = getStarTreeBuilder(sf, getWriteState(0), mapperService); Iterator starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2)); /** * Asserting following dim / metrics [ dim1, dim2 / Count [ metric] ] @@ -1664,8 +1726,8 @@ public void testMergeFlowWithDuplicateDimensionValues() throws IOException { metricsWithField, sf ); - OnHeapStarTreeBuilder builder = new OnHeapStarTreeBuilder(sf, writeState, mapperService); - builder.build(List.of(starTreeValues, starTreeValues2)); + builder = getStarTreeBuilder(sf, writeState, mapperService); + builder.build(builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2))); List starTreeDocuments = builder.getStarTreeDocuments(); assertEquals(401, starTreeDocuments.size()); int count = 0; @@ -1774,8 +1836,8 @@ public void testMergeFlowWithMaxLeafDocs() throws IOException { sf ); - OnHeapStarTreeBuilder builder = new OnHeapStarTreeBuilder(sf, writeState, mapperService); - builder.build(List.of(starTreeValues, starTreeValues2)); + builder = getStarTreeBuilder(sf, writeState, mapperService); + builder.build(builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2))); List starTreeDocuments = builder.getStarTreeDocuments(); /** 635 docs get generated @@ -1892,8 +1954,8 @@ public void testMergeFlowWithDuplicateDimensionValueWithMaxLeafDocs() throws IOE metricsWithField, sf ); - OnHeapStarTreeBuilder builder = new OnHeapStarTreeBuilder(sf, writeState, mapperService); - builder.build(List.of(starTreeValues, starTreeValues2)); + builder = getStarTreeBuilder(sf, writeState, mapperService); + builder.build(builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2))); List starTreeDocuments = builder.getStarTreeDocuments(); assertEquals(401, starTreeDocuments.size()); builder.close(); @@ -1991,8 +2053,8 @@ public void testMergeFlowWithMaxLeafDocsAndStarTreeNodesAssertion() throws IOExc metricsWithField, sf ); - OnHeapStarTreeBuilder builder = new OnHeapStarTreeBuilder(sf, writeState, mapperService); - builder.build(List.of(starTreeValues, starTreeValues2)); + builder = getStarTreeBuilder(sf, writeState, mapperService); + builder.build(builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2))); List starTreeDocuments = builder.getStarTreeDocuments(); Map> dimValueToDocIdMap = new HashMap<>(); traverseStarTree(builder.rootNode, dimValueToDocIdMap, true); @@ -2151,7 +2213,7 @@ public void testMergeFlow() throws IOException { getAttributes(1000) ); - BaseStarTreeBuilder builder = getStarTreeBuilder(sf, writeState, mapperService); + builder = getStarTreeBuilder(sf, writeState, mapperService); Iterator starTreeDocumentIterator = builder.mergeStarTrees(List.of(starTreeValues, starTreeValues2)); /** [0, 0, 0, 0] | [0.0] diff --git a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/OffHeapStarTreeBuilderTests.java b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/OffHeapStarTreeBuilderTests.java new file mode 100644 index 0000000000000..760ad6a98114b --- /dev/null +++ b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/OffHeapStarTreeBuilderTests.java @@ -0,0 +1,27 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.compositeindex.datacube.startree.builder; + +import org.apache.lucene.index.SegmentWriteState; +import org.opensearch.index.compositeindex.datacube.startree.StarTreeField; +import org.opensearch.index.mapper.MapperService; + +import java.io.IOException; + +public class OffHeapStarTreeBuilderTests extends AbstractStarTreeBuilderTests { + @Override + public BaseStarTreeBuilder getStarTreeBuilder( + StarTreeField starTreeField, + SegmentWriteState segmentWriteState, + MapperService mapperService + ) throws IOException { + return new OffHeapStarTreeBuilder(starTreeField, segmentWriteState, mapperService, randomIntBetween(2, 6)); + } + +} diff --git a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreesBuilderTests.java b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreesBuilderTests.java index 564ab110fa7a5..9e275e3898222 100644 --- a/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreesBuilderTests.java +++ b/server/src/test/java/org/opensearch/index/compositeindex/datacube/startree/builder/StarTreesBuilderTests.java @@ -101,14 +101,6 @@ public void test_getStarTreeBuilder() throws IOException { assertTrue(starTreeBuilder instanceof OnHeapStarTreeBuilder); } - public void test_getStarTreeBuilder_illegalArgument() { - when(mapperService.getCompositeFieldTypes()).thenReturn(Set.of(starTreeFieldType)); - StarTreeFieldConfiguration starTreeFieldConfiguration = new StarTreeFieldConfiguration(1, new HashSet<>(), StarTreeFieldConfiguration.StarTreeBuildMode.OFF_HEAP); - StarTreeField starTreeField = new StarTreeField("star_tree", new ArrayList<>(), new ArrayList<>(), starTreeFieldConfiguration); - StarTreesBuilder starTreesBuilder = new StarTreesBuilder(segmentWriteState, mapperService); - assertThrows(IllegalArgumentException.class, () -> starTreesBuilder.getSingleTreeBuilder(starTreeField, segmentWriteState, mapperService)); - } - public void test_closeWithNoStarTreeFields() throws IOException { StarTreeFieldConfiguration starTreeFieldConfiguration = new StarTreeFieldConfiguration( 1,