Skip to content

Commit

Permalink
Move setting for Partial Locality type behind Feature Flag, fix bug f…
Browse files Browse the repository at this point in the history
…or ref count via cloneMap in FullFileCachedIndexInput and other review fixes

Signed-off-by: Shreyansh Ray <[email protected]>
  • Loading branch information
rayshrey committed Jun 18, 2024
1 parent 533c192 commit 353cbbd
Show file tree
Hide file tree
Showing 16 changed files with 383 additions and 202 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,25 @@

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsException;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.utils.FileTypeUtils;
import org.opensearch.index.store.remote.utils.cache.CacheUsage;
import org.opensearch.indices.IndicesService;
import org.opensearch.node.Node;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.junit.annotations.TestLogging;

import java.util.Arrays;
import java.util.HashSet;
Expand All @@ -38,9 +41,9 @@
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 1, numClientNodes = 0, supportsDedicatedMasters = false)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, supportsDedicatedMasters = false)
// Uncomment the below line to enable trace level logs for this test for better debugging
// @TestLogging(reason = "Getting trace logs from composite directory package", value = "org.opensearch.index.store:TRACE")
@TestLogging(reason = "Getting trace logs from composite directory package", value = "org.opensearch.index.store:TRACE")
public class WritableWarmIT extends RemoteStoreBaseIntegTestCase {

protected static final String INDEX_NAME = "test-idx-1";
Expand All @@ -64,24 +67,34 @@ protected Settings featureFlagSettings() {

public void testWritableWarmFeatureFlagDisabled() {
Settings clusterSettings = Settings.builder().put(super.nodeSettings(0)).put(FeatureFlags.TIERED_REMOTE_INDEX, false).build();
internalCluster().startDataOnlyNodes(1, clusterSettings);
InternalTestCluster internalTestCluster = internalCluster();
internalTestCluster.startClusterManagerOnlyNodes(3, clusterSettings);
internalTestCluster.startDataOnlyNodes(1, clusterSettings);

Settings indexSettings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name())
.build();

assertThrows(
"index.store.locality can be set to PARTIAL only if Feature Flag ["
+ FeatureFlags.TIERED_REMOTE_INDEX_SETTING.getKey()
+ "] is set to true",
IllegalArgumentException.class,
() -> client().admin().indices().prepareCreate(INDEX_NAME).setSettings(indexSettings).get()
);
try {
prepareCreate(INDEX_NAME).setSettings(indexSettings).get();
fail("Should have thrown Exception as setting should not be registered if Feature Flag is Disabled");
} catch (SettingsException ex) {
assertEquals(
"unknown setting ["
+ IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey()
+ "] please check that any required plugins are installed, or check the "
+ "breaking changes documentation for removed settings",
ex.getMessage()
);
}
}

public void testWritableWarmBasic() throws Exception {
InternalTestCluster internalTestCluster = internalCluster();
internalTestCluster.startClusterManagerOnlyNode();
internalTestCluster.startDataOnlyNode();
Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
Expand Down Expand Up @@ -112,8 +125,8 @@ public void testWritableWarmBasic() throws Exception {
indexBulk(INDEX_NAME, NUM_DOCS_IN_BULK);
flushAndRefresh(INDEX_NAME);

FileCache fileCache = internalCluster().getDataNodeInstance(Node.class).fileCache();
IndexShard shard = internalCluster().getDataNodeInstance(IndicesService.class)
FileCache fileCache = internalTestCluster.getDataNodeInstance(Node.class).fileCache();
IndexShard shard = internalTestCluster.getDataNodeInstance(IndicesService.class)
.indexService(resolveIndex(INDEX_NAME))
.getShardOrNull(0);
Directory directory = (((FilterDirectory) (((FilterDirectory) (shard.store().directory())).getDelegate())).getDelegate());
Expand All @@ -124,22 +137,18 @@ public void testWritableWarmBasic() throws Exception {
flushAndRefresh(INDEX_NAME);
Set<String> filesAfterMerge = new HashSet<>(Arrays.asList(directory.listAll()));

CacheUsage usageBeforePrune = fileCache.usage();
fileCache.prune();
CacheUsage usageAfterPrune = fileCache.usage();

Set<String> filesFromPreviousGenStillPresent = filesBeforeMerge.stream()
.filter(filesAfterMerge::contains)
.filter(file -> !FileTypeUtils.isLockFile(file))
.filter(file -> !FileTypeUtils.isSegmentsFile(file))
.collect(Collectors.toUnmodifiableSet());

// Asserting that after merge all the files from previous gen are no more part of the directory
assertTrue(filesFromPreviousGenStillPresent.isEmpty());
// Asserting that after the merge, refCount of some files in FileCache dropped to zero which resulted in their eviction after
// pruning
assertTrue(usageAfterPrune.usage() < usageBeforePrune.usage());

// Clearing the file cache to avoid any file leaks
fileCache.clear();
// Deleting the index (so that ref count drops to zero for all the files) and then pruning the cache to clear it to avoid any file
// leaks
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get());
fileCache.prune();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
Expand Down Expand Up @@ -996,7 +995,6 @@ static Settings aggregateIndexSettings(
validateStoreTypeSettings(indexSettings);
validateRefreshIntervalSettings(request.settings(), clusterSettings);
validateTranslogDurabilitySettings(request.settings(), clusterSettings, settings);
validateIndexStoreLocality(request.settings());

return indexSettings;
}
Expand Down Expand Up @@ -1691,15 +1689,4 @@ static void validateTranslogDurabilitySettings(Settings requestSettings, Cluster

}

public static void validateIndexStoreLocality(Settings indexSettings) {
if (indexSettings.get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.FULL.toString())
.equalsIgnoreCase(IndexModule.DataLocalityType.PARTIAL.toString())
&& !FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX_SETTING)) {
throw new IllegalArgumentException(
"index.store.locality can be set to PARTIAL only if Feature Flag ["
+ FeatureFlags.TIERED_REMOTE_INDEX_SETTING.getKey()
+ "] is set to true"
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.IndexSortConfig;
Expand Down Expand Up @@ -188,7 +189,6 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
MapperService.INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING,
BitsetFilterCache.INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING,
IndexModule.INDEX_STORE_TYPE_SETTING,
IndexModule.INDEX_STORE_LOCALITY_SETTING,
IndexModule.INDEX_STORE_PRE_LOAD_SETTING,
IndexModule.INDEX_STORE_HYBRID_MMAP_EXTENSIONS,
IndexModule.INDEX_STORE_HYBRID_NIO_EXTENSIONS,
Expand Down Expand Up @@ -261,7 +261,10 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
* is ready for production release, the feature flag can be removed, and the
* setting should be moved to {@link #BUILT_IN_INDEX_SETTINGS}.
*/
public static final Map<String, List<Setting>> FEATURE_FLAGGED_INDEX_SETTINGS = Map.of();
public static final Map<String, List<Setting>> FEATURE_FLAGGED_INDEX_SETTINGS = Map.of(
FeatureFlags.TIERED_REMOTE_INDEX,
List.of(IndexModule.INDEX_STORE_LOCALITY_SETTING)
);

public static final IndexScopedSettings DEFAULT_SCOPED_SETTINGS = new IndexScopedSettings(Settings.EMPTY, BUILT_IN_INDEX_SETTINGS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,11 +262,6 @@ private boolean syncSegments() {
Map<String, Long> localSegmentsSizeMap = updateLocalSizeMapAndTracker(localSegmentsPostRefresh).entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

Collection<String> segmentsToRefresh = localSegmentsPostRefresh.stream()
.filter(file -> !skipUpload(file))
.collect(Collectors.toList());

CountDownLatch latch = new CountDownLatch(1);
ActionListener<Void> segmentUploadsCompletedListener = new LatchedActionListener<>(new ActionListener<>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.common.lucene.store.FilterIndexOutput;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* FilterIndexOutput which takes in an additional FunctionalInterface as a parameter to perform required operations once the IndexOutput is closed
Expand All @@ -32,16 +33,21 @@ public interface OnCloseListener {

private final OnCloseListener onCloseListener;
private final String fileName;
private final AtomicBoolean isClosed;

public CloseableFilterIndexOutput(IndexOutput out, String fileName, OnCloseListener onCloseListener) {
super("CloseableFilterIndexOutput for file " + fileName, out);
this.fileName = fileName;
this.onCloseListener = onCloseListener;
this.isClosed = new AtomicBoolean(false);
}

@Override
public void close() throws IOException {
super.close();
onCloseListener.onClose(fileName);
if (isClosed.get() == false) {
onCloseListener.onClose(fileName);
isClosed.set(true);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.opensearch.index.store.remote.file.OnDemandBlockSnapshotIndexInput;
import org.opensearch.index.store.remote.filecache.CachedIndexInput;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.FullFileCachedIndexInput;
import org.opensearch.index.store.remote.filecache.FullFileCachedIndexInputImpl;
import org.opensearch.index.store.remote.utils.BlockIOContext;
import org.opensearch.index.store.remote.utils.FileTypeUtils;
import org.opensearch.index.store.remote.utils.TransferManager;
Expand Down Expand Up @@ -53,7 +53,6 @@ public class CompositeDirectory extends FilterDirectory {
private final RemoteSegmentStoreDirectory remoteDirectory;
private final FileCache fileCache;
private final TransferManager transferManager;
private final Set<String> pendingDeletes;

/**
* Constructor to initialise the composite directory
Expand All @@ -67,7 +66,6 @@ public CompositeDirectory(Directory localDirectory, Directory remoteDirectory, F
this.localDirectory = (FSDirectory) localDirectory;
this.remoteDirectory = (RemoteSegmentStoreDirectory) remoteDirectory;
this.fileCache = fileCache;
this.pendingDeletes = new HashSet<>();
transferManager = new TransferManager(
(name, position, length) -> new InputStreamIndexInput(
remoteDirectory.openInput(name, new BlockIOContext(IOContext.DEFAULT, position, length)),
Expand Down Expand Up @@ -95,7 +93,6 @@ public String[] listAll() throws IOException {
logger.trace("Composite Directory[{}]: Remote Directory files - {}", this::toString, () -> Arrays.toString(remoteFiles));
Set<String> nonBlockLuceneFiles = allFiles.stream()
.filter(file -> !FileTypeUtils.isBlockFile(file))
.filter(file -> !pendingDeletes.contains(file))
.collect(Collectors.toUnmodifiableSet());
String[] files = new String[nonBlockLuceneFiles.size()];
nonBlockLuceneFiles.toArray(files);
Expand All @@ -113,14 +110,13 @@ public String[] listAll() throws IOException {
@Override
public void deleteFile(String name) throws IOException {
ensureOpen();
ensureFileNotDeleted(name);
logger.trace("Composite Directory[{}]: deleteFile() called {}", this::toString, () -> name);
if (FileTypeUtils.isTempFile(name)) {
localDirectory.deleteFile(name);
} else if (Arrays.asList(listAll()).contains(name) == false) {
throw new NoSuchFileException("File " + name + " not found in directory");
} else {
pendingDeletes.add(name);
fileCache.remove(localDirectory.getDirectory().resolve(name));
}
}

Expand All @@ -133,7 +129,6 @@ public void deleteFile(String name) throws IOException {
@Override
public long fileLength(String name) throws IOException {
ensureOpen();
ensureFileNotDeleted(name);
logger.trace("Composite Directory[{}]: fileLength() called {}", this::toString, () -> name);
long fileLength;
Path key = localDirectory.getDirectory().resolve(name);
Expand Down Expand Up @@ -170,10 +165,6 @@ public long fileLength(String name) throws IOException {
@Override
public IndexOutput createOutput(String name, IOContext context) throws IOException {
ensureOpen();
// If file was deleted earlier, removing it from the deleted list
if (pendingDeletes.contains(name)) {
pendingDeletes.remove(name);
}
logger.trace("Composite Directory[{}]: createOutput() called {}", this::toString, () -> name);
// The CloseableFilterIndexOutput will ensure that the file is added to FileCache once write is completed on this file
return new CloseableFilterIndexOutput(localDirectory.createOutput(name, context), name, this::cacheFile);
Expand Down Expand Up @@ -201,7 +192,6 @@ public void sync(Collection<String> names) throws IOException {
@Override
public void rename(String source, String dest) throws IOException {
ensureOpen();
ensureFileNotDeleted(source);
logger.trace("Composite Directory[{}]: rename() called : source-{}, dest-{}", this::toString, () -> source, () -> dest);
localDirectory.rename(source, dest);
fileCache.remove(localDirectory.getDirectory().resolve(source));
Expand All @@ -217,7 +207,6 @@ public void rename(String source, String dest) throws IOException {
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
ensureOpen();
ensureFileNotDeleted(name);
logger.trace("Composite Directory[{}]: openInput() called {}", this::toString, () -> name);
// We aren't tracking temporary files (created via createTempOutput) currently in FileCache as these are created and then deleted
// within a very short span of time
Expand Down Expand Up @@ -269,6 +258,11 @@ public void close() throws IOException {
localDirectory.close();
}

@Override
public String toString() {
return "Composite Directory @ " + Integer.toHexString(hashCode());
}

/**
* Function to perform operations once files have been uploaded to Remote Store
* Currently deleting the local files here, as once uploaded to Remote, local files become eligible for eviction from FileCache
Expand Down Expand Up @@ -314,15 +308,6 @@ private void validate(Directory localDirectory, Directory remoteDirectory, FileC
);
}

/**
* Ensure that the file has not already been deleted
*/
private void ensureFileNotDeleted(String name) throws IOException {
if (pendingDeletes.contains(name)) {
throw new NoSuchFileException("File " + name + " is already pending delete");
}
}

/**
* Return the list of files present in Remote
*/
Expand All @@ -348,7 +333,7 @@ private void cacheFile(String name) throws IOException {
// this is just a temporary solution, will pin the file once support for that is added in FileCache
// TODO : Pin the above filePath in the file cache once pinning support is added so that it cannot be evicted unless it has been
// successfully uploaded to Remote
fileCache.put(filePath, new FullFileCachedIndexInput(fileCache, filePath, localDirectory.openInput(name, IOContext.READ)));
fileCache.put(filePath, new FullFileCachedIndexInputImpl(fileCache, filePath, localDirectory.openInput(name, IOContext.DEFAULT)));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ public class FileCachedIndexInput extends IndexInput implements RandomAccessInpu
protected IndexInput luceneIndexInput;

/** indicates if this IndexInput instance is a clone or not */
private final boolean isClone;
protected final boolean isClone;

private volatile boolean closed = false;
protected volatile boolean closed = false;

public FileCachedIndexInput(FileCache cache, Path filePath, IndexInput underlyingIndexInput) {
this(cache, filePath, underlyingIndexInput, false);
Expand Down Expand Up @@ -133,23 +133,8 @@ public FileCachedIndexInput clone() {

@Override
public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
if (offset < 0 || length < 0 || offset + length > this.length()) {
throw new IllegalArgumentException(
"slice() "
+ sliceDescription
+ " out of bounds: offset="
+ offset
+ ",length="
+ length
+ ",fileLength="
+ this.length()
+ ": "
+ this
);
}
IndexInput slicedIndexInput = luceneIndexInput.slice(sliceDescription, offset, length);
cache.incRef(filePath);
return new FileCachedIndexInput(cache, filePath, slicedIndexInput, true);
// never reach here!
throw new UnsupportedOperationException("FileCachedIndexInput couldn't be sliced.");
}

@Override
Expand Down
Loading

0 comments on commit 353cbbd

Please sign in to comment.