Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PHOENIX-7505 HBase 3 compatibility changes: Update zookeeper handling #2056

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,8 @@ public static Optional<HighAvailabilityGroup> get(String url, Properties propert
*
* @param url The HA connection url optionally; empty optional if properties disables fallback
* @param properties The client connection properties
* @return The connection url of the single cluster to fall back
* @return The connection url of the single cluster to fall back on,
* with a fully qualified JDBC protocol
* @throws SQLException if fails to get HA information and/or invalid properties are seen
*/
static Optional<String> getFallbackCluster(String url, Properties properties) throws SQLException {
Expand All @@ -309,6 +310,13 @@ static Optional<String> getFallbackCluster(String url, Properties properties) th
if (StringUtils.isEmpty(fallbackCluster)) {
fallbackCluster = haGroupInfo.getUrl1();
}

// Ensure the fallback cluster URL includes the JDBC protocol prefix
if (!fallbackCluster.startsWith(PhoenixRuntime.JDBC_PROTOCOL_ZK)) {
fallbackCluster = PhoenixRuntime.JDBC_PROTOCOL_ZK
+ PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + fallbackCluster;
}

LOG.info("Falling back to single cluster '{}' for the HA group {} to serve HA connection "
+ "request against url '{}'.",
fallbackCluster, haGroupInfo.getName(), url);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ public static String getLocalZkUrl(Configuration conf) {
}

String portStr = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
int port = HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT;
int port = HConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT;
if (portStr != null) {
try {
port = Integer.parseInt(portStr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import javax.annotation.Nullable;

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.phoenix.jdbc.ConnectionInfo;
import org.apache.phoenix.jdbc.ZKConnectionInfo;
Expand Down Expand Up @@ -219,7 +220,11 @@ public static String getSchema(String url, Properties info, String defaultValue)
public static String formatZookeeperUrl(String jdbcUrl) {
ConnectionInfo connInfo;
try {
connInfo = ConnectionInfo.create(jdbcUrl, null, null);
Properties info = new Properties();
// Make sure we use ZK on HBase 3.x
info.put(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
ZKConnectionInfo.ZK_REGISTRY_NAME);
connInfo = ConnectionInfo.create(jdbcUrl, null, info);
// TODO in theory we could support non-ZK registries for HA.
// However, as HA already relies on ZK, this wouldn't be particularly useful,
// and would require significant changes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDriver;
import org.apache.phoenix.jdbc.PhoenixTestDriver;
import org.apache.phoenix.jdbc.ZKConnectionInfo;
import org.apache.phoenix.mapreduce.util.ConnectionUtil;
import org.apache.phoenix.query.ConfigurationFactory;
import org.apache.phoenix.util.InstanceResolver;
Expand All @@ -58,6 +59,7 @@ public static synchronized void setUp() throws Exception {
conf = hbaseTestUtil.getConfiguration();
setUpConfigForMiniCluster(conf);
conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase-test");
conf.set("hbase.client.registry.impl", ZKConnectionInfo.ZK_REGISTRY_NAME);
hbaseTestUtil.startMiniCluster();
Class.forName(PhoenixDriver.class.getName());
DriverManager.registerDriver(new PhoenixTestDriver());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
package org.apache.phoenix.end2end;

import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_ZK;
import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
Expand Down Expand Up @@ -60,7 +60,7 @@ public static synchronized void setUpBeforeClass() throws Exception {
hbaseTestUtil = new HBaseTestingUtility(conf);
hbaseTestUtil.startMiniCluster();
String clientPort = hbaseTestUtil.getConfiguration().get(QueryServices.ZOOKEEPER_PORT_ATTRIB);
String url = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST + JDBC_PROTOCOL_SEPARATOR + clientPort
String url = JDBC_PROTOCOL_ZK + JDBC_PROTOCOL_SEPARATOR + LOCALHOST + JDBC_PROTOCOL_SEPARATOR + clientPort
+ JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
driver = initAndRegisterTestDriver(url, ReadOnlyProps.EMPTY_PROPS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ private void verifySyscatData(Properties clientProps, String connName, Statement
}

private String getJdbcUrl() {
return "jdbc:phoenix:localhost:" + testUtil.getZkCluster().getClientPort() + ":/hbase";
return "jdbc:phoenix+zk:localhost:" + testUtil.getZkCluster().getClientPort() + ":/hbase";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public static synchronized void setUp() throws Exception {

hbaseTestUtil.startMiniCluster();
zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort();
url = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum;
url = PhoenixRuntime.JDBC_PROTOCOL_ZK + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum;

DriverManager.registerDriver(PhoenixDriver.INSTANCE);
DriverManager.registerDriver(new PhoenixTestDriver());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
import org.apache.phoenix.jdbc.ConnectionInfo;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDriver;
import org.apache.phoenix.jdbc.PhoenixStatement;
Expand Down Expand Up @@ -75,7 +76,7 @@ public static synchronized void setUp() throws Exception {
hbaseTestUtil.startMiniCluster();
// establish url and quorum. Need to use PhoenixDriver and not PhoenixTestDriver
zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort();
url = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum;
url = PhoenixRuntime.JDBC_PROTOCOL_ZK + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum;
DriverManager.registerDriver(PhoenixDriver.INSTANCE);
}

Expand All @@ -85,12 +86,14 @@ public Connection createConnection(String tenantId, boolean isDifferentClient) t
// force the use of ConnectionQueryServicesImpl instead of ConnectionQueryServicesTestImpl
props.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
if (tenantId!=null)
if (tenantId!=null) {
props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
StringBuilder sb = new StringBuilder(url);
if (isDifferentClient)
sb.append(PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + "Client2");
return DriverManager.getConnection(sb.toString(), props);
}
if (isDifferentClient) {
ConnectionInfo info = ConnectionInfo.createNoLogin(url, null, props);
return DriverManager.getConnection(info.withPrincipal(tenantId).toUrl(), props);
}
return DriverManager.getConnection(url, props);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public static synchronized void doSetup() throws Exception {
hbaseTestUtil.startMiniCluster();
// establish url and quorum. Need to use PhoenixDriver and not PhoenixTestDriver
zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort();
url = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum;
url = PhoenixRuntime.JDBC_PROTOCOL_ZK + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum;
Properties driverProps = PropertiesUtil.deepCopy(TEST_PROPERTIES);
DriverManager.registerDriver(PhoenixDriver.INSTANCE);
try (PhoenixConnection phxConn =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixDriver;
import org.apache.phoenix.jdbc.ZKConnectionInfo;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
Expand Down Expand Up @@ -71,6 +72,8 @@ public void testUpdateCacheFrequencyWithAddAndDropTable() throws Exception {
longRunningProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
longRunningProps.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.TRUE.toString());
longRunningProps.put("hbase.client.registry.impl", ZKConnectionInfo.ZK_REGISTRY_NAME);

Connection conn1 = DriverManager.getConnection(url, longRunningProps);
String url2 = url + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + "LongRunningQueries";
Connection conn2 = DriverManager.getConnection(url2, longRunningProps);
Expand Down Expand Up @@ -136,6 +139,7 @@ public void testTableSentWhenIndexStateChanges() throws Throwable {
longRunningProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
longRunningProps.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.TRUE.toString());
longRunningProps.put("hbase.client.registry.impl", ZKConnectionInfo.ZK_REGISTRY_NAME);
Connection conn1 = DriverManager.getConnection(url, longRunningProps);
String url2 = url + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + "LongRunningQueries";
Connection conn2 = DriverManager.getConnection(url2, longRunningProps);
Expand Down Expand Up @@ -171,6 +175,7 @@ public void testTableSentWhenIndexStateChanges() throws Throwable {
public void testUpdateCacheFrequencyWithAddColumn() throws Exception {
// Create connections 1 and 2
Properties longRunningProps = new Properties(); // Must update config before starting server
longRunningProps.put("hbase.client.registry.impl", ZKConnectionInfo.ZK_REGISTRY_NAME);
Connection conn1 = DriverManager.getConnection(url, longRunningProps);
Connection conn2 = DriverManager.getConnection(url, longRunningProps);
conn1.setAutoCommit(true);
Expand Down Expand Up @@ -218,6 +223,7 @@ public void testUpdateCacheFrequencyWithAddAndDropIndex() throws Exception {
Properties longRunningProps = new Properties();
longRunningProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
longRunningProps.put("hbase.client.registry.impl", ZKConnectionInfo.ZK_REGISTRY_NAME);
Connection conn1 = DriverManager.getConnection(url, longRunningProps);
String url2 = url + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + "LongRunningQueries";
Connection conn2 = DriverManager.getConnection(url2, longRunningProps);
Expand Down Expand Up @@ -269,6 +275,7 @@ public void testUpdateCacheFrequencyWithAddAndDropView() throws Exception {
Properties longRunningProps = new Properties();
longRunningProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
longRunningProps.put("hbase.client.registry.impl", ZKConnectionInfo.ZK_REGISTRY_NAME);
Connection conn1 = DriverManager.getConnection(url, longRunningProps);
String url2 = url + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + "LongRunningQueries";
Connection conn2 = DriverManager.getConnection(url2, longRunningProps);
Expand Down Expand Up @@ -316,6 +323,7 @@ public void testUpdateCacheFrequencyWithCreateTableAndViewOnDiffConns() throws E
Properties longRunningProps = new Properties();
longRunningProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB,
QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
longRunningProps.put("hbase.client.registry.impl", ZKConnectionInfo.ZK_REGISTRY_NAME);
Connection conn1 = DriverManager.getConnection(url, longRunningProps);
String url2 = url + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + "LongRunningQueries";
Connection conn2 = DriverManager.getConnection(url2, longRunningProps);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_FUNCTION_TABLE;
import static org.apache.phoenix.query.QueryServices.DYNAMIC_JARS_DIR_KEY;
import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_ZK;
import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM;
Expand Down Expand Up @@ -289,7 +289,7 @@ public static synchronized void doSetup() throws Exception {

String clientPort = util.getConfiguration().get(QueryServices.ZOOKEEPER_PORT_ATTRIB);
url =
JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST + JDBC_PROTOCOL_SEPARATOR
JDBC_PROTOCOL_ZK + JDBC_PROTOCOL_SEPARATOR + LOCALHOST + JDBC_PROTOCOL_SEPARATOR
+ clientPort + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
props.put(QueryServices.ALLOW_USER_DEFINED_FUNCTIONS_ATTRIB, "true");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public static synchronized void setUp() throws Exception {
hbaseTestUtil.startMiniCluster();
// establish url and quorum. Need to use PhoenixDriver and not PhoenixTestDriver
zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort();
url = PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum;
url = PhoenixRuntime.JDBC_PROTOCOL_ZK + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum;

Properties driverProps = PropertiesUtil.deepCopy(TEST_PROPERTIES);
driverProps.put(RENEW_LEASE_THREAD_POOL_SIZE, Long.toString(4));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ public void testStatementClosedAfterStandby() throws Exception {
*/
@Test(timeout = 300000)
public void testNonHAConnectionNotClosedAfterFailover() throws Exception {
String firstUrl = String.format("jdbc:phoenix:%s", CLUSTERS.getUrl1());
String firstUrl = String.format("jdbc:phoenix+zk:%s", CLUSTERS.getUrl1());
// This is a vanilla Phoenix connection without using high availability (HA) feature.
Connection phoenixConn = DriverManager.getConnection(firstUrl, new Properties());
Connection failoverConn = createFailoverConnection();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.jdbc.ClusterRoleRecord.ClusterRole;
import org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair;
import org.apache.phoenix.util.PhoenixRuntime;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
Expand Down Expand Up @@ -611,7 +612,7 @@ public void testFallbackToSingleConnection() throws Exception {
if (CLUSTERS.getUrl1().compareTo(CLUSTERS.getUrl2()) > 0) {
firstClusterUrl = CLUSTERS.getUrl2();
}
assertEquals(firstClusterUrl, ((PhoenixConnection) conn).getURL());
assertEquals(PhoenixRuntime.JDBC_PROTOCOL_ZK + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + firstClusterUrl, ((PhoenixConnection) conn).getURL());
doTestBasicOperationsWithConnection(conn, tableName, haGroupName2);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public static void doSetup() throws Exception {
conf.set(QueryServices.INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS, String.valueOf(20));
conf.set(PhoenixHAExecutorServiceProvider.HA_MAX_POOL_SIZE, String.valueOf(5));
conf.set(PhoenixHAExecutorServiceProvider.HA_MAX_QUEUE_SIZE, String.valueOf(30));
conf.set("hbase.client.registry.impl", ZKConnectionInfo.ZK_REGISTRY_NAME);
return conf;
}

Expand All @@ -74,6 +75,7 @@ public static void doSetup() throws Exception {
conf.set(QueryServices.INTERNAL_CONNECTION_MAX_ALLOWED_CONNECTIONS, String.valueOf(20));
conf.set(PhoenixHAExecutorServiceProvider.HA_MAX_POOL_SIZE, String.valueOf(5));
conf.set(PhoenixHAExecutorServiceProvider.HA_MAX_QUEUE_SIZE, String.valueOf(30));
conf.set("hbase.client.registry.impl", ZKConnectionInfo.ZK_REGISTRY_NAME);
Configuration copy = new Configuration(conf);
copy.addResource(confToClone);
return copy;
Expand All @@ -90,9 +92,10 @@ public static void doSetup() throws Exception {
DriverManager.registerDriver(PhoenixDriver.INSTANCE);
DriverManager.registerDriver(new PhoenixTestDriver());
String profileName = "setup";
final String urlWithPrinc = url + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + profileName
+ PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
Properties props = new Properties();
final String urlWithPrinc =
ConnectionInfo.createNoLogin(url, null, props).withPrincipal("nocache")
.toUrl();

try (Connection connection = DriverManager.getConnection(urlWithPrinc, props)) {
try (Statement statement = connection.createStatement()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public static synchronized void setupKdc() throws Exception {
conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
conf.set(User.HBASE_SECURITY_CONF_KEY, "kerberos");
conf.setBoolean(User.HBASE_SECURITY_AUTHORIZATION_CONF_KEY, true);
conf.set("hbase.client.registry.impl", ZKConnectionInfo.ZK_REGISTRY_NAME);
UserGroupInformation.setConfiguration(conf);

// Clear the cached singletons so we can inject our own.
Expand Down
Loading