Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove FileWatcher from KNN. #2182

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
## [Unreleased 3.0](https://github.com/opensearch-project/k-NN/compare/2.x...HEAD)
### Features
### Enhancements
* Remove FSDirectory dependency and deprecated FileWatcher [#2182](https://github.com/opensearch-project/k-NN/pull/2182)
0ctopus13prime marked this conversation as resolved.
Show resolved Hide resolved
### Bug Fixes
### Infrastructure
* Removed JDK 11 and 17 version from CI runs [#1921](https://github.com/opensearch-project/k-NN/pull/1921)
Expand Down
51 changes: 28 additions & 23 deletions src/main/java/org/opensearch/knn/index/KNNIndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.FilterDirectory;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.knn.common.FieldInfoExtractor;
import org.opensearch.knn.index.codec.util.NativeMemoryCacheKeyHelper;
import org.opensearch.knn.index.engine.qframe.QuantizationConfig;
import org.opensearch.knn.index.mapper.KNNVectorFieldMapper;
import org.opensearch.knn.index.memory.NativeMemoryAllocation;
Expand All @@ -29,9 +30,7 @@
import org.opensearch.knn.index.engine.KNNEngine;

import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -94,14 +93,18 @@ public void warmup() throws IOException {
try (Engine.Searcher searcher = indexShard.acquireSearcher("knn-warmup")) {
getAllEngineFileContexts(searcher.getIndexReader()).forEach((engineFileContext) -> {
try {
final String cacheKey = NativeMemoryCacheKeyHelper.constructCacheKey(
engineFileContext.vectorFileName,
engineFileContext.segmentInfo
);
nativeMemoryCacheManager.get(
new NativeMemoryEntryContext.IndexEntryContext(
directory,
engineFileContext.getIndexPath(),
cacheKey,
NativeMemoryLoadStrategy.IndexLoadStrategy.getInstance(),
getParametersAtLoading(
engineFileContext.getSpaceType(),
KNNEngine.getEngineNameFromPath(engineFileContext.getIndexPath()),
KNNEngine.getEngineNameFromPath(engineFileContext.getVectorFileName()),
getIndexName(),
engineFileContext.getVectorDataType()
),
Expand Down Expand Up @@ -133,9 +136,13 @@ public void clearCache() {
indexAllocation.writeLock();
log.info("[KNN] Evicting index from cache: [{}]", indexName);
try (Engine.Searcher searcher = indexShard.acquireSearcher(INDEX_SHARD_CLEAR_CACHE_SEARCHER)) {
getAllEngineFileContexts(searcher.getIndexReader()).forEach(
(engineFileContext) -> nativeMemoryCacheManager.invalidate(engineFileContext.getIndexPath())
);
getAllEngineFileContexts(searcher.getIndexReader()).forEach((engineFileContext) -> {
final String cacheKey = NativeMemoryCacheKeyHelper.constructCacheKey(
engineFileContext.vectorFileName,
engineFileContext.segmentInfo
);
nativeMemoryCacheManager.invalidate(cacheKey);
});
} catch (IOException ex) {
log.error("[KNN] Failed to evict index from cache: [{}]", indexName, ex);
throw new RuntimeException(ex);
Expand Down Expand Up @@ -166,7 +173,6 @@ List<EngineFileContext> getEngineFileContexts(IndexReader indexReader, KNNEngine

for (LeafReaderContext leafReaderContext : indexReader.leaves()) {
SegmentReader reader = Lucene.segmentReader(leafReaderContext.reader());
Path shardPath = ((FSDirectory) FilterDirectory.unwrap(reader.directory())).getDirectory();
String fileExtension = reader.getSegmentInfo().info.getUseCompoundFile()
? knnEngine.getCompoundExtension()
: knnEngine.getExtension();
Expand All @@ -180,11 +186,9 @@ List<EngineFileContext> getEngineFileContexts(IndexReader indexReader, KNNEngine
String modelId = fieldInfo.attributes().getOrDefault(MODEL_ID, null);
engineFiles.addAll(
getEngineFileContexts(
reader.getSegmentInfo().files(),
reader.getSegmentInfo().info.name,
reader.getSegmentInfo(),
fieldInfo.name,
fileExtension,
shardPath,
spaceType,
modelId,
FieldInfoExtractor.extractQuantizationConfig(fieldInfo) == QuantizationConfig.EMPTY
Expand All @@ -202,22 +206,22 @@ List<EngineFileContext> getEngineFileContexts(IndexReader indexReader, KNNEngine

@VisibleForTesting
List<EngineFileContext> getEngineFileContexts(
Collection<String> files,
String segmentName,
SegmentCommitInfo segmentCommitInfo,
String fieldName,
String fileExtension,
Path shardPath,
SpaceType spaceType,
String modelId,
VectorDataType vectorDataType
) {
String prefix = buildEngineFilePrefix(segmentName);
String suffix = buildEngineFileSuffix(fieldName, fileExtension);
return files.stream()
) throws IOException {
// Ex: 0_
final String prefix = buildEngineFilePrefix(segmentCommitInfo.info.name);
// Ex: _my_field.faiss
final String suffix = buildEngineFileSuffix(fieldName, fileExtension);
return segmentCommitInfo.files()
.stream()
.filter(fileName -> fileName.startsWith(prefix))
.filter(fileName -> fileName.endsWith(suffix))
.map(fileName -> shardPath.resolve(fileName).toString())
.map(fileName -> new EngineFileContext(spaceType, modelId, fileName, vectorDataType))
.map(vectorFileName -> new EngineFileContext(spaceType, modelId, vectorFileName, vectorDataType, segmentCommitInfo.info))
.collect(Collectors.toList());
}

Expand All @@ -227,7 +231,8 @@ List<EngineFileContext> getEngineFileContexts(
static class EngineFileContext {
private final SpaceType spaceType;
private final String modelId;
private final String indexPath;
private final String vectorFileName;
private final VectorDataType vectorDataType;
private final SegmentInfo segmentInfo;
}
}
6 changes: 4 additions & 2 deletions src/main/java/org/opensearch/knn/index/KNNSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -562,8 +562,10 @@ public static boolean isFaissAVX2Disabled() {

public static boolean isFaissAVX512Disabled() {
return Booleans.parseBoolean(
KNNSettings.state().getSettingValue(KNNSettings.KNN_FAISS_AVX512_DISABLED).toString(),
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KNNSettings.state().getSettingValue(KNNSettings.KNN_FAISS_AVX512_DISABLED) can return null, so we should not call toString() on it without a null check.

KNN_DEFAULT_FAISS_AVX512_DISABLED_VALUE
Objects.requireNonNullElse(
KNNSettings.state().getSettingValue(KNNSettings.KNN_FAISS_AVX512_DISABLED),
KNN_DEFAULT_FAISS_AVX512_DISABLED_VALUE
).toString()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,61 +11,36 @@

package org.opensearch.knn.index.codec.KNN80Codec;

import lombok.NonNull;
import lombok.extern.log4j.Log4j2;
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.index.DocValuesType;
import org.apache.lucene.index.FieldInfo;
import java.io.IOException;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SegmentReadState;
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.FilterDirectory;
import org.opensearch.common.io.PathUtils;
import org.opensearch.knn.common.FieldInfoExtractor;
import org.opensearch.knn.index.codec.util.KNNCodecUtil;
import org.opensearch.knn.index.engine.KNNEngine;
import org.opensearch.knn.index.codec.util.NativeMemoryCacheKeyHelper;
import org.opensearch.knn.index.memory.NativeMemoryCacheManager;

import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.opensearch.knn.common.KNNConstants.MODEL_ID;
import static org.opensearch.knn.index.mapper.KNNVectorFieldMapper.KNN_FIELD;

@Log4j2
public class KNN80DocValuesProducer extends DocValuesProducer {

private final SegmentReadState state;
private final DocValuesProducer delegate;
private final NativeMemoryCacheManager nativeMemoryCacheManager;
private final Map<String, String> indexPathMap = new HashMap();
private final Map<String, String> fieldNameToVectorFileName = new HashMap<>();

public KNN80DocValuesProducer(DocValuesProducer delegate, SegmentReadState state) {
this.delegate = delegate;
this.state = state;
this.nativeMemoryCacheManager = NativeMemoryCacheManager.getInstance();

Directory directory = state.directory;
// The directory may be a KNN80CompoundDirectory; in that case, get the underlying directory first and then unwrap it.
if (state.directory instanceof KNN80CompoundDirectory) {
directory = ((KNN80CompoundDirectory) state.directory).getDir();
}

Directory dir = FilterDirectory.unwrap(directory);
if (!(dir instanceof FSDirectory)) {
log.warn("{} can not casting to FSDirectory", directory);
return;
}
String directoryPath = ((FSDirectory) dir).getDirectory().toString();
for (FieldInfo field : state.fieldInfos) {
if (!field.attributes().containsKey(KNN_FIELD)) {
continue;
Expand All @@ -74,18 +49,16 @@ public KNN80DocValuesProducer(DocValuesProducer delegate, SegmentReadState state
// By default, we don't create BinaryDocValues for knn field anymore. However, users can set doc_values = true
// to create binary doc values explicitly like any other field. Hence, we only want to include fields
// where approximate search is possible only by BinaryDocValues.
if (field.getDocValuesType() != DocValuesType.BINARY || field.hasVectorValues() == true) {
if (field.getDocValuesType() != DocValuesType.BINARY || field.hasVectorValues()) {
continue;
}
// Only fields backed by a native engine are put into indexPathMap
KNNEngine knnEngine = getNativeKNNEngine(field);
if (knnEngine == null) {

final String vectorIndexFileName = KNNCodecUtil.getEngineFileFromFieldInfo(field, state.segmentInfo);
if (vectorIndexFileName == null) {
continue;
}
List<String> engineFiles = KNNCodecUtil.getEngineFiles(knnEngine.getExtension(), field.name, state.segmentInfo);
Path indexPath = PathUtils.get(directoryPath, engineFiles.get(0));
indexPathMap.putIfAbsent(field.getName(), indexPath.toString());

final String cacheKey = NativeMemoryCacheKeyHelper.constructCacheKey(vectorIndexFileName, state.segmentInfo);
fieldNameToVectorFileName.putIfAbsent(field.getName(), cacheKey);
}
}

Expand Down Expand Up @@ -121,32 +94,12 @@ public void checkIntegrity() throws IOException {

@Override
public void close() throws IOException {
for (String path : indexPathMap.values()) {
nativeMemoryCacheManager.invalidate(path);
}
final NativeMemoryCacheManager nativeMemoryCacheManager = NativeMemoryCacheManager.getInstance();
fieldNameToVectorFileName.values().forEach(nativeMemoryCacheManager::invalidate);
delegate.close();
}

public final List<String> getOpenedIndexPath() {
return new ArrayList<>(indexPathMap.values());
}

/**
* Get KNNEngine From FieldInfo
*
* @param field which field we need produce from engine
* @return if and only if Native Engine we return specific engine, else return null
*/
private KNNEngine getNativeKNNEngine(@NonNull FieldInfo field) {

final String modelId = field.attributes().get(MODEL_ID);
if (modelId != null) {
return null;
}
KNNEngine engine = FieldInfoExtractor.extractKNNEngine(field);
if (KNNEngine.getEnginesThatCreateCustomSegmentFiles().contains(engine)) {
return engine;
}
return null;
return new ArrayList<>(fieldNameToVectorFileName.values());
}
}
0ctopus13prime marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.IOUtils;
import org.opensearch.common.UUIDs;
import org.opensearch.knn.index.codec.util.KNNCodecUtil;
import org.opensearch.knn.index.codec.util.NativeMemoryCacheKeyHelper;
import org.opensearch.knn.index.memory.NativeMemoryCacheManager;
import org.opensearch.knn.index.quantizationservice.QuantizationService;
import org.opensearch.knn.quantization.models.quantizationState.QuantizationState;
import org.opensearch.knn.quantization.models.quantizationState.QuantizationStateCacheManager;
Expand All @@ -33,6 +36,8 @@
import java.util.HashMap;
import java.util.Map;

import static org.opensearch.knn.index.mapper.KNNVectorFieldMapper.KNN_FIELD;

/**
* Vectors reader class for reading the flat vectors for native engines. The class provides methods for iterating
* over the vectors and retrieving their values.
Expand All @@ -42,11 +47,30 @@ public class NativeEngines990KnnVectorsReader extends KnnVectorsReader {
private final FlatVectorsReader flatVectorsReader;
private final SegmentReadState segmentReadState;
private Map<String, String> quantizationStateCacheKeyPerField;
private final Map<String, String> fieldNameToVectorFileName = new HashMap<>();
0ctopus13prime marked this conversation as resolved.
Show resolved Hide resolved

public NativeEngines990KnnVectorsReader(final SegmentReadState state, final FlatVectorsReader flatVectorsReader) throws IOException {
public NativeEngines990KnnVectorsReader(final SegmentReadState state, final FlatVectorsReader flatVectorsReader) {
this.segmentReadState = state;
this.flatVectorsReader = flatVectorsReader;
loadCacheKeyMap();
fillIndexToVectorFileName();
}

private void fillIndexToVectorFileName() {
for (FieldInfo field : segmentReadState.fieldInfos) {
if (!field.attributes().containsKey(KNN_FIELD)) {
continue;
}
if (!field.hasVectorValues()) {
continue;
}
shatejas marked this conversation as resolved.
Show resolved Hide resolved
final String vectorIndexFileName = KNNCodecUtil.getEngineFileFromFieldInfo(field, segmentReadState.segmentInfo);
if (vectorIndexFileName == null) {
continue;
}
final String cacheKey = NativeMemoryCacheKeyHelper.constructCacheKey(vectorIndexFileName, segmentReadState.segmentInfo);
fieldNameToVectorFileName.putIfAbsent(field.getName(), cacheKey);
}
}

/**
Expand Down Expand Up @@ -176,10 +200,18 @@ public void search(String field, byte[] target, KnnCollector knnCollector, Bits
*/
@Override
public void close() throws IOException {
// Clean up allocated vector indices resources from cache.
final NativeMemoryCacheManager nativeMemoryCacheManager = NativeMemoryCacheManager.getInstance();
fieldNameToVectorFileName.values().forEach(nativeMemoryCacheManager::invalidate);

// Close a reader.
IOUtils.close(flatVectorsReader);

// Clean up quantized state cache.
if (quantizationStateCacheKeyPerField != null) {
final QuantizationStateCacheManager quantizationStateCacheManager = QuantizationStateCacheManager.getInstance();
for (String cacheKey : quantizationStateCacheKeyPerField.values()) {
QuantizationStateCacheManager.getInstance().evict(cacheKey);
quantizationStateCacheManager.evict(cacheKey);
}
}
}
Expand All @@ -192,7 +224,7 @@ public long ramBytesUsed() {
return flatVectorsReader.ramBytesUsed();
}

private void loadCacheKeyMap() throws IOException {
private void loadCacheKeyMap() {
quantizationStateCacheKeyPerField = new HashMap<>();
for (FieldInfo fieldInfo : segmentReadState.fieldInfos) {
String cacheKey = UUIDs.base64UUID();
Expand Down
Loading
Loading