Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-28836 Parallize the file archival to improve the split times #6616

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,14 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down Expand Up @@ -421,50 +427,94 @@ public static void archiveStoreFile(Configuration conf, FileSystem fs, RegionInf
*/
private static List<File> resolveAndArchive(FileSystem fs, Path baseArchiveDir,
Collection<File> toArchive, long start) throws IOException {
// short circuit if no files to move
// Early exit if no files to archive
if (toArchive.isEmpty()) {
LOG.trace("No files to archive, returning an empty list.");
return Collections.emptyList();
}

LOG.trace("Moving files to the archive directory {}", baseArchiveDir);
LOG.trace("Preparing to archive files into directory: {}", baseArchiveDir);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...preparing to move...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Come to think about it, your original comment is fine.


// make sure the archive directory exists
if (!fs.exists(baseArchiveDir)) {
if (!fs.mkdirs(baseArchiveDir)) {
throw new IOException("Failed to create the archive directory:" + baseArchiveDir
+ ", quitting archive attempt.");
}
LOG.trace("Created archive directory {}", baseArchiveDir);
}
// Ensure the archive directory exists
ensureArchiveDirectoryExists(fs, baseArchiveDir);

List<File> failures = new ArrayList<>();
// Thread-safe collection for storing failures
Queue<File> failures = new ConcurrentLinkedQueue<>();
String startTime = Long.toString(start);

// Separate files and directories for processing
List<File> filesOnly = new ArrayList<>();
for (File file : toArchive) {
// if its a file archive it
try {
LOG.trace("Archiving {}", file);
if (file.isFile()) {
// attempt to archive the file
if (!resolveAndArchiveFile(baseArchiveDir, file, startTime)) {
LOG.warn("Couldn't archive " + file + " into backup directory: " + baseArchiveDir);
if (file.isFile()) {
filesOnly.add(file);
} else {
handleDirectory(fs, baseArchiveDir, failures, file, start);
}
}

// Archive files concurrently
archiveFilesConcurrently(baseArchiveDir, filesOnly, failures, startTime);

return new ArrayList<>(failures); // Convert to a List for the return value
}

private static void ensureArchiveDirectoryExists(FileSystem fs, Path baseArchiveDir)
throws IOException {
if (!fs.exists(baseArchiveDir) && !fs.mkdirs(baseArchiveDir)) {
throw new IOException("Failed to create the archive directory: " + baseArchiveDir);
}
LOG.trace("Archive directory ready: {}", baseArchiveDir);
}

private static void handleDirectory(FileSystem fs, Path baseArchiveDir, Queue<File> failures,
File directory, long start) {
LOG.trace("Processing directory: {}, archiving its children.", directory);
Path subArchiveDir = new Path(baseArchiveDir, directory.getName());

try {
Collection<File> children = directory.getChildren();
failures.addAll(resolveAndArchive(fs, subArchiveDir, children, start));
} catch (IOException e) {
LOG.warn("Failed to archive directory: {}", directory, e);
failures.add(directory);
}
}

private static void archiveFilesConcurrently(Path baseArchiveDir, List<File> files,
Queue<File> failures, String startTime) {
LOG.trace("Archiving {} files concurrently into directory: {}", files.size(), baseArchiveDir);

ExecutorService executorService = Executors.newCachedThreadPool();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most such similar pools are configurable.

Have you configured making the thread pool configurable ?
Would it make sense to use a global pool here, and limit the number of concurrent move operations ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Considering we are shutting down the threads after execution is it okay if we give some valid constant rather than a configuration? I am of the opinion that one more configuration would not help us. I also understand that having a max cap on number of threads is an important aspect.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBH I don't have a lot of experience with object storage deletion performance.

Are resolveAndArchive calls serial, or is it possible to have multiple invocations running at the same time ?

What do you think @wchevreuil, @BukrosSzabolcs ?

try {
Map<File, Future<Boolean>> futureMap = new HashMap<>();

// Submit file archiving tasks
for (File file : files) {
Future<Boolean> future =
executorService.submit(() -> resolveAndArchiveFile(baseArchiveDir, file, startTime));
futureMap.put(file, future);
}

// Process results of each task
for (Map.Entry<File, Future<Boolean>> entry : futureMap.entrySet()) {
File file = entry.getKey();
try {
if (!entry.getValue().get()) {
LOG.warn("Failed to archive file: {} into directory: {}", file, baseArchiveDir);
failures.add(file);
}
} else {
// otherwise its a directory and we need to archive all files
LOG.trace("{} is a directory, archiving children files", file);
// so we add the directory name to the one base archive
Path parentArchiveDir = new Path(baseArchiveDir, file.getName());
// and then get all the files from that directory and attempt to
// archive those too
Collection<File> children = file.getChildren();
failures.addAll(resolveAndArchive(fs, parentArchiveDir, children, start));
} catch (InterruptedException e) {
LOG.error("Archiving interrupted for file: {}", file, e);
Thread.currentThread().interrupt(); // Restore interrupt status
failures.add(file);
} catch (ExecutionException e) {
LOG.error("Archiving failed for file: {}", file, e);
failures.add(file);
}
} catch (IOException e) {
LOG.warn("Failed to archive {}", file, e);
failures.add(file);
}
} finally {
executorService.shutdown();
}
return failures;
}

/**
Expand Down