Skip to content

Commit

Permalink
Pipe: Fix HA issues caused by exceptions not handled in handshake (Io…
Browse files Browse the repository at this point in the history
…TDBAirGapConnector / IoTDBSyncClientManager) (apache#14706)

Co-authored-by: Steve Yurong Su <[email protected]>
  • Loading branch information
luoluoyuyu and SteveYurongSu authored Jan 19, 2025
1 parent bc5fdae commit 670b456
Show file tree
Hide file tree
Showing 10 changed files with 356 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,19 @@ public Connection getConnection(
getReadConnections(null, username, password, sqlDialect));
}

@Override
public Connection getConnection(
final DataNodeWrapper dataNodeWrapper,
final String username,
final String password,
final String sqlDialect)
throws SQLException {
return new ClusterTestConnection(
getWriteConnectionWithSpecifiedDataNode(
dataNodeWrapper, null, username, password, sqlDialect),
getReadConnections(null, dataNodeWrapper, username, password, sqlDialect));
}

@Override
public Connection getWriteOnlyConnectionWithSpecifiedDataNode(
final DataNodeWrapper dataNode,
Expand Down Expand Up @@ -706,6 +719,33 @@ protected List<NodeConnection> getReadConnections(
return readConnRequestDelegate.requestAll();
}

protected List<NodeConnection> getReadConnections(
final Constant.Version version,
final DataNodeWrapper dataNode,
final String username,
final String password,
final String sqlDialect)
throws SQLException {
final List<String> endpoints = new ArrayList<>();
final ParallelRequestDelegate<NodeConnection> readConnRequestDelegate =
new ParallelRequestDelegate<>(endpoints, NODE_START_TIMEOUT);

endpoints.add(dataNode.getIpAndPortString());
readConnRequestDelegate.addRequest(
() ->
new NodeConnection(
dataNode.getIpAndPortString(),
NodeConnection.NodeRole.DATA_NODE,
NodeConnection.ConnectionRole.READ,
DriverManager.getConnection(
Config.IOTDB_URL_PREFIX
+ dataNode.getIpAndPortString()
+ getParam(version, NODE_NETWORK_TIMEOUT_MS, ZERO_TIME_ZONE),
BaseEnv.constructProperties(username, password, sqlDialect))));

return readConnRequestDelegate.requestAll();
}

// use this to avoid some runtimeExceptions when try to get jdbc connections.
// because it is hard to add retry and handle exception when getting jdbc connections in
// getWriteConnectionWithSpecifiedDataNode and getReadConnections.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,13 @@ public Connection getConnection(
return connection;
}

@Override
public Connection getConnection(
DataNodeWrapper dataNodeWrapper, String username, String password, String sqlDialect)
throws SQLException {
throw new UnsupportedOperationException();
}

public void setTestMethodName(String testCaseName) {
// Do nothing
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ Connection getConnection(
Constant.Version version, String username, String password, String sqlDialect)
throws SQLException;

Connection getConnection(
DataNodeWrapper dataNodeWrapper, String username, String password, String sqlDialect)
throws SQLException;

default Connection getConnection(String username, String password) throws SQLException {
return getConnection(username, password, TREE_SQL_DIALECT);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import static org.apache.iotdb.itbase.constant.TestConstant.DELTA;
import static org.apache.iotdb.itbase.constant.TestConstant.NULL;
import static org.apache.iotdb.itbase.constant.TestConstant.TIMESTAMP_STR;
import static org.apache.iotdb.itbase.env.BaseEnv.TREE_SQL_DIALECT;
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -704,7 +705,7 @@ public static boolean tryExecuteNonQueriesWithRetry(BaseEnv env, List<String> sq
SessionConfig.DEFAULT_USER,
SessionConfig.DEFAULT_PASSWORD,
null,
BaseEnv.TREE_SQL_DIALECT);
TREE_SQL_DIALECT);
}

