-
Notifications
You must be signed in to change notification settings - Fork 140
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce derived vector source via stored fields #2449
Changes from 9 commits
ad07bf0
31de672
927b6de
58164bb
9c8394e
c8ff878
85a4c09
939cec1
4bc2828
74387f9
d449151
c3483df
399d281
14d3ead
1fe7302
b3aba03
fd32a12
6462c4b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.knn.index.codec.KNN9120Codec; | ||
|
||
import lombok.AllArgsConstructor; | ||
import org.apache.lucene.codecs.StoredFieldsFormat; | ||
import org.apache.lucene.codecs.StoredFieldsReader; | ||
import org.apache.lucene.codecs.StoredFieldsWriter; | ||
import org.apache.lucene.index.FieldInfo; | ||
import org.apache.lucene.index.FieldInfos; | ||
import org.apache.lucene.index.SegmentInfo; | ||
import org.apache.lucene.index.SegmentReadState; | ||
import org.apache.lucene.store.Directory; | ||
import org.apache.lucene.store.IOContext; | ||
import org.opensearch.index.mapper.MappedFieldType; | ||
import org.opensearch.index.mapper.MapperService; | ||
import org.opensearch.knn.index.KNNSettings; | ||
import org.opensearch.knn.index.codec.derivedsource.DerivedSourceReadersSupplier; | ||
import org.opensearch.knn.index.codec.derivedsource.DerivedSourceVectorInjector; | ||
import org.opensearch.knn.index.mapper.KNNVectorFieldType; | ||
|
||
import java.io.IOException; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
|
||
import static org.opensearch.knn.common.KNNConstants.DERIVED_VECTOR_FIELD_ATTRIBUTE_KEY; | ||
import static org.opensearch.knn.common.KNNConstants.DERIVED_VECTOR_FIELD_ATTRIBUTE_TRUE_VALUE; | ||
|
||
@AllArgsConstructor | ||
public class DerivedSourceStoredFieldsFormat extends StoredFieldsFormat { | ||
|
||
private final StoredFieldsFormat delegate; | ||
private final DerivedSourceReadersSupplier derivedSourceReadersSupplier; | ||
// IMPORTANT Do not rely on this for the reader, it will be null if SPI is used | ||
private final MapperService mapperService; | ||
jmazanec15 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
@Override | ||
public StoredFieldsReader fieldsReader(Directory directory, SegmentInfo segmentInfo, FieldInfos fieldInfos, IOContext ioContext) | ||
throws IOException { | ||
List<FieldInfo> derivedVectorFields = new ArrayList<>(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we short circuit the code to return early in case of setting is disabled? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the reader format, due to SPI, we do not have access to KNNSettings. So we cant check if the setting is set or not. I add a shortcircuit if no fields have the attribute: https://github.com/opensearch-project/k-NN/pull/2449/files#diff-f8a9ebad33a21a479b30eb0dfa0bcc6aa7ddfcb6c464eca0371b60d3c3a38e77R49 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we lazy create derivedVectorFields else we will have empty list of array even though setting is disabled? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sure I can add that in the below for loop. |
||
for (FieldInfo fieldInfo : fieldInfos) { | ||
if (DERIVED_VECTOR_FIELD_ATTRIBUTE_TRUE_VALUE.equals(fieldInfo.attributes().get(DERIVED_VECTOR_FIELD_ATTRIBUTE_KEY))) { | ||
jmazanec15 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
derivedVectorFields.add(fieldInfo); | ||
} | ||
} | ||
// If no fields have it enabled, | ||
if (derivedVectorFields.isEmpty()) { | ||
return delegate.fieldsReader(directory, segmentInfo, fieldInfos, ioContext); | ||
} | ||
DerivedSourceVectorInjector derivedSourceVectorInjector = new DerivedSourceVectorInjector( | ||
derivedSourceReadersSupplier, | ||
new SegmentReadState(directory, segmentInfo, fieldInfos, ioContext), | ||
derivedVectorFields | ||
); | ||
return new DerivedSourceStoredFieldsReader( | ||
delegate.fieldsReader(directory, segmentInfo, fieldInfos, ioContext), | ||
derivedSourceVectorInjector | ||
); | ||
} | ||
|
||
@Override | ||
public StoredFieldsWriter fieldsWriter(Directory directory, SegmentInfo segmentInfo, IOContext ioContext) throws IOException { | ||
StoredFieldsWriter delegateWriter = delegate.fieldsWriter(directory, segmentInfo, ioContext); | ||
if (mapperService != null && KNNSettings.isKNNDerivedSourceEnabled(mapperService.getIndexSettings().getSettings())) { | ||
List<String> vectorFieldTypes = new ArrayList<>(); | ||
for (MappedFieldType fieldType : mapperService.fieldTypes()) { | ||
if (fieldType instanceof KNNVectorFieldType) { | ||
vectorFieldTypes.add(fieldType.name()); | ||
jmazanec15 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
if (vectorFieldTypes.isEmpty() == false) { | ||
return new DerivedSourceStoredFieldsWriter(delegateWriter, vectorFieldTypes); | ||
} | ||
} | ||
return delegateWriter; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.knn.index.codec.KNN9120Codec; | ||
|
||
import lombok.RequiredArgsConstructor; | ||
import lombok.Setter; | ||
import org.apache.lucene.codecs.StoredFieldsReader; | ||
import org.apache.lucene.index.StoredFieldVisitor; | ||
import org.opensearch.index.fieldvisitor.FieldsVisitor; | ||
import org.opensearch.knn.index.codec.derivedsource.DerivedSourceStoredFieldVisitor; | ||
import org.opensearch.knn.index.codec.derivedsource.DerivedSourceVectorInjector; | ||
|
||
import java.io.IOException; | ||
|
||
@RequiredArgsConstructor | ||
public class DerivedSourceStoredFieldsReader extends StoredFieldsReader { | ||
private final StoredFieldsReader delegate; | ||
// Given docId and source, process source | ||
private final DerivedSourceVectorInjector derivedSourceVectorInjector; | ||
|
||
@Setter | ||
private boolean shouldInject = true; | ||
jmazanec15 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
@Override | ||
public void document(int docId, StoredFieldVisitor storedFieldVisitor) throws IOException { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The document method directly couples vector injection with DerivedSourceVectorInjector. This makes it harder to extend or modify the injection logic. Could we abstracts or implement some loose coupling here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure I understand completely - we do need to create a custom stored fields visitor in order to get access to the source, so I needed to pass it in there. I could add the logics around the fieldsvisitor casting into the DerivedSourceStoredFieldVisitor. |
||
// If the visitor has explicitly indicated it does not need the fields, we should not inject them | ||
boolean isVisitorNeedFields = true; | ||
if (storedFieldVisitor instanceof FieldsVisitor) { | ||
isVisitorNeedFields = derivedSourceVectorInjector.shouldInject( | ||
((FieldsVisitor) storedFieldVisitor).includes(), | ||
((FieldsVisitor) storedFieldVisitor).excludes() | ||
); | ||
} | ||
if (shouldInject && isVisitorNeedFields) { | ||
delegate.document(docId, new DerivedSourceStoredFieldVisitor(storedFieldVisitor, docId, derivedSourceVectorInjector)); | ||
jmazanec15 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return; | ||
} | ||
delegate.document(docId, storedFieldVisitor); | ||
} | ||
|
||
@Override | ||
public StoredFieldsReader clone() { | ||
return new DerivedSourceStoredFieldsReader(delegate.clone(), derivedSourceVectorInjector); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will clone affect the refcounts for delegate in any way? Are we sure it will be closed when its supposed to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let me double check this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This class is supposed to heavily delegate. So, when a certain method is called, we want to call the delegate's method. Hence, I believe this is correct There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
} | ||
|
||
@Override | ||
public void checkIntegrity() throws IOException { | ||
delegate.checkIntegrity(); | ||
} | ||
|
||
@Override | ||
public void close() throws IOException { | ||
delegate.close(); | ||
shatejas marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
/** | ||
* For merging, we need to tell the derived source stored fields reader to skip injecting the source. Otherwise, | ||
* on merge we will end up just writing the source to disk | ||
* | ||
* @param storedFieldsReader stored fields reader to wrap | ||
* @return wrapped stored fields reader | ||
*/ | ||
public static StoredFieldsReader wrapForMerge(StoredFieldsReader storedFieldsReader) { | ||
jmazanec15 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if (storedFieldsReader instanceof DerivedSourceStoredFieldsReader) { | ||
StoredFieldsReader storedFieldsReaderClone = storedFieldsReader.clone(); | ||
((DerivedSourceStoredFieldsReader) storedFieldsReaderClone).setShouldInject(false); | ||
return storedFieldsReaderClone; | ||
} | ||
return storedFieldsReader; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.knn.index.codec.KNN9120Codec; | ||
|
||
import lombok.RequiredArgsConstructor; | ||
import org.apache.lucene.codecs.StoredFieldsWriter; | ||
import org.apache.lucene.index.FieldInfo; | ||
import org.apache.lucene.index.MergeState; | ||
import org.apache.lucene.store.DataInput; | ||
import org.apache.lucene.util.BytesRef; | ||
import org.opensearch.common.collect.Tuple; | ||
import org.opensearch.common.io.stream.BytesStreamOutput; | ||
import org.opensearch.common.xcontent.XContentHelper; | ||
import org.opensearch.common.xcontent.support.XContentMapValues; | ||
import org.opensearch.core.common.bytes.BytesReference; | ||
import org.opensearch.core.xcontent.MediaType; | ||
import org.opensearch.core.xcontent.MediaTypeRegistry; | ||
import org.opensearch.core.xcontent.XContentBuilder; | ||
import org.opensearch.index.mapper.SourceFieldMapper; | ||
|
||
import java.io.IOException; | ||
import java.nio.ByteBuffer; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
|
||
@RequiredArgsConstructor | ||
public class DerivedSourceStoredFieldsWriter extends StoredFieldsWriter { | ||
|
||
private final StoredFieldsWriter delegate; | ||
private final List<String> vectorFieldTypes; | ||
|
||
@Override | ||
public void startDocument() throws IOException { | ||
delegate.startDocument(); | ||
} | ||
|
||
@Override | ||
public void writeField(FieldInfo fieldInfo, int i) throws IOException { | ||
delegate.writeField(fieldInfo, i); | ||
} | ||
|
||
@Override | ||
public void writeField(FieldInfo fieldInfo, long l) throws IOException { | ||
delegate.writeField(fieldInfo, l); | ||
} | ||
|
||
@Override | ||
public void writeField(FieldInfo fieldInfo, float v) throws IOException { | ||
delegate.writeField(fieldInfo, v); | ||
} | ||
|
||
@Override | ||
public void writeField(FieldInfo fieldInfo, double v) throws IOException { | ||
delegate.writeField(fieldInfo, v); | ||
} | ||
|
||
@Override | ||
public void writeField(FieldInfo info, DataInput value, int length) throws IOException { | ||
delegate.writeField(info, value, length); | ||
} | ||
|
||
@Override | ||
public int merge(MergeState mergeState) throws IOException { | ||
// We have to wrap these here to avoid storing the vectors during merge | ||
for (int i = 0; i < mergeState.storedFieldsReaders.length; i++) { | ||
mergeState.storedFieldsReaders[i] = DerivedSourceStoredFieldsReader.wrapForMerge(mergeState.storedFieldsReaders[i]); | ||
} | ||
return delegate.merge(mergeState); | ||
} | ||
|
||
@Override | ||
public void writeField(FieldInfo fieldInfo, BytesRef bytesRef) throws IOException { | ||
// Parse out the vectors from the source | ||
if (Objects.equals(fieldInfo.name, SourceFieldMapper.NAME) && !vectorFieldTypes.isEmpty()) { | ||
Tuple<? extends MediaType, Map<String, Object>> mapTuple = XContentHelper.convertToMap( | ||
jmazanec15 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
BytesReference.fromByteBuffer(ByteBuffer.wrap(bytesRef.bytes)), | ||
true, | ||
MediaTypeRegistry.JSON | ||
); | ||
Map<String, Object> filteredSource = XContentMapValues.filter(null, vectorFieldTypes.toArray(new String[0])) | ||
.apply(mapTuple.v2()); | ||
BytesStreamOutput bStream = new BytesStreamOutput(); | ||
MediaType actualContentType = mapTuple.v1(); | ||
XContentBuilder builder = MediaTypeRegistry.contentBuilder(actualContentType, bStream).map(filteredSource); | ||
builder.close(); | ||
BytesReference bytesReference = bStream.bytes(); | ||
delegate.writeField(fieldInfo, bytesReference.toBytesRef()); | ||
return; | ||
} | ||
delegate.writeField(fieldInfo, bytesRef); | ||
} | ||
|
||
@Override | ||
public void writeField(FieldInfo fieldInfo, String s) throws IOException { | ||
delegate.writeField(fieldInfo, s); | ||
} | ||
|
||
@Override | ||
public void finishDocument() throws IOException { | ||
delegate.finishDocument(); | ||
} | ||
|
||
@Override | ||
public void finish(int i) throws IOException { | ||
delegate.finish(i); | ||
} | ||
|
||
@Override | ||
public void close() throws IOException { | ||
delegate.close(); | ||
} | ||
|
||
@Override | ||
public long ramBytesUsed() { | ||
return delegate.ramBytesUsed(); | ||
jmazanec15 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nit-pick] this constant is not used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@VijayanB mentioned this. Will remove in another review