Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce derived vector source via stored fields #2449

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add a new build mode, `FAISS_OPT_LEVEL=avx512_spr`, which enables the use of advanced AVX-512 instructions introduced with Intel(R) Sapphire Rapids (#2404)[https://github.com/opensearch-project/k-NN/pull/2404]
- Add cosine similarity support for faiss engine (#2376)[https://github.com/opensearch-project/k-NN/pull/2376]
- Add concurrency optimizations with native memory graph loading and force eviction (#2265) [https://github.com/opensearch-project/k-NN/pull/2345]

- Add derived source feature for vector fields (#2449)[https://github.com/opensearch-project/k-NN/pull/2449]
### Enhancements
- Introduced a writing layer in native engines where relies on the writing interface to process IO. (#2241)[https://github.com/opensearch-project/k-NN/pull/2241]
- Allow method parameter override for training based indices (#2290) https://github.com/opensearch-project/k-NN/pull/2290]
Expand Down
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ integTest {
systemProperty("https", is_https)
systemProperty("user", user)
systemProperty("password", password)
systemProperty("test.exhaustive", System.getProperty("test.exhaustive"))
navneet1v marked this conversation as resolved.
Show resolved Hide resolved

doFirst {
// Tell the test JVM if the cluster JVM is running under a debugger so that tests can
Expand Down Expand Up @@ -451,6 +452,7 @@ task integTestRemote(type: RestIntegTestTask) {
systemProperty 'cluster.number_of_nodes', "${_numNodes}"

systemProperty 'tests.security.manager', 'false'
systemProperty("test.exhaustive", System.getProperty("test.exhaustive"))

// Run tests with remote cluster only if rest case is defined
if (System.getProperty("tests.rest.cluster") != null) {
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/org/opensearch/knn/common/KNNConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -162,4 +162,8 @@ public class KNNConstants {

public static final String MODE_PARAMETER = "mode";
public static final String COMPRESSION_LEVEL_PARAMETER = "compression_level";

public static final String DERIVED_VECTOR_FIELD_ATTRIBUTE_KEY = "knn-derived-source-enabled";
public static final String DERIVED_VECTOR_FIELD_ATTRIBUTE_TRUE_VALUE = "true";
public static final String DERIVED_VECTOR_FIELD_ATTRIBUTE_FALSE_VALUE = "false";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit-pick] this constant is not used.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@VijayanB mentioned this. Will remove in another review

}
23 changes: 22 additions & 1 deletion src/main/java/org/opensearch/knn/index/KNNSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public class KNNSettings {
public static final String KNN_FAISS_AVX512_DISABLED = "knn.faiss.avx512.disabled";
public static final String KNN_FAISS_AVX512_SPR_DISABLED = "knn.faiss.avx512_spr.disabled";
public static final String KNN_DISK_VECTOR_SHARD_LEVEL_RESCORING_DISABLED = "index.knn.disk.vector.shard_level_rescoring_disabled";
public static final String KNN_DERIVED_SOURCE_ENABLED = "index.knn.derived_source.enabled";

/**
* Default setting values
Expand Down Expand Up @@ -269,6 +270,14 @@ public class KNNSettings {
Setting.Property.Dynamic
);

public static final Setting<Boolean> KNN_DERIVED_SOURCE_ENABLED_SETTING = Setting.boolSetting(
KNN_DERIVED_SOURCE_ENABLED,
false,
IndexScope,
Final,
UnmodifiableOnRestore
);

/**
* This setting identifies KNN index.
*/
Expand Down Expand Up @@ -518,6 +527,9 @@ private Setting<?> getSetting(String key) {
if (KNN_DISK_VECTOR_SHARD_LEVEL_RESCORING_DISABLED.equals(key)) {
return KNN_DISK_VECTOR_SHARD_LEVEL_RESCORING_DISABLED_SETTING;
}
if (KNN_DERIVED_SOURCE_ENABLED.equals(key)) {
return KNN_DERIVED_SOURCE_ENABLED_SETTING;
}

throw new IllegalArgumentException("Cannot find setting by key [" + key + "]");
}
Expand All @@ -543,7 +555,8 @@ public List<Setting<?>> getSettings() {
KNN_FAISS_AVX512_SPR_DISABLED_SETTING,
QUANTIZATION_STATE_CACHE_SIZE_LIMIT_SETTING,
QUANTIZATION_STATE_CACHE_EXPIRY_TIME_MINUTES_SETTING,
KNN_DISK_VECTOR_SHARD_LEVEL_RESCORING_DISABLED_SETTING
KNN_DISK_VECTOR_SHARD_LEVEL_RESCORING_DISABLED_SETTING,
KNN_DERIVED_SOURCE_ENABLED_SETTING
);
return Stream.concat(settings.stream(), Stream.concat(getFeatureFlags().stream(), dynamicCacheSettings.values().stream()))
.collect(Collectors.toList());
Expand Down Expand Up @@ -581,6 +594,14 @@ public static boolean isFaissAVX2Disabled() {
}
}

/**
* check this index enabled/disabled derived source
* @param settings Settings
*/
public static boolean isKNNDerivedSourceEnabled(Settings settings) {
jmazanec15 marked this conversation as resolved.
Show resolved Hide resolved
return KNN_DERIVED_SOURCE_ENABLED_SETTING.get(settings);
}

public static boolean isFaissAVX512Disabled() {
return Booleans.parseBoolean(
Objects.requireNonNullElse(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.index.codec.KNN9120Codec;

import lombok.AllArgsConstructor;
import org.apache.lucene.codecs.StoredFieldsFormat;
import org.apache.lucene.codecs.StoredFieldsReader;
import org.apache.lucene.codecs.StoredFieldsWriter;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.FieldInfos;
import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.index.SegmentReadState;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.opensearch.common.Nullable;
import org.opensearch.index.mapper.MappedFieldType;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.codec.derivedsource.DerivedSourceReadersSupplier;
import org.opensearch.knn.index.mapper.KNNVectorFieldType;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static org.opensearch.knn.common.KNNConstants.DERIVED_VECTOR_FIELD_ATTRIBUTE_KEY;
import static org.opensearch.knn.common.KNNConstants.DERIVED_VECTOR_FIELD_ATTRIBUTE_TRUE_VALUE;

@AllArgsConstructor
public class DerivedSourceStoredFieldsFormat extends StoredFieldsFormat {

private final StoredFieldsFormat delegate;
private final DerivedSourceReadersSupplier derivedSourceReadersSupplier;
// IMPORTANT Do not rely on this for the reader, it will be null if SPI is used
@Nullable
private final MapperService mapperService;
jmazanec15 marked this conversation as resolved.
Show resolved Hide resolved

@Override
public StoredFieldsReader fieldsReader(Directory directory, SegmentInfo segmentInfo, FieldInfos fieldInfos, IOContext ioContext)
throws IOException {
List<FieldInfo> derivedVectorFields = null;
for (FieldInfo fieldInfo : fieldInfos) {
if (DERIVED_VECTOR_FIELD_ATTRIBUTE_TRUE_VALUE.equals(fieldInfo.attributes().get(DERIVED_VECTOR_FIELD_ATTRIBUTE_KEY))) {
jmazanec15 marked this conversation as resolved.
Show resolved Hide resolved
// Lazily initialize the list of fields
if (derivedVectorFields == null) {
derivedVectorFields = new ArrayList<>();
}
derivedVectorFields.add(fieldInfo);
}
}
// If no fields have it enabled, we can just short-circuit and return the delegate's fieldReader
if (derivedVectorFields == null || derivedVectorFields.isEmpty()) {
return delegate.fieldsReader(directory, segmentInfo, fieldInfos, ioContext);
}
return new DerivedSourceStoredFieldsReader(
delegate.fieldsReader(directory, segmentInfo, fieldInfos, ioContext),
derivedVectorFields,
derivedSourceReadersSupplier,
new SegmentReadState(directory, segmentInfo, fieldInfos, ioContext)
);
}

@Override
public StoredFieldsWriter fieldsWriter(Directory directory, SegmentInfo segmentInfo, IOContext ioContext) throws IOException {
StoredFieldsWriter delegateWriter = delegate.fieldsWriter(directory, segmentInfo, ioContext);
if (mapperService != null && KNNSettings.isKNNDerivedSourceEnabled(mapperService.getIndexSettings().getSettings())) {
List<String> vectorFieldTypes = new ArrayList<>();
for (MappedFieldType fieldType : mapperService.fieldTypes()) {
if (fieldType instanceof KNNVectorFieldType) {
vectorFieldTypes.add(fieldType.name());
jmazanec15 marked this conversation as resolved.
Show resolved Hide resolved
}
}
if (vectorFieldTypes.isEmpty() == false) {
return new DerivedSourceStoredFieldsWriter(delegateWriter, vectorFieldTypes);
}
}
return delegateWriter;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.index.codec.KNN9120Codec;

import org.apache.lucene.codecs.StoredFieldsReader;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.SegmentReadState;
import org.apache.lucene.index.StoredFieldVisitor;
import org.apache.lucene.util.IOUtils;
import org.opensearch.index.fieldvisitor.FieldsVisitor;
import org.opensearch.knn.index.codec.derivedsource.DerivedSourceReadersSupplier;
import org.opensearch.knn.index.codec.derivedsource.DerivedSourceStoredFieldVisitor;
import org.opensearch.knn.index.codec.derivedsource.DerivedSourceVectorInjector;

import java.io.IOException;
import java.util.List;

public class DerivedSourceStoredFieldsReader extends StoredFieldsReader {
private final StoredFieldsReader delegate;
private final List<FieldInfo> derivedVectorFields;
private final DerivedSourceReadersSupplier derivedSourceReadersSupplier;
private final SegmentReadState segmentReadState;
private final boolean shouldInject;

private final DerivedSourceVectorInjector derivedSourceVectorInjector;

/**
*
* @param delegate delegate StoredFieldsReader
* @param derivedVectorFields List of fields that are derived source fields
* @param derivedSourceReadersSupplier Supplier for the derived source readers
* @param segmentReadState SegmentReadState for the segment
* @throws IOException in case of I/O error
*/
public DerivedSourceStoredFieldsReader(
StoredFieldsReader delegate,
List<FieldInfo> derivedVectorFields,
DerivedSourceReadersSupplier derivedSourceReadersSupplier,
SegmentReadState segmentReadState
) throws IOException {
this(delegate, derivedVectorFields, derivedSourceReadersSupplier, segmentReadState, true);
}

private DerivedSourceStoredFieldsReader(
StoredFieldsReader delegate,
List<FieldInfo> derivedVectorFields,
DerivedSourceReadersSupplier derivedSourceReadersSupplier,
SegmentReadState segmentReadState,
boolean shouldInject
) throws IOException {
this.delegate = delegate;
this.derivedVectorFields = derivedVectorFields;
this.derivedSourceReadersSupplier = derivedSourceReadersSupplier;
this.segmentReadState = segmentReadState;
this.shouldInject = shouldInject;
this.derivedSourceVectorInjector = createDerivedSourceVectorInjector();
}

private DerivedSourceVectorInjector createDerivedSourceVectorInjector() throws IOException {
return new DerivedSourceVectorInjector(derivedSourceReadersSupplier, segmentReadState, derivedVectorFields);
}

@Override
public void document(int docId, StoredFieldVisitor storedFieldVisitor) throws IOException {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The document method directly couples vector injection with DerivedSourceVectorInjector. This makes it harder to extend or modify the injection logic. Could we abstracts or implement some loose coupling here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I understand completely - we do need to create a custom stored fields visitor in order to get access to the source, so I needed to pass it in there. I could add the logics around the fieldsvisitor casting into the DerivedSourceStoredFieldVisitor.

// If the visitor has explicitly indicated it does not need the fields, we should not inject them
boolean isVisitorNeedFields = true;
if (storedFieldVisitor instanceof FieldsVisitor) {
isVisitorNeedFields = derivedSourceVectorInjector.shouldInject(
((FieldsVisitor) storedFieldVisitor).includes(),
((FieldsVisitor) storedFieldVisitor).excludes()
);
}
if (shouldInject && isVisitorNeedFields) {
delegate.document(docId, new DerivedSourceStoredFieldVisitor(storedFieldVisitor, docId, derivedSourceVectorInjector));
jmazanec15 marked this conversation as resolved.
Show resolved Hide resolved
return;
}
delegate.document(docId, storedFieldVisitor);
}

@Override
public StoredFieldsReader clone() {
try {
return new DerivedSourceStoredFieldsReader(
delegate.clone(),
derivedVectorFields,
derivedSourceReadersSupplier,
segmentReadState,
shouldInject
);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Override
public void checkIntegrity() throws IOException {
delegate.checkIntegrity();
}

@Override
public void close() throws IOException {
IOUtils.close(delegate, derivedSourceVectorInjector);
}

/**
* For merging, we need to tell the derived source stored fields reader to skip injecting the source. Otherwise,
* on merge we will end up just writing the source to disk
*
* @return Merged instance that wont inject by default
*/
@Override
public StoredFieldsReader getMergeInstance() {
try {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we just give the this instance why we are creating a new instance?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue is that for merging, the reader is used. So, if we use this, it will add the vectors back into source

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets add this as a java doc here. This seems like a good case where someone needs to know why we need to create merge instance.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think covered in abvove comment

return new DerivedSourceStoredFieldsReader(
delegate.getMergeInstance(),
derivedVectorFields,
derivedSourceReadersSupplier,
segmentReadState,
false
);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
Loading
Loading