diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index 114dbeaa4e58a0..1c63b1d37d6698 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -1275,15 +1275,15 @@ public void abortTransaction(Long dbId, Long transactionId, String reason, AbortTxnResponse abortTxnResponse = null; try { - abortTxnResponse = abortTransactionImpl(dbId, transactionId, reason, null, null); + abortTxnResponse = abortTransactionImpl(dbId, transactionId, reason, null); } finally { handleAfterAbort(abortTxnResponse, txnCommitAttachment, transactionId); } } private AbortTxnResponse abortTransactionImpl(Long dbId, Long transactionId, String reason, - TxnCommitAttachment txnCommitAttachment, List tableList) throws UserException { - LOG.info("try to abort transaction, dbId:{}, transactionId:{}", dbId, transactionId); + TxnCommitAttachment txnCommitAttachment) throws UserException { + LOG.info("try to abort transaction, dbId:{}, transactionId:{}, reason: {}", dbId, transactionId, reason); AbortTxnRequest.Builder builder = AbortTxnRequest.newBuilder(); builder.setDbId(dbId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 8d712dda76a08b..ae7dda14609395 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -593,6 +593,7 @@ public void queryRetry(TUniqueId queryId) throws Exception { i, DebugUtil.printId(firstQueryId), DebugUtil.printId(lastQueryId), DebugUtil.printId(queryId), randomMillis); Thread.sleep(randomMillis); + context.getState().reset(); } catch (Exception e) { throw e; } diff --git a/regression-test/suites/cloud_p0/query_retry/test_retry_e-230.groovy b/regression-test/suites/cloud_p0/query_retry/test_retry_e-230.groovy index 88ec8e8861d6f4..b815a28dc637a7 100644 --- a/regression-test/suites/cloud_p0/query_retry/test_retry_e-230.groovy +++ b/regression-test/suites/cloud_p0/query_retry/test_retry_e-230.groovy @@ -14,6 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. +import com.mysql.cj.jdbc.StatementImpl import org.apache.doris.regression.suite.ClusterOptions import org.apache.doris.regression.util.NodeType import org.apache.doris.regression.suite.SuiteCluster @@ -30,6 +31,16 @@ suite("test_retry_e-230", 'docker') { options.feConfigs.add('sys_log_verbose_modules=org') options.setBeNum(1) options.cloudMode = true + + def insert_sql = { sql, expected_row_count -> + def stmt = prepareStatement """ ${sql} """ + def result = stmt.executeUpdate() + logger.info("insert result: " + result) + def serverInfo = (((StatementImpl) stmt).results).getServerInfo() + logger.info("result server info: " + serverInfo) + assertEquals(result, expected_row_count) + assertTrue(serverInfo.contains("'status':'VISIBLE'")) + } // 1. connect to master options.connectToFollower = false for (def j = 0; j < 2; j++) { @@ -57,7 +68,7 @@ suite("test_retry_e-230", 'docker') { ); """ for (def i = 1; i <= 5; i++) { - sql "INSERT INTO ${tbl} VALUES (${i}, ${10 * i})" + insert_sql """INSERT INTO ${tbl} VALUES (${i}, ${10 * i})""", 1 } cluster.injectDebugPoints(NodeType.FE, ['StmtExecutor.retry.longtime' : null]) @@ -112,13 +123,22 @@ suite("test_retry_e-230", 'docker') { ) """ - sql """ - insert into ${tbl1} values (9,10,11,12), (1,2,3,4) - """ + insert_sql """INSERT INTO ${tbl1} VALUES (9,10,11,12), (1,2,3,4)""", 2 // dp again cluster.injectDebugPoints(NodeType.BE, ['CloudTablet.capture_rs_readers.return.e-230' : null]) + cluster.clearFrontendDebugPoints() + try { + sql """insert into ${tbl2} select * from ${tbl1}""" + assertFalse(true) + } catch (Exception e) { + logger.info("Received expected exception when insert into select: {}", e.getMessage()) + assert e.getMessage().contains("[E-230]injected error"), "Unexpected exception message when insert into select" + } + + cluster.injectDebugPoints(NodeType.FE, ['StmtExecutor.retry.longtime' : null]) + def futrue3 = thread { Thread.sleep(4000) cluster.clearBackendDebugPoints() @@ -126,22 +146,25 @@ suite("test_retry_e-230", 'docker') { begin = System.currentTimeMillis(); def futrue4 = thread { - def result = try_sql """insert into ${tbl2} select * from ${tbl1}""" + insert_sql """insert into ${tbl2} select * from ${tbl1}""", 2 } futrue4.get() cost = System.currentTimeMillis() - begin; log.info("time cost insert into select : {}", cost) futrue3.get() + def tbl1Ret = sql_return_maparray """select * from ${tbl1}""" + log.info("tbl1 ret {}", tbl1Ret) + def tbl2Ret = sql_return_maparray """select * from ${tbl2}""" + log.info("tbl2 ret {}", tbl2Ret) + // Compare the results from both tables + assertEquals(tbl1Ret, tbl2Ret, "Data in ${tbl1} and ${tbl2} should be identical") // fe StmtExecutor retry time, at most 25 * 1.5s + 25 * 2.5s assertTrue(cost > 4000 && cost < 100000) } finally { cluster.clearFrontendDebugPoints() cluster.clearBackendDebugPoints() - sql """ DROP TABLE IF EXISTS ${tbl} """ - sql """ DROP TABLE IF EXISTS ${tbl1} """ - sql """ DROP TABLE IF EXISTS ${tbl2} """ } } // 2. connect to follower