Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/feat/zk_auth' into feat/zk_auth
Browse files Browse the repository at this point in the history
  • Loading branch information
dl239 committed Nov 7, 2023
2 parents 351ca4d + 1b028a0 commit ff249f9
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;

import java.util.concurrent.TimeUnit;
import java.util.List;
Expand All @@ -46,12 +49,26 @@ public CuratorFramework getClient() {
public boolean connect() throws InterruptedException {
log.info("ZKClient connect with config: {}", config);
RetryPolicy retryPolicy = new ExponentialBackoffRetry(config.getBaseSleepTime(), config.getMaxRetries());
CuratorFramework client = CuratorFrameworkFactory.builder()
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(config.getCluster())
.sessionTimeoutMs(config.getSessionTimeout())
.connectionTimeoutMs(config.getConnectionTimeout())
.retryPolicy(retryPolicy)
.build();
.retryPolicy(retryPolicy);
if (!config.getCert().isEmpty()) {
builder.authorization("digest", config.getCert().getBytes())
.aclProvider(new ACLProvider() {
@Override
public List<ACL> getDefaultAcl() {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}

@Override
public List<ACL> getAclForPath(String s) {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}
});
}
CuratorFramework client = builder.build();
client.start();
if (!client.blockUntilConnected(config.getMaxConnectWaitTime(), TimeUnit.MILLISECONDS)) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,7 @@ public class ZKConfig {
private int baseSleepTime = 1000;
@Builder.Default
private int maxConnectWaitTime = 30000;
@Builder.Default
private String cert = "";

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class SdkOption {
private String sparkConfPath = "";
private int zkLogLevel = 3;
private String zkLogFile = "";
private String zkCert = "";

// options for standalone mode
private String host = "";
Expand Down Expand Up @@ -70,6 +71,7 @@ public SQLRouterOptions buildSQLRouterOptions() throws SqlException {
copt.setSpark_conf_path(getSparkConfPath());
copt.setZk_log_level(getZkLogLevel());
copt.setZk_log_file(getZkLogFile());
copt.setZk_cert(getZkCert());

// base
buildBaseOptions(copt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,12 @@
import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -59,16 +62,34 @@ public TaskManagerClient(String endpoint) {
}

public TaskManagerClient(String zkCluster, String zkPath) throws Exception {
this(zkCluster, zkPath, "");
}

public TaskManagerClient(String zkCluster, String zkPath, String zkCert) throws Exception {
if (zkCluster == null || zkPath == null) {
logger.info("Zookeeper address is wrong, please check the configuration");
}
String masterZnode = zkPath + "/taskmanager/leader";

zkClient = CuratorFrameworkFactory.builder()
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(zkCluster)
.sessionTimeoutMs(10000)
.retryPolicy(new ExponentialBackoffRetry(1000, 10))
.build();
.retryPolicy(new ExponentialBackoffRetry(1000, 10));
if (!zkCert.isEmpty()) {
builder.authorization("digest", zkCert.getBytes())
.aclProvider(new ACLProvider() {
@Override
public List<ACL> getDefaultAcl() {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}

@Override
public List<ACL> getAclForPath(String s) {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}
});
}
zkClient = builder.build();
zkClient.start();
Stat stat = zkClient.checkExists().forPath(masterZnode);
if (stat != null) { // The original master exists and is directly connected to it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ public static String getZkRootPath() {
return getString("zookeeper.root_path");
}

public static String getZkCert() {
return props.getProperty("zookeeper.cert", "");
}

public static int getZkConnectionTimeout() {
return getInt("zookeeper.connection_timeout");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ private void initExternalFunction() throws InterruptedException {
.connectionTimeout(TaskManagerConfig.getZkConnectionTimeout())
.maxConnectWaitTime(TaskManagerConfig.getZkMaxConnectWaitTime())
.maxRetries(TaskManagerConfig.getZkMaxRetries())
.cert(TaskManagerConfig.getZkCert())
.build());
zkClient.connect();

Expand Down

0 comments on commit ff249f9

Please sign in to comment.