Skip to content

Commit

Permalink
star tree file formats refactoring and fixing offset bug (opensearch-…
Browse files Browse the repository at this point in the history
…project#15975)

Signed-off-by: Bharathwaj G <[email protected]>
  • Loading branch information
bharath-techie authored Sep 19, 2024
1 parent 3a1b6d1 commit 80ff07e
Show file tree
Hide file tree
Showing 12 changed files with 281 additions and 209 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.opensearch.index.compositeindex.datacube.startree.node.InMemoryTreeNode;
import org.opensearch.index.compositeindex.datacube.startree.node.StarTreeNodeType;
import org.opensearch.index.compositeindex.datacube.startree.utils.SequentialDocValuesIterator;
import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedNumericStarTreeValuesIterator;
import org.opensearch.index.mapper.DocCountFieldMapper;
import org.opensearch.index.mapper.FieldMapper;
import org.opensearch.index.mapper.FieldValueConverter;
Expand Down Expand Up @@ -193,7 +194,9 @@ public List<SequentialDocValuesIterator> getMetricReaders(SegmentWriteState stat
metricFieldInfo = getFieldInfo(metric.getField(), DocValuesType.SORTED_NUMERIC);
}
metricReader = new SequentialDocValuesIterator(
fieldProducerMap.get(metricFieldInfo.name).getSortedNumeric(metricFieldInfo)
new SortedNumericStarTreeValuesIterator(
fieldProducerMap.get(metricFieldInfo.name).getSortedNumeric(metricFieldInfo)
)
);
}
metricReaders.add(metricReader);
Expand Down Expand Up @@ -228,7 +231,7 @@ public void build(
dimensionFieldInfo = getFieldInfo(dimension, DocValuesType.SORTED_NUMERIC);
}
dimensionReaders[i] = new SequentialDocValuesIterator(
fieldProducerMap.get(dimensionFieldInfo.name).getSortedNumeric(dimensionFieldInfo)
new SortedNumericStarTreeValuesIterator(fieldProducerMap.get(dimensionFieldInfo.name).getSortedNumeric(dimensionFieldInfo))
);
}
Iterator<StarTreeDocument> starTreeDocumentIterator = sortAndAggregateSegmentDocuments(dimensionReaders, metricReaders);
Expand Down Expand Up @@ -287,7 +290,7 @@ void appendDocumentsToStarTree(Iterator<StarTreeDocument> starTreeDocumentIterat
}
}

