diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java index b2ea9cd33a0b..d6ecc1c03de5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java @@ -23,8 +23,14 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -421,50 +427,94 @@ public static void archiveStoreFile(Configuration conf, FileSystem fs, RegionInf */ private static List resolveAndArchive(FileSystem fs, Path baseArchiveDir, Collection toArchive, long start) throws IOException { - // short circuit if no files to move + // Early exit if no files to archive if (toArchive.isEmpty()) { + LOG.trace("No files to archive, returning an empty list."); return Collections.emptyList(); } - LOG.trace("Moving files to the archive directory {}", baseArchiveDir); + LOG.trace("Preparing to archive files into directory: {}", baseArchiveDir); - // make sure the archive directory exists - if (!fs.exists(baseArchiveDir)) { - if (!fs.mkdirs(baseArchiveDir)) { - throw new IOException("Failed to create the archive directory:" + baseArchiveDir - + ", quitting archive attempt."); - } - LOG.trace("Created archive directory {}", baseArchiveDir); - } + // Ensure the archive directory exists + ensureArchiveDirectoryExists(fs, baseArchiveDir); - List failures = new ArrayList<>(); + // Thread-safe collection for storing failures + Queue failures = new ConcurrentLinkedQueue<>(); String startTime = Long.toString(start); + + // Separate files and directories for processing + List filesOnly = new ArrayList<>(); for (File file : toArchive) { - // if its a file archive it - try { - LOG.trace("Archiving {}", file); - if (file.isFile()) { - // attempt to archive the file - if (!resolveAndArchiveFile(baseArchiveDir, file, startTime)) { - LOG.warn("Couldn't archive " + file + " into backup directory: " + baseArchiveDir); + if (file.isFile()) { + filesOnly.add(file); + } else { + handleDirectory(fs, baseArchiveDir, failures, file, start); + } + } + + // Archive files concurrently + archiveFilesConcurrently(baseArchiveDir, filesOnly, failures, startTime); + + return new ArrayList<>(failures); // Convert to a List for the return value + } + + private static void ensureArchiveDirectoryExists(FileSystem fs, Path baseArchiveDir) + throws IOException { + if (!fs.exists(baseArchiveDir) && !fs.mkdirs(baseArchiveDir)) { + throw new IOException("Failed to create the archive directory: " + baseArchiveDir); + } + LOG.trace("Archive directory ready: {}", baseArchiveDir); + } + + private static void handleDirectory(FileSystem fs, Path baseArchiveDir, Queue failures, + File directory, long start) { + LOG.trace("Processing directory: {}, archiving its children.", directory); + Path subArchiveDir = new Path(baseArchiveDir, directory.getName()); + + try { + Collection children = directory.getChildren(); + failures.addAll(resolveAndArchive(fs, subArchiveDir, children, start)); + } catch (IOException e) { + LOG.warn("Failed to archive directory: {}", directory, e); + failures.add(directory); + } + } + + private static void archiveFilesConcurrently(Path baseArchiveDir, List files, + Queue failures, String startTime) { + LOG.trace("Archiving {} files concurrently into directory: {}", files.size(), baseArchiveDir); + + ExecutorService executorService = Executors.newCachedThreadPool(); + try { + Map> futureMap = new HashMap<>(); + + // Submit file archiving tasks + for (File file : files) { + Future future = + executorService.submit(() -> resolveAndArchiveFile(baseArchiveDir, file, startTime)); + futureMap.put(file, future); + } + + // Process results of each task + for (Map.Entry> entry : futureMap.entrySet()) { + File file = entry.getKey(); + try { + if (!entry.getValue().get()) { + LOG.warn("Failed to archive file: {} into directory: {}", file, baseArchiveDir); failures.add(file); } - } else { - // otherwise its a directory and we need to archive all files - LOG.trace("{} is a directory, archiving children files", file); - // so we add the directory name to the one base archive - Path parentArchiveDir = new Path(baseArchiveDir, file.getName()); - // and then get all the files from that directory and attempt to - // archive those too - Collection children = file.getChildren(); - failures.addAll(resolveAndArchive(fs, parentArchiveDir, children, start)); + } catch (InterruptedException e) { + LOG.error("Archiving interrupted for file: {}", file, e); + Thread.currentThread().interrupt(); // Restore interrupt status + failures.add(file); + } catch (ExecutionException e) { + LOG.error("Archiving failed for file: {}", file, e); + failures.add(file); } - } catch (IOException e) { - LOG.warn("Failed to archive {}", file, e); - failures.add(file); } + } finally { + executorService.shutdown(); } - return failures; } /**