Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](cloud) Fix cloud -230 retry not reset ctx state #47326

Merged
merged 4 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1275,15 +1275,15 @@ public void abortTransaction(Long dbId, Long transactionId, String reason,

AbortTxnResponse abortTxnResponse = null;
try {
abortTxnResponse = abortTransactionImpl(dbId, transactionId, reason, null, null);
abortTxnResponse = abortTransactionImpl(dbId, transactionId, reason, null);
} finally {
handleAfterAbort(abortTxnResponse, txnCommitAttachment, transactionId);
}
}

private AbortTxnResponse abortTransactionImpl(Long dbId, Long transactionId, String reason,
TxnCommitAttachment txnCommitAttachment, List<Table> tableList) throws UserException {
LOG.info("try to abort transaction, dbId:{}, transactionId:{}", dbId, transactionId);
TxnCommitAttachment txnCommitAttachment) throws UserException {
LOG.info("try to abort transaction, dbId:{}, transactionId:{}, reason: {}", dbId, transactionId, reason);

AbortTxnRequest.Builder builder = AbortTxnRequest.newBuilder();
builder.setDbId(dbId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,7 @@ public void queryRetry(TUniqueId queryId) throws Exception {
i, DebugUtil.printId(firstQueryId), DebugUtil.printId(lastQueryId),
DebugUtil.printId(queryId), randomMillis);
Thread.sleep(randomMillis);
context.getState().reset();
} catch (Exception e) {
throw e;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
import com.mysql.cj.jdbc.StatementImpl
import org.apache.doris.regression.suite.ClusterOptions
import org.apache.doris.regression.util.NodeType
import org.apache.doris.regression.suite.SuiteCluster
Expand All @@ -30,6 +31,16 @@ suite("test_retry_e-230", 'docker') {
options.feConfigs.add('sys_log_verbose_modules=org')
options.setBeNum(1)
options.cloudMode = true

def insert_sql = { sql, expected_row_count ->
def stmt = prepareStatement """ ${sql} """
def result = stmt.executeUpdate()
logger.info("insert result: " + result)
def serverInfo = (((StatementImpl) stmt).results).getServerInfo()
logger.info("result server info: " + serverInfo)
assertEquals(result, expected_row_count)
assertTrue(serverInfo.contains("'status':'VISIBLE'"))
}
// 1. connect to master
options.connectToFollower = false
for (def j = 0; j < 2; j++) {
Expand Down Expand Up @@ -57,7 +68,7 @@ suite("test_retry_e-230", 'docker') {
);
"""
for (def i = 1; i <= 5; i++) {
sql "INSERT INTO ${tbl} VALUES (${i}, ${10 * i})"
insert_sql """INSERT INTO ${tbl} VALUES (${i}, ${10 * i})""", 1
}

cluster.injectDebugPoints(NodeType.FE, ['StmtExecutor.retry.longtime' : null])
Expand Down Expand Up @@ -112,36 +123,48 @@ suite("test_retry_e-230", 'docker') {
)
"""

sql """
insert into ${tbl1} values (9,10,11,12), (1,2,3,4)
"""
insert_sql """INSERT INTO ${tbl1} VALUES (9,10,11,12), (1,2,3,4)""", 2

// dp again
cluster.injectDebugPoints(NodeType.BE, ['CloudTablet.capture_rs_readers.return.e-230' : null])

cluster.clearFrontendDebugPoints()
try {
sql """insert into ${tbl2} select * from ${tbl1}"""
assertFalse(true)
} catch (Exception e) {
logger.info("Received expected exception when insert into select: {}", e.getMessage())
assert e.getMessage().contains("[E-230]injected error"), "Unexpected exception message when insert into select"
}

cluster.injectDebugPoints(NodeType.FE, ['StmtExecutor.retry.longtime' : null])

def futrue3 = thread {
Thread.sleep(4000)
cluster.clearBackendDebugPoints()
}

begin = System.currentTimeMillis();
def futrue4 = thread {
def result = try_sql """insert into ${tbl2} select * from ${tbl1}"""
insert_sql """insert into ${tbl2} select * from ${tbl1}""", 2
}

futrue4.get()
cost = System.currentTimeMillis() - begin;
log.info("time cost insert into select : {}", cost)
futrue3.get()
def tbl1Ret = sql_return_maparray """select * from ${tbl1}"""
log.info("tbl1 ret {}", tbl1Ret)
def tbl2Ret = sql_return_maparray """select * from ${tbl2}"""
log.info("tbl2 ret {}", tbl2Ret)
// Compare the results from both tables
assertEquals(tbl1Ret, tbl2Ret, "Data in ${tbl1} and ${tbl2} should be identical")
// fe StmtExecutor retry time, at most 25 * 1.5s + 25 * 2.5s
assertTrue(cost > 4000 && cost < 100000)

} finally {
cluster.clearFrontendDebugPoints()
cluster.clearBackendDebugPoints()
sql """ DROP TABLE IF EXISTS ${tbl} """
sql """ DROP TABLE IF EXISTS ${tbl1} """
sql """ DROP TABLE IF EXISTS ${tbl2} """
}
}
// 2. connect to follower
Expand Down
Loading