Skip to content

Commit

Permalink
Introduce derived vector source via stored fields (#2467)
Browse files Browse the repository at this point in the history
Generates the vector source in the source field from the
KnnVectorsFormat or BVD. It does this by adding StoredFieldsFormat to
our existing custom codec.

Currently, feature is experimental and behind a feature flag via index
setting. In the future, we need to iterate to improve performance
and stability for nested/object portions.

Signed-off-by: John Mazanec <[email protected]>
  • Loading branch information
opensearch-trigger-bot[bot] authored Jan 30, 2025
1 parent 168ee3c commit 9b07a12
Show file tree
Hide file tree
Showing 29 changed files with 3,015 additions and 52 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add a new build mode, `FAISS_OPT_LEVEL=avx512_spr`, which enables the use of advanced AVX-512 instructions introduced with Intel(R) Sapphire Rapids (#2404)[https://github.com/opensearch-project/k-NN/pull/2404]
- Add cosine similarity support for faiss engine (#2376)[https://github.com/opensearch-project/k-NN/pull/2376]
- Add concurrency optimizations with native memory graph loading and force eviction (#2265) [https://github.com/opensearch-project/k-NN/pull/2345]

- Add derived source feature for vector fields (#2449)[https://github.com/opensearch-project/k-NN/pull/2449]
### Enhancements
- Introduced a writing layer in native engines where relies on the writing interface to process IO. (#2241)[https://github.com/opensearch-project/k-NN/pull/2241]
- Allow method parameter override for training based indices (#2290) https://github.com/opensearch-project/k-NN/pull/2290]
Expand Down
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ integTest {
systemProperty("https", is_https)
systemProperty("user", user)
systemProperty("password", password)
systemProperty("test.exhaustive", System.getProperty("test.exhaustive"))

doFirst {
// Tell the test JVM if the cluster JVM is running under a debugger so that tests can
Expand Down Expand Up @@ -455,6 +456,7 @@ task integTestRemote(type: RestIntegTestTask) {
systemProperty 'cluster.number_of_nodes', "${_numNodes}"

systemProperty 'tests.security.manager', 'false'
systemProperty("test.exhaustive", System.getProperty("test.exhaustive"))

// Run tests with remote cluster only if rest case is defined
if (System.getProperty("tests.rest.cluster") != null) {
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/org/opensearch/knn/common/KNNConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -162,4 +162,8 @@ public class KNNConstants {

public static final String MODE_PARAMETER = "mode";
public static final String COMPRESSION_LEVEL_PARAMETER = "compression_level";

public static final String DERIVED_VECTOR_FIELD_ATTRIBUTE_KEY = "knn-derived-source-enabled";
public static final String DERIVED_VECTOR_FIELD_ATTRIBUTE_TRUE_VALUE = "true";
public static final String DERIVED_VECTOR_FIELD_ATTRIBUTE_FALSE_VALUE = "false";
}
23 changes: 22 additions & 1 deletion src/main/java/org/opensearch/knn/index/KNNSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public class KNNSettings {
public static final String KNN_FAISS_AVX512_DISABLED = "knn.faiss.avx512.disabled";
public static final String KNN_FAISS_AVX512_SPR_DISABLED = "knn.faiss.avx512_spr.disabled";
public static final String KNN_DISK_VECTOR_SHARD_LEVEL_RESCORING_DISABLED = "index.knn.disk.vector.shard_level_rescoring_disabled";
public static final String KNN_DERIVED_SOURCE_ENABLED = "index.knn.derived_source.enabled";

/**
* Default setting values
Expand Down Expand Up @@ -269,6 +270,14 @@ public class KNNSettings {
Setting.Property.Dynamic
);

public static final Setting<Boolean> KNN_DERIVED_SOURCE_ENABLED_SETTING = Setting.boolSetting(
KNN_DERIVED_SOURCE_ENABLED,
false,
IndexScope,
Final,
UnmodifiableOnRestore
);

/**
* This setting identifies KNN index.
*/
Expand Down Expand Up @@ -518,6 +527,9 @@ private Setting<?> getSetting(String key) {
if (KNN_DISK_VECTOR_SHARD_LEVEL_RESCORING_DISABLED.equals(key)) {
return KNN_DISK_VECTOR_SHARD_LEVEL_RESCORING_DISABLED_SETTING;
}
if (KNN_DERIVED_SOURCE_ENABLED.equals(key)) {
return KNN_DERIVED_SOURCE_ENABLED_SETTING;
}

throw new IllegalArgumentException("Cannot find setting by key [" + key + "]");
}
Expand All @@ -543,7 +555,8 @@ public List<Setting<?>> getSettings() {
KNN_FAISS_AVX512_SPR_DISABLED_SETTING,
QUANTIZATION_STATE_CACHE_SIZE_LIMIT_SETTING,
QUANTIZATION_STATE_CACHE_EXPIRY_TIME_MINUTES_SETTING,
KNN_DISK_VECTOR_SHARD_LEVEL_RESCORING_DISABLED_SETTING
KNN_DISK_VECTOR_SHARD_LEVEL_RESCORING_DISABLED_SETTING,
KNN_DERIVED_SOURCE_ENABLED_SETTING
);
return Stream.concat(settings.stream(), Stream.concat(getFeatureFlags().stream(), dynamicCacheSettings.values().stream()))
.collect(Collectors.toList());
Expand Down Expand Up @@ -581,6 +594,14 @@ public static boolean isFaissAVX2Disabled() {
}
}

/**
* check this index enabled/disabled derived source
* @param settings Settings
*/
public static boolean isKNNDerivedSourceEnabled(Settings settings) {
return KNN_DERIVED_SOURCE_ENABLED_SETTING.get(settings);
}

public static boolean isFaissAVX512Disabled() {
return Booleans.parseBoolean(
Objects.requireNonNullElse(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.index.codec.KNN9120Codec;

import lombok.AllArgsConstructor;
import org.apache.lucene.codecs.StoredFieldsFormat;
import org.apache.lucene.codecs.StoredFieldsReader;
import org.apache.lucene.codecs.StoredFieldsWriter;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.FieldInfos;
import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.index.SegmentReadState;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.opensearch.common.Nullable;
import org.opensearch.index.mapper.MappedFieldType;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.codec.derivedsource.DerivedSourceReadersSupplier;
import org.opensearch.knn.index.mapper.KNNVectorFieldType;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static org.opensearch.knn.common.KNNConstants.DERIVED_VECTOR_FIELD_ATTRIBUTE_KEY;
import static org.opensearch.knn.common.KNNConstants.DERIVED_VECTOR_FIELD_ATTRIBUTE_TRUE_VALUE;

@AllArgsConstructor
public class DerivedSourceStoredFieldsFormat extends StoredFieldsFormat {

private final StoredFieldsFormat delegate;
private final DerivedSourceReadersSupplier derivedSourceReadersSupplier;
// IMPORTANT Do not rely on this for the reader, it will be null if SPI is used
@Nullable
private final MapperService mapperService;

@Override
public StoredFieldsReader fieldsReader(Directory directory, SegmentInfo segmentInfo, FieldInfos fieldInfos, IOContext ioContext)
throws IOException {
List<FieldInfo> derivedVectorFields = null;
for (FieldInfo fieldInfo : fieldInfos) {
if (DERIVED_VECTOR_FIELD_ATTRIBUTE_TRUE_VALUE.equals(fieldInfo.attributes().get(DERIVED_VECTOR_FIELD_ATTRIBUTE_KEY))) {
// Lazily initialize the list of fields
if (derivedVectorFields == null) {
derivedVectorFields = new ArrayList<>();
}
derivedVectorFields.add(fieldInfo);
}
}
// If no fields have it enabled, we can just short-circuit and return the delegate's fieldReader
if (derivedVectorFields == null || derivedVectorFields.isEmpty()) {
return delegate.fieldsReader(directory, segmentInfo, fieldInfos, ioContext);
}
return new DerivedSourceStoredFieldsReader(
delegate.fieldsReader(directory, segmentInfo, fieldInfos, ioContext),
derivedVectorFields,
derivedSourceReadersSupplier,
new SegmentReadState(directory, segmentInfo, fieldInfos, ioContext)
);
}

@Override
public StoredFieldsWriter fieldsWriter(Directory directory, SegmentInfo segmentInfo, IOContext ioContext) throws IOException {
StoredFieldsWriter delegateWriter = delegate.fieldsWriter(directory, segmentInfo, ioContext);
if (mapperService != null && KNNSettings.isKNNDerivedSourceEnabled(mapperService.getIndexSettings().getSettings())) {
List<String> vectorFieldTypes = new ArrayList<>();
for (MappedFieldType fieldType : mapperService.fieldTypes()) {
if (fieldType instanceof KNNVectorFieldType) {
vectorFieldTypes.add(fieldType.name());
}
}
if (vectorFieldTypes.isEmpty() == false) {
return new DerivedSourceStoredFieldsWriter(delegateWriter, vectorFieldTypes);
}
}
return delegateWriter;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.index.codec.KNN9120Codec;

import org.apache.lucene.codecs.StoredFieldsReader;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.SegmentReadState;
import org.apache.lucene.index.StoredFieldVisitor;
import org.apache.lucene.util.IOUtils;
import org.opensearch.index.fieldvisitor.FieldsVisitor;
import org.opensearch.knn.index.codec.derivedsource.DerivedSourceReadersSupplier;
import org.opensearch.knn.index.codec.derivedsource.DerivedSourceStoredFieldVisitor;
import org.opensearch.knn.index.codec.derivedsource.DerivedSourceVectorInjector;

import java.io.IOException;
import java.util.List;

public class DerivedSourceStoredFieldsReader extends StoredFieldsReader {
private final StoredFieldsReader delegate;
private final List<FieldInfo> derivedVectorFields;
private final DerivedSourceReadersSupplier derivedSourceReadersSupplier;
private final SegmentReadState segmentReadState;
private final boolean shouldInject;

private final DerivedSourceVectorInjector derivedSourceVectorInjector;

/**
*
* @param delegate delegate StoredFieldsReader
* @param derivedVectorFields List of fields that are derived source fields
* @param derivedSourceReadersSupplier Supplier for the derived source readers
* @param segmentReadState SegmentReadState for the segment
* @throws IOException in case of I/O error
*/
public DerivedSourceStoredFieldsReader(
StoredFieldsReader delegate,
List<FieldInfo> derivedVectorFields,
DerivedSourceReadersSupplier derivedSourceReadersSupplier,
SegmentReadState segmentReadState
) throws IOException {
this(delegate, derivedVectorFields, derivedSourceReadersSupplier, segmentReadState, true);
}

private DerivedSourceStoredFieldsReader(
StoredFieldsReader delegate,
List<FieldInfo> derivedVectorFields,
DerivedSourceReadersSupplier derivedSourceReadersSupplier,
SegmentReadState segmentReadState,
boolean shouldInject
) throws IOException {
this.delegate = delegate;
this.derivedVectorFields = derivedVectorFields;
this.derivedSourceReadersSupplier = derivedSourceReadersSupplier;
this.segmentReadState = segmentReadState;
this.shouldInject = shouldInject;
this.derivedSourceVectorInjector = createDerivedSourceVectorInjector();
}

private DerivedSourceVectorInjector createDerivedSourceVectorInjector() throws IOException {
return new DerivedSourceVectorInjector(derivedSourceReadersSupplier, segmentReadState, derivedVectorFields);
}

@Override
public void document(int docId, StoredFieldVisitor storedFieldVisitor) throws IOException {
// If the visitor has explicitly indicated it does not need the fields, we should not inject them
boolean isVisitorNeedFields = true;
if (storedFieldVisitor instanceof FieldsVisitor) {
isVisitorNeedFields = derivedSourceVectorInjector.shouldInject(
((FieldsVisitor) storedFieldVisitor).includes(),
((FieldsVisitor) storedFieldVisitor).excludes()
);
}
if (shouldInject && isVisitorNeedFields) {
delegate.document(docId, new DerivedSourceStoredFieldVisitor(storedFieldVisitor, docId, derivedSourceVectorInjector));
return;
}
delegate.document(docId, storedFieldVisitor);
}

@Override
public StoredFieldsReader clone() {
try {
return new DerivedSourceStoredFieldsReader(
delegate.clone(),
derivedVectorFields,
derivedSourceReadersSupplier,
segmentReadState,
shouldInject
);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Override
public void checkIntegrity() throws IOException {
delegate.checkIntegrity();
}

@Override
public void close() throws IOException {
IOUtils.close(delegate, derivedSourceVectorInjector);
}

/**
* For merging, we need to tell the derived source stored fields reader to skip injecting the source. Otherwise,
* on merge we will end up just writing the source to disk
*
* @return Merged instance that wont inject by default
*/
@Override
public StoredFieldsReader getMergeInstance() {
try {
return new DerivedSourceStoredFieldsReader(
delegate.getMergeInstance(),
derivedVectorFields,
derivedSourceReadersSupplier,
segmentReadState,
false
);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
Loading

0 comments on commit 9b07a12

Please sign in to comment.