public static boolean tryExecuteNonQueriesWithRetry(
Expand All @@ -722,8 +723,7 @@ public static boolean tryExecuteNonQueriesWithRetry(
// Instead, it returns a flag to indicate the result of the execution.
public static boolean tryExecuteNonQueriesWithRetry(
BaseEnv env, List<String> sqlList, String userName, String password) {
return tryExecuteNonQueriesWithRetry(
env, sqlList, userName, password, null, BaseEnv.TREE_SQL_DIALECT);
return tryExecuteNonQueriesWithRetry(env, sqlList, userName, password, null, TREE_SQL_DIALECT);
}

public static boolean tryExecuteNonQueriesWithRetry(
Expand All @@ -741,7 +741,7 @@ public static boolean tryExecuteNonQueriesWithRetry(
password,
BaseEnv.TABLE_SQL_DIALECT.equals(sqlDialect)
? BaseEnv.TABLE_SQL_DIALECT
: BaseEnv.TREE_SQL_DIALECT);
: TREE_SQL_DIALECT);
Statement statement = connection.createStatement()) {
if (BaseEnv.TABLE_SQL_DIALECT.equals(sqlDialect) && dataBase != null) {
statement.execute("use " + dataBase);
Expand Down Expand Up @@ -1053,6 +1053,41 @@ public static void assertDataEventuallyOnEnv(
}
}

public static void assertDataEventuallyOnEnv(
BaseEnv env,
DataNodeWrapper dataNodeWrapper,
String sql,
String expectedHeader,
Set<String> expectedResSet,
long timeoutSeconds) {
try (Connection connection =
env.getConnection(
dataNodeWrapper,
SessionConfig.DEFAULT_USER,
SessionConfig.DEFAULT_PASSWORD,
TREE_SQL_DIALECT);
Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
await()
.pollInSameThread()
.pollDelay(1L, TimeUnit.SECONDS)
.pollInterval(1L, TimeUnit.SECONDS)
.atMost(timeoutSeconds, TimeUnit.SECONDS)
.untilAsserted(
() -> {
try {
TestUtils.assertResultSetEqual(
executeQueryWithRetry(statement, sql), expectedHeader, expectedResSet);
} catch (Exception e) {
Assert.fail();
}
});
} catch (Exception e) {
e.printStackTrace();
fail();
}
}

public static void assertDataSizeEventuallyOnEnv(
final BaseEnv env, final String sql, final int size, final String databaseName) {
assertDataSizeEventuallyOnEnv(env, sql, size, 600, databaseName);
Expand Down Expand Up @@ -1166,6 +1201,17 @@ public static void assertDataEventuallyOnEnv(
env, sql, expectedHeader, expectedResSet, 600, dataBaseName, handleFailure);
}

public static void assertDataEventuallyOnEnv(
final BaseEnv env,
final DataNodeWrapper dataNodeWrapper,
final String sql,
final String expectedHeader,
final Set<String> expectedResSet,
final String dataBaseName) {
assertDataEventuallyOnEnv(
env, dataNodeWrapper, sql, expectedHeader, expectedResSet, 600, dataBaseName, null);
}

public static void assertDataEventuallyOnEnv(
final BaseEnv env,
final String sql,
Expand Down Expand Up @@ -1218,6 +1264,55 @@ public static void assertDataEventuallyOnEnv(
}
}

public static void assertDataEventuallyOnEnv(
final BaseEnv env,
final DataNodeWrapper dataNodeWrapper,
final String sql,
final String expectedHeader,
final Set<String> expectedResSet,
final long timeoutSeconds,
final String databaseName,
final Consumer<String> handleFailure) {
try (Connection connection =
env.getConnection(
dataNodeWrapper,
SessionConfig.DEFAULT_USER,
SessionConfig.DEFAULT_PASSWORD,
BaseEnv.TABLE_SQL_DIALECT);
Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
await()
.pollInSameThread()
.pollDelay(1L, TimeUnit.SECONDS)
.pollInterval(1L, TimeUnit.SECONDS)
.atMost(timeoutSeconds, TimeUnit.SECONDS)
.untilAsserted(
() -> {
try {
if (databaseName != null) {
statement.execute("use " + databaseName);
}
if (sql != null && !sql.isEmpty()) {
TestUtils.assertResultSetEqual(
executeQueryWithRetry(statement, sql), expectedHeader, expectedResSet);
}
} catch (Exception e) {
if (handleFailure != null) {
handleFailure.accept(e.getMessage());
}
Assert.fail();
} catch (Error e) {
if (handleFailure != null) {
handleFailure.accept(e.getMessage());
}
throw e;
}
});
} catch (Exception e) {
fail(e.getMessage());
}
}

public static void assertDataEventuallyOnEnv(
BaseEnv env, String sql, Map<String, String> expectedHeaderWithResult) {
assertDataEventuallyOnEnv(env, sql, expectedHeaderWithResult, 600);
Expand Down Expand Up @@ -1294,7 +1389,7 @@ public static void assertDataAlwaysOnEnv(
final boolean[] flushed = {false};
try (Connection connection =
env.getConnection(
Objects.isNull(database) ? BaseEnv.TREE_SQL_DIALECT : BaseEnv.TABLE_SQL_DIALECT);
Objects.isNull(database) ? TREE_SQL_DIALECT : BaseEnv.TABLE_SQL_DIALECT);
Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
await()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
import org.apache.iotdb.pipe.it.tablemodel.TableModelUtils;
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.thrift.TException;
Expand Down Expand Up @@ -80,6 +81,8 @@ public void setUp() {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
.setDataReplicationFactor(2)
.setSchemaReplicationFactor(3)
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);
Expand All @@ -92,6 +95,101 @@ public void setUp() {
receiverEnv.initClusterEnvironment(3, 3, 180);
}

@Test
public void testMachineDowntimeAsync() {
testMachineDowntime("iotdb-thrift-connector");
}

@Test
public void testMachineDowntimeSync() {
testMachineDowntime("iotdb-thrift-sync-connector");
}

private void testMachineDowntime(String sink) {
StringBuilder a = new StringBuilder();
for (DataNodeWrapper nodeWrapper : receiverEnv.getDataNodeWrapperList()) {
a.append(nodeWrapper.getIp()).append(":").append(nodeWrapper.getPort());
a.append(",");
}
a.deleteCharAt(a.length() - 1);

TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
TableModelUtils.insertData("test", "test", 0, 1, senderEnv);
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {

if (!TestUtils.tryExecuteNonQueriesWithRetry(
senderEnv,
Arrays.asList(
"insert into root.db.d1(time, s1) values (2010-01-01T10:00:00+08:00, 1)",
"insert into root.db.d1(time, s1) values (2010-01-02T10:00:00+08:00, 2)",
"flush"))) {
return;
}

final Map<String, String> extractorAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor", "iotdb-extractor");
extractorAttributes.put("capture.tree", "true");

processorAttributes.put("processor", "do-nothing-processor");

connectorAttributes.put("connector", sink);
connectorAttributes.put("connector.batch.enable", "false");
connectorAttributes.put("connector.node-urls", a.toString());

final TSStatus status =
client.createPipe(
new TCreatePipeReq("p1", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes));

Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());

receiverEnv.getDataNodeWrapper(0).stop();

// Ensure that the kill -9 operation is completed
Thread.sleep(5000);
for (DataNodeWrapper nodeWrapper : receiverEnv.getDataNodeWrapperList()) {
if (!nodeWrapper.isAlive()) {
continue;
}
TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
nodeWrapper,
"select count(*) from root.**",
"count(root.db.d1.s1),",
Collections.singleton("2,"),
600);
}
if (!TestUtils.tryExecuteNonQueriesWithRetry(
senderEnv,
Arrays.asList("insert into root.db.d1(time, s1) values (now(), 3)", "flush"))) {
return;
}

} catch (Exception e) {
fail(e.getMessage());
}

for (DataNodeWrapper nodeWrapper : receiverEnv.getDataNodeWrapperList()) {
if (!nodeWrapper.isAlive()) {
continue;
}

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
nodeWrapper,
"select count(*) from root.**",
"count(root.db.d1.s1),",
Collections.singleton("3,"),
600);
return;
}
}

@Test
public void testWithAllParametersInLogMode() throws Exception {
testWithAllParameters("log");
Expand Down
Loading

0 comments on commit 670b456

Please sign in to comment.