Skip to content

Commit

Permalink
HBASE-28836 Parallize the file archival to improve the split times
Browse files Browse the repository at this point in the history
  • Loading branch information
mnpoonia committed Jan 21, 2025
1 parent 6f8db78 commit d3b481d
Showing 1 changed file with 81 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,14 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down Expand Up @@ -421,50 +427,94 @@ public static void archiveStoreFile(Configuration conf, FileSystem fs, RegionInf
*/
private static List<File> resolveAndArchive(FileSystem fs, Path baseArchiveDir,
Collection<File> toArchive, long start) throws IOException {
// short circuit if no files to move
// Early exit if no files to archive
if (toArchive.isEmpty()) {
LOG.trace("No files to archive, returning an empty list.");
return Collections.emptyList();
}

LOG.trace("Moving files to the archive directory {}", baseArchiveDir);
LOG.trace("Preparing to archive files into directory: {}", baseArchiveDir);

// make sure the archive directory exists
if (!fs.exists(baseArchiveDir)) {
if (!fs.mkdirs(baseArchiveDir)) {
throw new IOException("Failed to create the archive directory:" + baseArchiveDir
+ ", quitting archive attempt.");
}
LOG.trace("Created archive directory {}", baseArchiveDir);
}
// Ensure the archive directory exists
ensureArchiveDirectoryExists(fs, baseArchiveDir);

List<File> failures = new ArrayList<>();
// Thread-safe collection for storing failures
Queue<File> failures = new ConcurrentLinkedQueue<>();
String startTime = Long.toString(start);

// Separate files and directories for processing
List<File> filesOnly = new ArrayList<>();
for (File file : toArchive) {
// if its a file archive it
try {
LOG.trace("Archiving {}", file);
if (file.isFile()) {
// attempt to archive the file
if (!resolveAndArchiveFile(baseArchiveDir, file, startTime)) {
LOG.warn("Couldn't archive " + file + " into backup directory: " + baseArchiveDir);
if (file.isFile()) {
filesOnly.add(file);
} else {
handleDirectory(fs, baseArchiveDir, failures, file, start);
}
}

// Archive files concurrently
archiveFilesConcurrently(baseArchiveDir, filesOnly, failures, startTime);

return new ArrayList<>(failures); // Convert to a List for the return value
}

private static void ensureArchiveDirectoryExists(FileSystem fs, Path baseArchiveDir)
throws IOException {
if (!fs.exists(baseArchiveDir) && !fs.mkdirs(baseArchiveDir)) {
throw new IOException("Failed to create the archive directory: " + baseArchiveDir);
}
LOG.trace("Archive directory ready: {}", baseArchiveDir);
}

private static void handleDirectory(FileSystem fs, Path baseArchiveDir, Queue<File> failures,
File directory, long start) {
LOG.trace("Processing directory: {}, archiving its children.", directory);
Path subArchiveDir = new Path(baseArchiveDir, directory.getName());

try {
Collection<File> children = directory.getChildren();
failures.addAll(resolveAndArchive(fs, subArchiveDir, children, start));
} catch (IOException e) {
LOG.warn("Failed to archive directory: {}", directory, e);
failures.add(directory);
}
}

private static void archiveFilesConcurrently(Path baseArchiveDir, List<File> files,
Queue<File> failures, String startTime) {
LOG.trace("Archiving {} files concurrently into directory: {}", files.size(), baseArchiveDir);

ExecutorService executorService = Executors.newCachedThreadPool();
try {
Map<File, Future<Boolean>> futureMap = new HashMap<>();

// Submit file archiving tasks
for (File file : files) {
Future<Boolean> future =
executorService.submit(() -> resolveAndArchiveFile(baseArchiveDir, file, startTime));
futureMap.put(file, future);
}

// Process results of each task
for (Map.Entry<File, Future<Boolean>> entry : futureMap.entrySet()) {
File file = entry.getKey();
try {
if (!entry.getValue().get()) {
LOG.warn("Failed to archive file: {} into directory: {}", file, baseArchiveDir);
failures.add(file);
}
} else {
// otherwise its a directory and we need to archive all files
LOG.trace("{} is a directory, archiving children files", file);
// so we add the directory name to the one base archive
Path parentArchiveDir = new Path(baseArchiveDir, file.getName());
// and then get all the files from that directory and attempt to
// archive those too
Collection<File> children = file.getChildren();
failures.addAll(resolveAndArchive(fs, parentArchiveDir, children, start));
} catch (InterruptedException e) {
LOG.error("Archiving interrupted for file: {}", file, e);
Thread.currentThread().interrupt(); // Restore interrupt status
failures.add(file);
} catch (ExecutionException e) {
LOG.error("Archiving failed for file: {}", file, e);
failures.add(file);
}
} catch (IOException e) {
LOG.warn("Failed to archive {}", file, e);
failures.add(file);
}
} finally {
executorService.shutdown();
}
return failures;
}

/**
Expand Down

0 comments on commit d3b481d

Please sign in to comment.