private void serializeStarTree(int numSegmentStarTreeDocument, int numStarTreeDocs) throws IOException {
private void serializeStarTree(int numSegmentStarTreeDocuments, int numStarTreeDocs) throws IOException {
// serialize the star tree data
long dataFilePointer = dataOut.getFilePointer();
StarTreeWriter starTreeWriter = new StarTreeWriter();
Expand All @@ -299,7 +302,7 @@ private void serializeStarTree(int numSegmentStarTreeDocument, int numStarTreeDo
starTreeField,
metricAggregatorInfos,
numStarTreeNodes,
numSegmentStarTreeDocument,
numSegmentStarTreeDocuments,
numStarTreeDocs,
dataFilePointer,
totalStarTreeDataLength
Expand Down Expand Up @@ -400,22 +403,20 @@ protected StarTreeDocument getStarTreeDocument(
) throws IOException {
Long[] dims = new Long[numDimensions];
int i = 0;
for (SequentialDocValuesIterator dimensionDocValueIterator : dimensionReaders) {
dimensionDocValueIterator.nextDoc(currentDocId);
Long val = dimensionDocValueIterator.value(currentDocId);
for (SequentialDocValuesIterator dimensionValueIterator : dimensionReaders) {
dimensionValueIterator.nextEntry(currentDocId);
Long val = dimensionValueIterator.value(currentDocId);
dims[i] = val;
i++;
}
i = 0;
Object[] metrics = new Object[metricReaders.size()];
for (SequentialDocValuesIterator metricDocValuesIterator : metricReaders) {
metricDocValuesIterator.nextDoc(currentDocId);
for (SequentialDocValuesIterator metricValuesIterator : metricReaders) {
metricValuesIterator.nextEntry(currentDocId);
// As part of merge, we traverse the star tree doc values
// The type of data stored in metric fields is different from the
// actual indexing field they're based on
metrics[i] = metricAggregatorInfos.get(i)
.getValueAggregators()
.toAggregatedValueType(metricDocValuesIterator.value(currentDocId));
metrics[i] = metricAggregatorInfos.get(i).getValueAggregators().toAggregatedValueType(metricValuesIterator.value(currentDocId));
i++;
}
return new StarTreeDocument(dims, metrics);
Expand Down Expand Up @@ -502,7 +503,7 @@ Long[] getStarTreeDimensionsFromSegment(int currentDocId, SequentialDocValuesIte
for (int i = 0; i < numDimensions; i++) {
if (dimensionReaders[i] != null) {
try {
dimensionReaders[i].nextDoc(currentDocId);
dimensionReaders[i].nextEntry(currentDocId);
} catch (IOException e) {
logger.error("unable to iterate to next doc", e);
throw new RuntimeException("unable to iterate to next doc", e);
Expand Down Expand Up @@ -530,7 +531,7 @@ private Object[] getStarTreeMetricsFromSegment(int currentDocId, List<Sequential
SequentialDocValuesIterator metricStatReader = metricsReaders.get(i);
if (metricStatReader != null) {
try {
metricStatReader.nextDoc(currentDocId);
metricStatReader.nextEntry(currentDocId);
} catch (IOException e) {
logger.error("unable to iterate to next doc", e);
throw new RuntimeException("unable to iterate to next doc", e);
Expand Down Expand Up @@ -672,7 +673,7 @@ private SequentialDocValuesIterator getIteratorForNumericField(
SequentialDocValuesIterator sequentialDocValuesIterator;
assert fieldProducerMap.containsKey(fieldInfo.name);
sequentialDocValuesIterator = new SequentialDocValuesIterator(
DocValues.singleton(fieldProducerMap.get(fieldInfo.name).getNumeric(fieldInfo))
new SortedNumericStarTreeValuesIterator(DocValues.singleton(fieldProducerMap.get(fieldInfo.name).getNumeric(fieldInfo)))
);
return sequentialDocValuesIterator;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ Iterator<StarTreeDocument> mergeStarTrees(List<StarTreeValues> starTreeValuesSub
.size()];
for (int i = 0; i < dimensionsSplitOrder.size(); i++) {
String dimension = dimensionsSplitOrder.get(i).getField();
dimensionReaders[i] = new SequentialDocValuesIterator(starTreeValues.getDimensionDocIdSetIterator(dimension));
dimensionReaders[i] = new SequentialDocValuesIterator(starTreeValues.getDimensionValuesIterator(dimension));
}
List<SequentialDocValuesIterator> metricReaders = new ArrayList<>();
// get doc id set iterators for metrics
Expand All @@ -164,7 +164,7 @@ Iterator<StarTreeDocument> mergeStarTrees(List<StarTreeValues> starTreeValuesSub
metric.getField(),
metricStat.getTypeName()
);
metricReaders.add(new SequentialDocValuesIterator(starTreeValues.getMetricDocIdSetIterator(metricFullName)));
metricReaders.add(new SequentialDocValuesIterator(starTreeValues.getMetricValuesIterator(metricFullName)));
}
}
int currentDocId = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ StarTreeDocument[] getSegmentsStarTreeDocuments(List<StarTreeValues> starTreeVal

for (int i = 0; i < dimensionsSplitOrder.size(); i++) {
String dimension = dimensionsSplitOrder.get(i).getField();
dimensionReaders[i] = new SequentialDocValuesIterator(starTreeValues.getDimensionDocIdSetIterator(dimension));
dimensionReaders[i] = new SequentialDocValuesIterator(starTreeValues.getDimensionValuesIterator(dimension));
}

List<SequentialDocValuesIterator> metricReaders = new ArrayList<>();
Expand All @@ -150,7 +150,7 @@ StarTreeDocument[] getSegmentsStarTreeDocuments(List<StarTreeValues> starTreeVal
metric.getField(),
metricStat.getTypeName()
);
metricReaders.add(new SequentialDocValuesIterator(starTreeValues.getMetricDocIdSetIterator(metricFullName)));
metricReaders.add(new SequentialDocValuesIterator(starTreeValues.getMetricValuesIterator(metricFullName)));

}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -55,11 +54,9 @@ public class StarTreeDocsFileManager extends AbstractDocumentsFileManager implem
private RandomAccessInput starTreeDocsFileRandomInput;
private IndexOutput starTreeDocsFileOutput;
private final Map<String, Integer> fileToEndDocIdMap;
private final List<Integer> starTreeDocumentOffsets = new ArrayList<>();
private int currentFileStartDocId;
private int numReadableStarTreeDocuments;
private int starTreeFileCount = -1;
private int currBytes = 0;
private final int fileCountMergeThreshold;
private int numStarTreeDocs = 0;

Expand Down Expand Up @@ -98,15 +95,26 @@ IndexOutput createStarTreeDocumentsFileOutput() throws IOException {
public void writeStarTreeDocument(StarTreeDocument starTreeDocument, boolean isAggregatedDoc) throws IOException {
assert isAggregatedDoc == true;
int numBytes = writeStarTreeDocument(starTreeDocument, starTreeDocsFileOutput, true);
addStarTreeDocumentOffset(numBytes);
if (docSizeInBytes == -1) {
docSizeInBytes = numBytes;
} else {
assert docSizeInBytes == numBytes;
}
numStarTreeDocs++;
}

@Override
public StarTreeDocument readStarTreeDocument(int docId, boolean isAggregatedDoc) throws IOException {
assert isAggregatedDoc == true;
ensureDocumentReadable(docId);
return readStarTreeDocument(starTreeDocsFileRandomInput, starTreeDocumentOffsets.get(docId), true);
return readStarTreeDocument(starTreeDocsFileRandomInput, getOffset(docId), true);
}

/**
* Returns offset for the docId based on the current file start id
*/
private long getOffset(int docId) {
return (long) (docId - currentFileStartDocId) * docSizeInBytes;
}

@Override
Expand All @@ -119,19 +127,10 @@ public Long getDimensionValue(int docId, int dimensionId) throws IOException {
public Long[] readDimensions(int docId) throws IOException {
ensureDocumentReadable(docId);
Long[] dims = new Long[starTreeField.getDimensionsOrder().size()];
readDimensions(dims, starTreeDocsFileRandomInput, starTreeDocumentOffsets.get(docId));
readDimensions(dims, starTreeDocsFileRandomInput, getOffset(docId));
return dims;
}

private void addStarTreeDocumentOffset(int bytes) {
starTreeDocumentOffsets.add(currBytes);
currBytes += bytes;
if (docSizeInBytes == -1) {
docSizeInBytes = bytes;
}
assert docSizeInBytes == bytes;
}

/**
* Load the correct StarTreeDocuments file based on the docId
*/
Expand Down Expand Up @@ -199,7 +198,6 @@ private void loadStarTreeDocumentFile(int docId) throws IOException {
* If the operation is only for reading existing documents, a new file is not created.
*/
private void closeAndMaybeCreateNewFile(boolean shouldCreateFileForAppend, int numStarTreeDocs) throws IOException {
currBytes = 0;
if (starTreeDocsFileOutput != null) {
fileToEndDocIdMap.put(starTreeDocsFileOutput.getName(), numStarTreeDocs);
IOUtils.close(starTreeDocsFileOutput);
Expand Down Expand Up @@ -232,7 +230,6 @@ private void mergeFiles(int numStarTreeDocs) throws IOException {
deleteOldFiles();
fileToEndDocIdMap.clear();
fileToEndDocIdMap.put(mergedOutput.getName(), numStarTreeDocs);
resetStarTreeDocumentOffsets();
}
}

Expand All @@ -259,17 +256,6 @@ private void deleteOldFiles() throws IOException {
}
}

/**
* Reset the star tree document offsets based on the merged file
*/
private void resetStarTreeDocumentOffsets() {
int curr = 0;
for (int i = 0; i < starTreeDocumentOffsets.size(); i++) {
starTreeDocumentOffsets.set(i, curr);
curr += docSizeInBytes;
}
}

@Override
public void close() {
try {
Expand All @@ -288,7 +274,6 @@ public void close() {
tmpDirectory.deleteFile(file);
} catch (IOException ignored) {} // similar to IOUtils.deleteFilesIgnoringExceptions
}
starTreeDocumentOffsets.clear();
fileToEndDocIdMap.clear();
}
}
Loading

0 comments on commit 80ff07e

Please sign in to comment.