From 259d09654388bd70cb7a2bc5a5b6e7b6465c1b45 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Tue, 11 Jan 2022 17:35:14 +0800 Subject: [PATCH 1/3] saga rollback order keeped --- dtmsvr/trans_type_saga.go | 87 ++++++++++++++++++++++++++------------- test/busi/busi.go | 2 +- test/saga_test.go | 9 ++++ 3 files changed, 69 insertions(+), 29 deletions(-) diff --git a/dtmsvr/trans_type_saga.go b/dtmsvr/trans_type_saga.go index f2a88fd5b..b7d15ca88 100644 --- a/dtmsvr/trans_type_saga.go +++ b/dtmsvr/trans_type_saga.go @@ -47,6 +47,7 @@ func (t *transSagaProcessor) GenBranches() []TransBranch { type cSagaCustom struct { Orders map[int][]int `json:"orders"` Concurrent bool `json:"concurrent"` + cOrders map[int][]int } type branchResult struct { @@ -64,9 +65,14 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { } n := len(branches) - csc := cSagaCustom{Orders: map[int][]int{}} + csc := cSagaCustom{Orders: map[int][]int{}, cOrders: map[int][]int{}} if t.CustomData != "" { dtmimp.MustUnmarshalString(t.CustomData, &csc) + for k, v := range csc.Orders { + for _, b := range v { + csc.cOrders[b] = append(csc.cOrders[b], k) + } + } } if csc.Concurrent || t.TimeoutToFail > 0 { // when saga is not normal, update branch sync t.updateBranchSync = true @@ -83,9 +89,9 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { rsAFailed++ } } - branchResults[i] = branchResult{status: branches[i].Status, op: branches[i].Op} + branchResults[i] = branchResult{index: i, status: branches[i].Status, op: branches[i].Op} } - isPreconditionsSucceed := func(current int) bool { + shouldRun := func(current int) bool { // if !csc.Concurrent,then check the branch in previous step is succeed if !csc.Concurrent && current >= 2 && branches[current-2].Status != dtmcli.StatusSucceed { return false @@ -98,7 +104,26 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { } return true } - + shouldRollback := func(current int) bool { + rollbacked := func(i int) bool { + // current compensate op rollbacked or related action still prepared + return branches[i].Status == dtmcli.StatusSucceed || branches[i+1].Status == dtmcli.StatusPrepared + } + if rollbacked(current) { + return false + } + // if !csc.Concurrent,then check the branch in next step is rollbacked + if !csc.Concurrent && current < n-2 && !rollbacked(current+2) { + return false + } + // if csc.concurrent, then check the cOrders. origin one step correspond to 2 step in dtmsvr + for _, next := range csc.cOrders[current/2] { + if !rollbacked(next) { + return false + } + } + return true + } resultChan := make(chan branchResult, n) asyncExecBranch := func(i int) { var err error @@ -115,14 +140,24 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { } pickToRunActions := func() []int { toRun := []int{} - for current := 0; current < n; current++ { + for current := 1; current < n; current += 2 { + br := &branchResults[current] + if !br.started && br.status == dtmcli.StatusPrepared && shouldRun(current) { + toRun = append(toRun, current) + } + } + logger.Debugf("toRun picked for action is: %v branchResults: %v compensate orders: %v", toRun, branchResults, csc.cOrders) + return toRun + } + pickToRunCompensates := func() []int { + toRun := []int{} + for current := n - 2; current >= 0; current -= 2 { br := &branchResults[current] - if br.op == dtmcli.BranchAction && !br.started && isPreconditionsSucceed(current) && - br.status == dtmcli.StatusPrepared { + if !br.started && br.status == dtmcli.StatusPrepared && shouldRollback(current) { toRun = append(toRun, current) } } - logger.Debugf("toRun picked for action is: %v", toRun) + logger.Debugf("toRun picked for compensate is: %v branchResults: %v compensate orders: %v", toRun, branchResults, csc.cOrders) return toRun } runBranches := func(toRun []int) { @@ -134,20 +169,6 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { go asyncExecBranch(b) } } - pickAndRunCompensates := func(toRunActions []int) { - for _, b := range toRunActions { - // these branches may have run. so flag them to status succeed, then run the corresponding - // compensate - branchResults[b].status = dtmcli.StatusSucceed - } - for i, b := range branchResults { - if b.op == dtmcli.BranchCompensate && b.status != dtmcli.StatusSucceed && - branchResults[i+1].status != dtmcli.StatusPrepared { - rsCToStart++ - go asyncExecBranch(i) - } - } - } waitDoneOnce := func() { select { case r := <-resultChan: @@ -172,7 +193,8 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { } } - for t.Status == dtmcli.StatusSubmitted && !t.isTimeout() && rsAFailed == 0 && rsADone != rsAToStart { + timeLimit := time.Now().Add(time.Duration(conf.RequestTimeout+2) * time.Second) + for time.Now().Before(timeLimit) && t.Status == dtmcli.StatusSubmitted && !t.isTimeout() && rsAFailed == 0 { toRun := pickToRunActions() runBranches(toRun) if rsADone == rsAStarted { // no branch is running, so break @@ -187,13 +209,22 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { if t.Status == dtmcli.StatusSubmitted && (rsAFailed > 0 || t.isTimeout()) { t.changeStatus(dtmcli.StatusAborting) } - if t.Status == dtmcli.StatusAborting { - toRun := pickToRunActions() - pickAndRunCompensates(toRun) - for rsCDone != rsCToStart { - waitDoneOnce() + for i, b := range branchResults { + if b.op == dtmcli.BranchCompensate && b.status != dtmcli.StatusSucceed && + branchResults[i+1].status != dtmcli.StatusPrepared { + rsCToStart++ } } + logger.Debugf("rsCToStart: %d", rsCToStart) + for time.Now().Before(timeLimit) && t.Status == dtmcli.StatusAborting { + toRun := pickToRunCompensates() + runBranches(toRun) + if rsCDone == rsCToStart { // no branch is running, so break + break + } + logger.Debugf("rsCDone: %d rsCToStart: %d", rsCDone, rsCToStart) + waitDoneOnce() + } if t.Status == dtmcli.StatusAborting && rsCToStart == rsCSucceed { t.changeStatus(dtmcli.StatusFailed) } diff --git a/test/busi/busi.go b/test/busi/busi.go index 5a4119296..3fda2f246 100644 --- a/test/busi/busi.go +++ b/test/busi/busi.go @@ -24,7 +24,7 @@ func handleGrpcBusiness(in *BusiReq, result1 string, result2 string, busi string } else if res == dtmcli.ResultFailure { return status.New(codes.Aborted, dtmcli.ResultFailure).Err() } else if res == dtmcli.ResultOngoing { - return status.New(codes.Aborted, dtmcli.ResultOngoing).Err() + return status.New(codes.FailedPrecondition, dtmcli.ResultOngoing).Err() } return status.New(codes.Internal, fmt.Sprintf("unknow result %s", res)).Err() } diff --git a/test/saga_test.go b/test/saga_test.go index 9b6f0e427..b7afd3a6e 100644 --- a/test/saga_test.go +++ b/test/saga_test.go @@ -24,6 +24,15 @@ func TestSagaNormal(t *testing.T) { assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid)) } +func TestSagaRollback(t *testing.T) { + saga := genSaga(dtmimp.GetFuncName(), false, true) + err := saga.Submit() + assert.Nil(t, err) + waitTransProcessed(saga.Gid) + assert.Equal(t, []string{StatusSucceed, StatusSucceed, StatusSucceed, StatusFailed}, getBranchesStatus(saga.Gid)) + assert.Equal(t, StatusFailed, getTransStatus(saga.Gid)) +} + func TestSagaOngoingSucceed(t *testing.T) { saga := genSaga(dtmimp.GetFuncName(), false, false) busi.MainSwitch.TransOutResult.SetOnce(dtmcli.ResultOngoing) From 60698ac807719aa8087176197df39898153b23e1 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Tue, 11 Jan 2022 18:25:51 +0800 Subject: [PATCH 2/3] fix cOrders test in shouldRollback --- dtmsvr/trans_type_saga.go | 2 +- test/saga_concurrent_test.go | 9 +++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/dtmsvr/trans_type_saga.go b/dtmsvr/trans_type_saga.go index b7d15ca88..7f439340b 100644 --- a/dtmsvr/trans_type_saga.go +++ b/dtmsvr/trans_type_saga.go @@ -118,7 +118,7 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { } // if csc.concurrent, then check the cOrders. origin one step correspond to 2 step in dtmsvr for _, next := range csc.cOrders[current/2] { - if !rollbacked(next) { + if !rollbacked(2 * next) { return false } } diff --git a/test/saga_concurrent_test.go b/test/saga_concurrent_test.go index e8c32c058..5c33770f6 100644 --- a/test/saga_concurrent_test.go +++ b/test/saga_concurrent_test.go @@ -51,6 +51,15 @@ func TestSagaConRollbackOrder(t *testing.T) { assert.Equal(t, []string{StatusSucceed, StatusFailed, StatusPrepared, StatusPrepared}, getBranchesStatus(sagaCon.Gid)) } +func TestSagaConRollbackOrder2(t *testing.T) { + sagaCon := genSagaCon(dtmimp.GetFuncName(), false, true) + sagaCon.AddBranchOrder(1, []int{0}) + err := sagaCon.Submit() + assert.Nil(t, err) + waitTransProcessed(sagaCon.Gid) + assert.Equal(t, StatusFailed, getTransStatus(sagaCon.Gid)) + assert.Equal(t, []string{StatusSucceed, StatusSucceed, StatusSucceed, StatusFailed}, getBranchesStatus(sagaCon.Gid)) +} func TestSagaConCommittedOngoing(t *testing.T) { sagaCon := genSagaCon(dtmimp.GetFuncName(), false, false) busi.MainSwitch.TransOutResult.SetOnce(dtmcli.ResultOngoing) From d355fc80fb46e40d033b6e88ba29296bd88d0e13 Mon Sep 17 00:00:00 2001 From: yedf2 <120050102@qq.com> Date: Tue, 11 Jan 2022 20:00:20 +0800 Subject: [PATCH 3/3] fix shouldrollback --- dtmsvr/cron.go | 4 +++- dtmsvr/trans_type_saga.go | 30 ++++++++++++++++++++---------- 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/dtmsvr/cron.go b/dtmsvr/cron.go index 8ffff263b..a768fa1c2 100644 --- a/dtmsvr/cron.go +++ b/dtmsvr/cron.go @@ -7,11 +7,13 @@ package dtmsvr import ( + "errors" "fmt" "math/rand" "runtime/debug" "time" + "github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/logger" ) @@ -33,7 +35,7 @@ func CronTransOnce() (gid string) { trans.WaitResult = true branches := GetStore().FindBranches(gid) err := trans.Process(branches) - dtmimp.E2P(err) + dtmimp.PanicIf(err != nil && !errors.Is(err, dtmcli.ErrFailure), err) return } diff --git a/dtmsvr/trans_type_saga.go b/dtmsvr/trans_type_saga.go index 7f439340b..15f8c6b18 100644 --- a/dtmsvr/trans_type_saga.go +++ b/dtmsvr/trans_type_saga.go @@ -93,12 +93,12 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { } shouldRun := func(current int) bool { // if !csc.Concurrent,then check the branch in previous step is succeed - if !csc.Concurrent && current >= 2 && branches[current-2].Status != dtmcli.StatusSucceed { + if !csc.Concurrent && current >= 2 && branchResults[current-2].status != dtmcli.StatusSucceed { return false } // if csc.concurrent, then check the Orders. origin one step correspond to 2 step in dtmsvr for _, pre := range csc.Orders[current/2] { - if branches[pre*2+1].Status != dtmcli.StatusSucceed { + if branchResults[pre*2+1].status != dtmcli.StatusSucceed { return false } } @@ -107,7 +107,7 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { shouldRollback := func(current int) bool { rollbacked := func(i int) bool { // current compensate op rollbacked or related action still prepared - return branches[i].Status == dtmcli.StatusSucceed || branches[i+1].Status == dtmcli.StatusPrepared + return branchResults[i].status == dtmcli.StatusSucceed || branchResults[i+1].status == dtmcli.StatusPrepared } if rollbacked(current) { return false @@ -192,7 +192,21 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { logger.Debugf("wait once for done") } } - + prepareToCompensate := func() { + toRunActions := pickToRunActions() + for _, b := range toRunActions { + // these branches may have run. so flag them to status succeed, then run the corresponding + // compensate + branchResults[b].status = dtmcli.StatusSucceed + } + for i, b := range branchResults { + if b.op == dtmcli.BranchCompensate && b.status != dtmcli.StatusSucceed && + branchResults[i+1].status != dtmcli.StatusPrepared { + rsCToStart++ + } + } + logger.Debugf("rsCToStart: %d branchResults: %v", rsCToStart, branchResults) + } timeLimit := time.Now().Add(time.Duration(conf.RequestTimeout+2) * time.Second) for time.Now().Before(timeLimit) && t.Status == dtmcli.StatusSubmitted && !t.isTimeout() && rsAFailed == 0 { toRun := pickToRunActions() @@ -209,13 +223,9 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { if t.Status == dtmcli.StatusSubmitted && (rsAFailed > 0 || t.isTimeout()) { t.changeStatus(dtmcli.StatusAborting) } - for i, b := range branchResults { - if b.op == dtmcli.BranchCompensate && b.status != dtmcli.StatusSucceed && - branchResults[i+1].status != dtmcli.StatusPrepared { - rsCToStart++ - } + if t.Status == dtmcli.StatusAborting { + prepareToCompensate() } - logger.Debugf("rsCToStart: %d", rsCToStart) for time.Now().Before(timeLimit) && t.Status == dtmcli.StatusAborting { toRun := pickToRunCompensates() runBranches(toRun)