Skip to content

Commit

Permalink
HDFS-17651.[ARR] Async handler executor isolation (#7244). Contribute…
Browse files Browse the repository at this point in the history
…d by hfutatzhanghb.

Reviewed-by: Jian Zhang <[email protected]>
Signed-off-by: He Xiaoqiao <[email protected]>
  • Loading branch information
hfutatzhanghb authored and KeeProMise committed Jan 20, 2025
1 parent d83dae4 commit 616d707
Show file tree
Hide file tree
Showing 13 changed files with 190 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
public final class AsyncRpcProtocolPBUtil {
public static final Logger LOG = LoggerFactory.getLogger(AsyncRpcProtocolPBUtil.class);
/** The executor used for handling responses asynchronously. */
private static Executor worker;
private static Executor asyncResponderExecutor;

private AsyncRpcProtocolPBUtil() {}

Expand Down Expand Up @@ -97,7 +97,7 @@ public static <T, R> R asyncIpcClient(
} catch (Exception ex) {
throw warpCompletionException(ex);
}
}, worker));
}, asyncResponderExecutor));
return asyncReturn(clazz);
}

Expand Down Expand Up @@ -144,10 +144,10 @@ public static <T> void asyncRouterServer(ServerReq<T> req, ServerRes<T> res) {
* Sets the executor used for handling responses asynchronously within
* the utility class.
*
* @param worker The executor to be used for handling responses asynchronously.
* @param asyncResponderExecutor The executor to be used for handling responses asynchronously.
*/
public static void setWorker(Executor worker) {
AsyncRpcProtocolPBUtil.worker = worker;
public static void setAsyncResponderExecutor(Executor asyncResponderExecutor) {
// The target is a static field, so this replaces the responder executor for
// every subsequent asyncIpcClient call made through this utility class.
AsyncRpcProtocolPBUtil.asyncResponderExecutor = asyncResponderExecutor;
}

@FunctionalInterface
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,22 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
public static final String DFS_ROUTER_RPC_ENABLE =
FEDERATION_ROUTER_PREFIX + "rpc.enable";
public static final boolean DFS_ROUTER_RPC_ENABLE_DEFAULT = true;
public static final String DFS_ROUTER_RPC_ENABLE_ASYNC =
FEDERATION_ROUTER_PREFIX + "rpc.async.enable";
public static final boolean DFS_ROUTER_RPC_ENABLE_ASYNC_DEFAULT = false;
public static final String DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT =
FEDERATION_ROUTER_PREFIX + "rpc.async.handler.count";
public static final int DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT = 2;
public static final String DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT =
FEDERATION_ROUTER_PREFIX + "rpc.async.responder.count";
public static final int DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT = 10;
// HDFS Router Asynchronous RPC
// Switch for the router's asynchronous RPC mode; disabled by default.
public static final String DFS_ROUTER_ASYNC_RPC_ENABLE_KEY =
FEDERATION_ROUTER_PREFIX + "async.rpc.enable";
public static final boolean DFS_ROUTER_ASYNC_RPC_ENABLE_DEFAULT = false;
// Common prefix shared by the async RPC keys declared below.
public static final String FEDERATION_ROUTER_ASYNC_RPC_PREFIX =
FEDERATION_ROUTER_PREFIX + "async.rpc.";
// Dedicated async handler counts per nameservice, given as comma-separated
// nsId:count pairs.
// Example: ns1:count1,ns2:count2,ns3:count3
public static final String DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY =
FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "ns.handler.count";
// Empty by default, i.e. no nameservice gets a dedicated handler count.
public static final String DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT = "";
// Async handler count used for nameservices without a dedicated count and
// for the default async handler executor.
public static final String DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY =
FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "handler.count";
public static final int DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT = 10;
// Size of the async responder thread pool.
public static final String DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY =
FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "responder.count";
// NOTE(review): spelled ASYNCRPC (no underscore), unlike the sibling
// ASYNC_RPC_* constants; kept as-is because callers import this exact name.
public static final int DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT = 10;

public static final String DFS_ROUTER_METRICS_ENABLE =
FEDERATION_ROUTER_PREFIX + "metrics.enable";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@
package org.apache.hadoop.hdfs.server.federation.router;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_ENABLE_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_HANDLER_QUEUE_SIZE_DEFAULT;
Expand All @@ -26,16 +36,8 @@
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_READER_QUEUE_SIZE_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ENABLE_ASYNC;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_RPC_ENABLE_ASYNC_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DN_REPORT_CACHE_EXPIRE;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DN_REPORT_CACHE_EXPIRE_MS_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RouterFederationRename.RouterRenameOption;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncCatch;
Expand All @@ -56,22 +58,24 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
Expand Down Expand Up @@ -209,6 +213,7 @@
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB;
import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolServerSideTranslatorPB;
import org.apache.hadoop.util.ReflectionUtils;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -228,8 +233,9 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,

private static final Logger LOG =
LoggerFactory.getLogger(RouterRpcServer.class);
private ExecutorService asyncRouterHandler;
private ExecutorService asyncRouterResponder;

/** Name service keyword to identify fan-out calls. */
public static final String CONCURRENT_NS = "concurrent";

/** Configuration for the RPC server. */
private Configuration conf;
Expand Down Expand Up @@ -287,6 +293,10 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
/** Schedule the router federation rename jobs. */
private BalanceProcedureScheduler fedRenameScheduler;
private boolean enableAsync;
private Map<String, Integer> nsAsyncHandlerCount = new ConcurrentHashMap<>();
private Map<String, ExecutorService> asyncRouterHandlerExecutors = new ConcurrentHashMap<>();
private ExecutorService routerAsyncResponderExecutor;
private ExecutorService routerDefaultAsyncHandlerExecutor;

/**
* Construct a router RPC server.
Expand Down Expand Up @@ -318,11 +328,11 @@ public RouterRpcServer(Configuration conf, Router router,
int handlerQueueSize = this.conf.getInt(DFS_ROUTER_HANDLER_QUEUE_SIZE_KEY,
DFS_ROUTER_HANDLER_QUEUE_SIZE_DEFAULT);

this.enableAsync = conf.getBoolean(DFS_ROUTER_RPC_ENABLE_ASYNC,
DFS_ROUTER_RPC_ENABLE_ASYNC_DEFAULT);
LOG.info("Router enable async {}", this.enableAsync);
this.enableAsync = conf.getBoolean(DFS_ROUTER_ASYNC_RPC_ENABLE_KEY,
DFS_ROUTER_ASYNC_RPC_ENABLE_DEFAULT);
LOG.info("Router enable async rpc: {}", this.enableAsync);
if (this.enableAsync) {
initAsyncThreadPool();
initAsyncThreadPools(conf);
}
// Override Hadoop Common IPC setting
int readerQueueSize = this.conf.getInt(DFS_ROUTER_READER_QUEUE_SIZE_KEY,
Expand Down Expand Up @@ -446,8 +456,7 @@ public RouterRpcServer(Configuration conf, Router router,
// Create the client
if (this.enableAsync) {
this.rpcClient = new RouterAsyncRpcClient(this.conf, this.router,
this.namenodeResolver, this.rpcMonitor,
routerStateIdContext, asyncRouterHandler);
this.namenodeResolver, this.rpcMonitor, routerStateIdContext);
this.clientProto = new RouterAsyncClientProtocol(conf, this);
this.nnProto = new RouterAsyncNamenodeProtocol(this);
this.routerProto = new RouterAsyncUserProtocol(this);
Expand Down Expand Up @@ -491,23 +500,77 @@ public RouterRpcServer(Configuration conf, Router router,

/**
* Init router async handlers and router async responders.
* @param configuration the configuration.
*/
public void initAsyncThreadPool() {
int asyncHandlerCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT,
DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT_DEFAULT);
int asyncResponderCount = conf.getInt(DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT,
DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT_DEFAULT);
if (asyncRouterHandler == null) {
LOG.info("init router async handler count: {}", asyncHandlerCount);
asyncRouterHandler = Executors.newFixedThreadPool(
asyncHandlerCount, new AsyncThreadFactory("router async handler "));
public void initAsyncThreadPools(Configuration configuration) {
LOG.info("Begin initialize asynchronous handler and responder thread pool.");
initNsAsyncHandlerCount();
Set<String> allConfiguredNS = FederationUtil.getAllConfiguredNS(configuration);
Set<String> unassignedNS = new HashSet<>();
allConfiguredNS.add(CONCURRENT_NS);

for (String nsId : allConfiguredNS) {
int dedicatedHandlers = nsAsyncHandlerCount.getOrDefault(nsId, 0);
LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId);
if (dedicatedHandlers > 0) {
initAsyncHandlerThreadPools4Ns(nsId, dedicatedHandlers);
LOG.info("Assigned {} async handlers to nsId {} ", dedicatedHandlers, nsId);
} else {
unassignedNS.add(nsId);
}
}

int asyncHandlerCountDefault = configuration.getInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY,
DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT);

if (!unassignedNS.isEmpty()) {
LOG.warn("Async handler unassigned ns: {}", unassignedNS);
LOG.info("Use default async handler count {} for unassigned ns.", asyncHandlerCountDefault);
for (String nsId : unassignedNS) {
initAsyncHandlerThreadPools4Ns(nsId, asyncHandlerCountDefault);
}
}
if (asyncRouterResponder == null) {
LOG.info("init router async responder count: {}", asyncResponderCount);
asyncRouterResponder = Executors.newFixedThreadPool(
asyncResponderCount, new AsyncThreadFactory("router async responder "));

int asyncResponderCount = configuration.getInt(DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY,
DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT);
if (routerAsyncResponderExecutor == null) {
LOG.info("Initialize router async responder count: {}", asyncResponderCount);
routerAsyncResponderExecutor = Executors.newFixedThreadPool(
asyncResponderCount, new AsyncThreadFactory("Router Async Responder #"));
}
AsyncRpcProtocolPBUtil.setAsyncResponderExecutor(routerAsyncResponderExecutor);

if (routerDefaultAsyncHandlerExecutor == null) {
LOG.info("init router async default executor handler count: {}", asyncHandlerCountDefault);
routerDefaultAsyncHandlerExecutor = Executors.newFixedThreadPool(
asyncHandlerCountDefault, new AsyncThreadFactory("Router Async Default Handler #"));
}
}

/**
 * Parses {@code DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY} into
 * {@code nsAsyncHandlerCount}.
 *
 * <p>The value is a comma-separated list of {@code nsId:count} pairs. Entries
 * that are malformed (wrong number of parts, blank nameservice id, or a count
 * that is not a valid non-negative int) are logged and skipped; the remaining
 * entries are still applied. An empty value leaves the map untouched.
 */
private void initNsAsyncHandlerCount() {
  String configNsHandler = conf.get(DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY,
      DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT);
  if (StringUtils.isEmpty(configNsHandler)) {
    LOG.error(
        "The value of config key: {} is empty. Will use default conf.",
        DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY);
    // Nothing to parse. Splitting "" would yield a single empty entry and
    // trigger a second, misleading "incorrect" error below.
    return;
  }
  for (String nsHandlerInfo : configNsHandler.split(",")) {
    String[] nsHandlerItems = nsHandlerInfo.split(":");
    if (nsHandlerItems.length != 2 || StringUtils.isBlank(nsHandlerItems[0]) ||
        !StringUtils.isNumeric(nsHandlerItems[1].trim())) {
      LOG.error("The config key: {} is incorrect! The value is {}.",
          DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY, nsHandlerInfo);
      continue;
    }
    // Trim so that "ns1 : 5" maps to the key "ns1" rather than "ns1 ".
    String nsId = nsHandlerItems[0].trim();
    try {
      nsAsyncHandlerCount.put(nsId, Integer.parseInt(nsHandlerItems[1].trim()));
    } catch (NumberFormatException e) {
      // isNumeric only checks that every character is a digit, so an overly
      // long count can still overflow int; skip it like any other bad entry.
      LOG.error("The config key: {} is incorrect! The value is {}.",
          DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY, nsHandlerInfo);
    }
  }
}

/**
 * Registers a fixed-size async handler pool for the given nameservice unless
 * one is already present in {@code asyncRouterHandlerExecutors}.
 *
 * @param nsId the nameservice id the pool is keyed by.
 * @param dedicatedHandlers the number of threads in the pool.
 */
private void initAsyncHandlerThreadPools4Ns(String nsId, int dedicatedHandlers) {
  asyncRouterHandlerExecutors.computeIfAbsent(nsId, key -> {
    // Built inside the mapping function so nothing is created when the
    // nameservice already has a pool.
    ThreadFactory handlerThreads =
        new AsyncThreadFactory("Router Async Handler for " + key + " #");
    return Executors.newFixedThreadPool(dedicatedHandlers, handlerThreads);
  });
}

/**
Expand Down Expand Up @@ -2426,8 +2489,12 @@ public boolean isAsync() {
return this.enableAsync;
}

public Executor getAsyncRouterHandler() {
return asyncRouterHandler;
/**
 * Returns the async handler executors keyed by nameservice id.
 * The returned map is the internal map itself, not a copy.
 *
 * @return the per-nameservice async handler executors.
 */
public Map<String, ExecutorService> getAsyncRouterHandlerExecutors() {
  return this.asyncRouterHandlerExecutors;
}

/**
 * Returns the default async handler executor.
 *
 * @return the default async handler executor; {@code null} if it has not
 *         been initialized.
 */
public ExecutorService getRouterAsyncHandlerDefaultExecutor() {
  return this.routerDefaultAsyncHandlerExecutor;
}

private static class AsyncThreadFactory implements ThreadFactory {
Expand All @@ -2439,8 +2506,10 @@ private static class AsyncThreadFactory implements ThreadFactory {
}

@Override
public Thread newThread(Runnable r) {
return new Thread(r, namePrefix + threadNumber.getAndIncrement());
public Thread newThread(@NonNull Runnable r) {
  // Each thread gets the factory prefix followed by the next sequence number.
  String threadName = namePrefix + threadNumber.getAndIncrement();
  Thread daemonThread = new Thread(r, threadName);
  // Marked as daemon before being handed back to the caller.
  daemonThread.setDaemon(true);
  return daemonThread;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException;
Expand Down Expand Up @@ -98,7 +97,6 @@ public class RouterAsyncRpcClient extends RouterRpcClient{
private final ActiveNamenodeResolver namenodeResolver;
/** Optional perf monitor. */
private final RouterRpcMonitor rpcMonitor;
private final Executor asyncRouterHandler;

/**
* Create a router async RPC client to manage remote procedure calls to NNs.
Expand All @@ -108,17 +106,15 @@ public class RouterAsyncRpcClient extends RouterRpcClient{
* @param resolver A NN resolver to determine the currently active NN in HA.
* @param monitor Optional performance monitor.
* @param routerStateIdContext the router state context object to hold the state ids for all
* @param asyncRouterHandler async router handler
* namespaces.
*/
public RouterAsyncRpcClient(Configuration conf,
Router router, ActiveNamenodeResolver resolver, RouterRpcMonitor monitor,
RouterStateIdContext routerStateIdContext, Executor asyncRouterHandler) {
RouterStateIdContext routerStateIdContext) {
super(conf, router, resolver, monitor, routerStateIdContext);
this.router = router;
this.namenodeResolver = resolver;
this.rpcMonitor = monitor;
this.asyncRouterHandler = asyncRouterHandler;
}

/**
Expand Down Expand Up @@ -172,6 +168,7 @@ public Object invokeMethod(
" with params " + Arrays.deepToString(params) + " from "
+ router.getRouterId());
}
String nsid = namenodes.get(0).getNameserviceId();
// transfer threadLocalContext to worker threads of executor.
ThreadLocalContext threadLocalContext = new ThreadLocalContext();
asyncComplete(null);
Expand All @@ -183,7 +180,8 @@ public Object invokeMethod(
threadLocalContext.transfer();
invokeMethodAsync(ugi, (List<FederationNamenodeContext>) namenodes,
useObserver, protocol, method, params);
}, asyncRouterHandler);
}, router.getRpcServer().getAsyncRouterHandlerExecutors().getOrDefault(nsid,
router.getRpcServer().getRouterAsyncHandlerDefaultExecutor()));
return null;
}

Expand Down
Loading

0 comments on commit 616d707

Please sign in to comment.