Skip to content

Commit

Permalink
Minor test and nit fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Shreyansh Ray <[email protected]>
  • Loading branch information
rayshrey committed Jun 18, 2024
1 parent 353cbbd commit 607f6ae
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.opensearch.index.IndexModule;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.CompositeDirectory;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.utils.FileTypeUtils;
Expand Down Expand Up @@ -146,6 +147,12 @@ public void testWritableWarmBasic() throws Exception {
// Asserting that after merge all the files from previous gen are no more part of the directory
assertTrue(filesFromPreviousGenStillPresent.isEmpty());

// Asserting that files from previous gen are not present in File Cache as well
filesBeforeMerge.stream()
.filter(file -> !FileTypeUtils.isLockFile(file))
.filter(file -> !FileTypeUtils.isSegmentsFile(file))
.forEach(file -> assertNull(fileCache.get(((CompositeDirectory) directory).getFilePath(file))));

// Deleting the index (so that ref count drops to zero for all the files) and then pruning the cache to clear it to avoid any file
// leaks
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1688,5 +1688,4 @@ static void validateTranslogDurabilitySettings(Settings requestSettings, Cluster
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ public CloseableFilterIndexOutput(IndexOutput out, String fileName, OnCloseListe

@Override
public void close() throws IOException {
super.close();
if (isClosed.get() == false) {
super.close();
onCloseListener.onClose(fileName);
isClosed.set(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import org.opensearch.common.lucene.store.InputStreamIndexInput;
import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
import org.opensearch.index.store.remote.file.OnDemandBlockSnapshotIndexInput;
import org.opensearch.index.store.remote.filecache.CachedFullFileIndexInput;
import org.opensearch.index.store.remote.filecache.CachedIndexInput;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.FullFileCachedIndexInputImpl;
import org.opensearch.index.store.remote.utils.BlockIOContext;
import org.opensearch.index.store.remote.utils.FileTypeUtils;
import org.opensearch.index.store.remote.utils.TransferManager;
Expand Down Expand Up @@ -116,7 +116,7 @@ public void deleteFile(String name) throws IOException {
} else if (Arrays.asList(listAll()).contains(name) == false) {
throw new NoSuchFileException("File " + name + " not found in directory");
} else {
fileCache.remove(localDirectory.getDirectory().resolve(name));
fileCache.remove(getFilePath(name));
}
}

Expand All @@ -131,7 +131,7 @@ public long fileLength(String name) throws IOException {
ensureOpen();
logger.trace("Composite Directory[{}]: fileLength() called {}", this::toString, () -> name);
long fileLength;
Path key = localDirectory.getDirectory().resolve(name);
Path key = getFilePath(name);
if (FileTypeUtils.isTempFile(name) || fileCache.get(key) != null) {
try {
fileLength = localDirectory.fileLength(name);
Expand Down Expand Up @@ -194,7 +194,7 @@ public void rename(String source, String dest) throws IOException {
ensureOpen();
logger.trace("Composite Directory[{}]: rename() called : source-{}, dest-{}", this::toString, () -> source, () -> dest);
localDirectory.rename(source, dest);
fileCache.remove(localDirectory.getDirectory().resolve(source));
fileCache.remove(getFilePath(source));
cacheFile(dest);
}

Expand All @@ -215,7 +215,7 @@ public IndexInput openInput(String name, IOContext context) throws IOException {
return localDirectory.openInput(name, context);
}
// Return directly from the FileCache (via TransferManager) if complete file is present
Path key = localDirectory.getDirectory().resolve(name);
Path key = getFilePath(name);
CachedIndexInput indexInput = fileCache.get(key);
if (indexInput != null) {
logger.trace("Composite Directory[{}]: Complete file {} found in FileCache", this::toString, () -> name);
Expand Down Expand Up @@ -281,8 +281,13 @@ Uncomment the below commented line(to remove the file from cache once uploaded)
this::toString,
() -> file
);
fileCache.decRef(localDirectory.getDirectory().resolve(file));
// fileCache.remove(localDirectory.getDirectory().resolve(fileName));
fileCache.decRef(getFilePath(file));
// fileCache.remove(getFilePath(fileName));
}

// Visibility public since we need it in IT tests
public Path getFilePath(String name) {
return localDirectory.getDirectory().resolve(name);
}

/**
Expand Down Expand Up @@ -327,13 +332,13 @@ private String[] getRemoteFiles() throws IOException {
}

private void cacheFile(String name) throws IOException {
Path filePath = localDirectory.getDirectory().resolve(name);
Path filePath = getFilePath(name);
// put will increase the refCount for the path, making sure it is not evicted, will decrease the ref after it is uploaded to Remote
// so that it can be evicted after that
// this is just a temporary solution, will pin the file once support for that is added in FileCache
// TODO : Pin the above filePath in the file cache once pinning support is added so that it cannot be evicted unless it has been
// successfully uploaded to Remote
fileCache.put(filePath, new FullFileCachedIndexInputImpl(fileCache, filePath, localDirectory.openInput(name, IOContext.DEFAULT)));
fileCache.put(filePath, new CachedFullFileIndexInput(fileCache, filePath, localDirectory.openInput(name, IOContext.DEFAULT)));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
* @opensearch.experimental
*/
@ExperimentalApi
public class FullFileCachedIndexInputImpl implements CachedIndexInput {
public class CachedFullFileIndexInput implements CachedIndexInput {
private final FileCache fileCache;
private final Path path;
private final FullFileCachedIndexInput fullFileCachedIndexInput;
Expand All @@ -30,7 +30,7 @@ public class FullFileCachedIndexInputImpl implements CachedIndexInput {
/**
* Constructor - takes IndexInput as parameter
*/
public FullFileCachedIndexInputImpl(FileCache fileCache, Path path, IndexInput indexInput) {
public CachedFullFileIndexInput(FileCache fileCache, Path path, IndexInput indexInput) {
this.fileCache = fileCache;
this.path = path;
fullFileCachedIndexInput = new FullFileCachedIndexInput(fileCache, path, indexInput);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ public void close() throws IOException {
clones.forEach(indexInput -> {
try {
indexInput.close();
} catch (IOException e) {
throw new RuntimeException(e);
} catch (Exception e) {
logger.trace("Exception while closing clone - {}", e.getMessage());
}
});
try {
Expand All @@ -81,6 +81,7 @@ public void close() throws IOException {
logger.trace("FullFileCachedIndexInput already closed");
}
luceneIndexInput = null;
clones.clear();
closed = true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,13 @@ public void testClose() throws IOException {

public void testAfterSyncToRemote() throws IOException {
// File will be present locally until uploaded to Remote
assertTrue(existsInLocalDirectory("_1.cfe"));
compositeDirectory.afterSyncToRemote("_1.cfe");
assertTrue(existsInLocalDirectory(FILE_PRESENT_LOCALLY));
compositeDirectory.afterSyncToRemote(FILE_PRESENT_LOCALLY);
fileCache.prune();
// After uploading to Remote, refCount will be decreased by 1 making it 0 and will be evicted if cache is pruned
assertFalse(existsInLocalDirectory("_1.cfe"));
assertFalse(existsInLocalDirectory(FILE_PRESENT_LOCALLY));
// Asserting file is not present in FileCache
assertNull(fileCache.get(getFilePath(FILE_PRESENT_LOCALLY)));
}

private void addFilesToDirectory(String[] files) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void setup() throws IOException {

protected void setupIndexInputAndAddToFileCache() {
fileCachedIndexInput = new FileCachedIndexInput(fileCache, filePath, underlyingIndexInput);
fileCache.put(filePath, new FullFileCachedIndexInputImpl(fileCache, filePath, fileCachedIndexInput));
fileCache.put(filePath, new CachedFullFileIndexInput(fileCache, filePath, fileCachedIndexInput));
}

public void testClone() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.index.store.remote.filecache;

import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.IndexInput;

import java.io.IOException;
Expand All @@ -18,16 +19,20 @@ public class FullFileCachedIndexInputTests extends FileCachedIndexInputTests {
@Override
protected void setupIndexInputAndAddToFileCache() {
fullFileCachedIndexInput = new FullFileCachedIndexInput(fileCache, filePath, underlyingIndexInput);
fileCache.put(filePath, new FullFileCachedIndexInputImpl(fileCache, filePath, fullFileCachedIndexInput));
fileCache.put(filePath, new CachedFullFileIndexInput(fileCache, filePath, fullFileCachedIndexInput));
}

@Override
public void testClone() throws IOException {
setupIndexInputAndAddToFileCache();

// Since the file ia already in cache and has refCount 1, activeUsage and totalUsage will be same
// Since the file is already in cache and has refCount 1, activeUsage and totalUsage will be same
assertTrue(isActiveAndTotalUsageSame());

// Getting the file cache entry (which wil increase the ref count, hence doing dec ref immediately afterwards)
CachedIndexInput cachedIndexInput = fileCache.get(filePath);
fileCache.decRef(filePath);

// Decrementing the refCount explicitly on the file which will make it inactive (as refCount will drop to 0)
fileCache.decRef(filePath);
assertFalse(isActiveAndTotalUsageSame());
Expand All @@ -38,9 +43,15 @@ public void testClone() throws IOException {
FileCachedIndexInput clonedFileCachedIndexInput3 = clonedFileCachedIndexInput2.clone();
assertTrue(isActiveAndTotalUsageSame());

// Closing the parent will close all the clones decreasing the refCount to 0
fullFileCachedIndexInput.close();
// closing the first level clone will close all subsequent level clones and reduce ref count to 0
clonedFileCachedIndexInput1.close();
assertFalse(isActiveAndTotalUsageSame());

fileCache.prune();

// since the file cache entry was evicted the corresponding CachedIndexInput will be closed and will throw exception when trying to
// read the index input
assertThrows(AlreadyClosedException.class, cachedIndexInput::getIndexInput);
}

@Override
Expand Down

0 comments on commit 607f6ae

Please sign in to comment.