diff --git a/dtmsvr/cron.go b/dtmsvr/cron.go index 8ffff263b..a768fa1c2 100644 --- a/dtmsvr/cron.go +++ b/dtmsvr/cron.go @@ -7,11 +7,13 @@ package dtmsvr import ( + "errors" "fmt" "math/rand" "runtime/debug" "time" + "github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/logger" ) @@ -33,7 +35,7 @@ func CronTransOnce() (gid string) { trans.WaitResult = true branches := GetStore().FindBranches(gid) err := trans.Process(branches) - dtmimp.E2P(err) + dtmimp.PanicIf(err != nil && !errors.Is(err, dtmcli.ErrFailure), err) return } diff --git a/dtmsvr/trans_type_saga.go b/dtmsvr/trans_type_saga.go index f2a88fd5b..15f8c6b18 100644 --- a/dtmsvr/trans_type_saga.go +++ b/dtmsvr/trans_type_saga.go @@ -47,6 +47,7 @@ func (t *transSagaProcessor) GenBranches() []TransBranch { type cSagaCustom struct { Orders map[int][]int `json:"orders"` Concurrent bool `json:"concurrent"` + cOrders map[int][]int } type branchResult struct { @@ -64,9 +65,14 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { } n := len(branches) - csc := cSagaCustom{Orders: map[int][]int{}} + csc := cSagaCustom{Orders: map[int][]int{}, cOrders: map[int][]int{}} if t.CustomData != "" { dtmimp.MustUnmarshalString(t.CustomData, &csc) + for k, v := range csc.Orders { + for _, b := range v { + csc.cOrders[b] = append(csc.cOrders[b], k) + } + } } if csc.Concurrent || t.TimeoutToFail > 0 { // when saga is not normal, update branch sync t.updateBranchSync = true @@ -83,22 +89,41 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { rsAFailed++ } } - branchResults[i] = branchResult{status: branches[i].Status, op: branches[i].Op} + branchResults[i] = branchResult{index: i, status: branches[i].Status, op: branches[i].Op} } - isPreconditionsSucceed := func(current int) bool { + shouldRun := func(current int) bool { // if !csc.Concurrent,then check the branch in previous step is succeed - if !csc.Concurrent && current >= 2 && branches[current-2].Status != dtmcli.StatusSucceed { + if !csc.Concurrent && current >= 2 && branchResults[current-2].status != dtmcli.StatusSucceed { return false } // if csc.concurrent, then check the Orders. origin one step correspond to 2 step in dtmsvr for _, pre := range csc.Orders[current/2] { - if branches[pre*2+1].Status != dtmcli.StatusSucceed { + if branchResults[pre*2+1].status != dtmcli.StatusSucceed { + return false + } + } + return true + } + shouldRollback := func(current int) bool { + rollbacked := func(i int) bool { + // current compensate op rollbacked or related action still prepared + return branchResults[i].status == dtmcli.StatusSucceed || branchResults[i+1].status == dtmcli.StatusPrepared + } + if rollbacked(current) { + return false + } + // if !csc.Concurrent,then check the branch in next step is rollbacked + if !csc.Concurrent && current < n-2 && !rollbacked(current+2) { + return false + } + // if csc.concurrent, then check the cOrders. origin one step correspond to 2 step in dtmsvr + for _, next := range csc.cOrders[current/2] { + if !rollbacked(2 * next) { return false } } return true } - resultChan := make(chan branchResult, n) asyncExecBranch := func(i int) { var err error @@ -115,14 +140,24 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { } pickToRunActions := func() []int { toRun := []int{} - for current := 0; current < n; current++ { + for current := 1; current < n; current += 2 { br := &branchResults[current] - if br.op == dtmcli.BranchAction && !br.started && isPreconditionsSucceed(current) && - br.status == dtmcli.StatusPrepared { + if !br.started && br.status == dtmcli.StatusPrepared && shouldRun(current) { toRun = append(toRun, current) } } - logger.Debugf("toRun picked for action is: %v", toRun) + logger.Debugf("toRun picked for action is: %v branchResults: %v compensate orders: %v", toRun, branchResults, csc.cOrders) + return toRun + } + pickToRunCompensates := func() []int { + toRun := []int{} + for current := n - 2; current >= 0; current -= 2 { + br := &branchResults[current] + if !br.started && br.status == dtmcli.StatusPrepared && shouldRollback(current) { + toRun = append(toRun, current) + } + } + logger.Debugf("toRun picked for compensate is: %v branchResults: %v compensate orders: %v", toRun, branchResults, csc.cOrders) return toRun } runBranches := func(toRun []int) { @@ -134,20 +169,6 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { go asyncExecBranch(b) } } - pickAndRunCompensates := func(toRunActions []int) { - for _, b := range toRunActions { - // these branches may have run. so flag them to status succeed, then run the corresponding - // compensate - branchResults[b].status = dtmcli.StatusSucceed - } - for i, b := range branchResults { - if b.op == dtmcli.BranchCompensate && b.status != dtmcli.StatusSucceed && - branchResults[i+1].status != dtmcli.StatusPrepared { - rsCToStart++ - go asyncExecBranch(i) - } - } - } waitDoneOnce := func() { select { case r := <-resultChan: @@ -171,8 +192,23 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { logger.Debugf("wait once for done") } } - - for t.Status == dtmcli.StatusSubmitted && !t.isTimeout() && rsAFailed == 0 && rsADone != rsAToStart { + prepareToCompensate := func() { + toRunActions := pickToRunActions() + for _, b := range toRunActions { + // these branches may have run. so flag them to status succeed, then run the corresponding + // compensate + branchResults[b].status = dtmcli.StatusSucceed + } + for i, b := range branchResults { + if b.op == dtmcli.BranchCompensate && b.status != dtmcli.StatusSucceed && + branchResults[i+1].status != dtmcli.StatusPrepared { + rsCToStart++ + } + } + logger.Debugf("rsCToStart: %d branchResults: %v", rsCToStart, branchResults) + } + timeLimit := time.Now().Add(time.Duration(conf.RequestTimeout+2) * time.Second) + for time.Now().Before(timeLimit) && t.Status == dtmcli.StatusSubmitted && !t.isTimeout() && rsAFailed == 0 { toRun := pickToRunActions() runBranches(toRun) if rsADone == rsAStarted { // no branch is running, so break @@ -188,11 +224,16 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error { t.changeStatus(dtmcli.StatusAborting) } if t.Status == dtmcli.StatusAborting { - toRun := pickToRunActions() - pickAndRunCompensates(toRun) - for rsCDone != rsCToStart { - waitDoneOnce() + prepareToCompensate() + } + for time.Now().Before(timeLimit) && t.Status == dtmcli.StatusAborting { + toRun := pickToRunCompensates() + runBranches(toRun) + if rsCDone == rsCToStart { // no branch is running, so break + break } + logger.Debugf("rsCDone: %d rsCToStart: %d", rsCDone, rsCToStart) + waitDoneOnce() } if t.Status == dtmcli.StatusAborting && rsCToStart == rsCSucceed { t.changeStatus(dtmcli.StatusFailed) diff --git a/test/busi/busi.go b/test/busi/busi.go index 5a4119296..3fda2f246 100644 --- a/test/busi/busi.go +++ b/test/busi/busi.go @@ -24,7 +24,7 @@ func handleGrpcBusiness(in *BusiReq, result1 string, result2 string, busi string } else if res == dtmcli.ResultFailure { return status.New(codes.Aborted, dtmcli.ResultFailure).Err() } else if res == dtmcli.ResultOngoing { - return status.New(codes.Aborted, dtmcli.ResultOngoing).Err() + return status.New(codes.FailedPrecondition, dtmcli.ResultOngoing).Err() } return status.New(codes.Internal, fmt.Sprintf("unknow result %s", res)).Err() } diff --git a/test/saga_concurrent_test.go b/test/saga_concurrent_test.go index e8c32c058..5c33770f6 100644 --- a/test/saga_concurrent_test.go +++ b/test/saga_concurrent_test.go @@ -51,6 +51,15 @@ func TestSagaConRollbackOrder(t *testing.T) { assert.Equal(t, []string{StatusSucceed, StatusFailed, StatusPrepared, StatusPrepared}, getBranchesStatus(sagaCon.Gid)) } +func TestSagaConRollbackOrder2(t *testing.T) { + sagaCon := genSagaCon(dtmimp.GetFuncName(), false, true) + sagaCon.AddBranchOrder(1, []int{0}) + err := sagaCon.Submit() + assert.Nil(t, err) + waitTransProcessed(sagaCon.Gid) + assert.Equal(t, StatusFailed, getTransStatus(sagaCon.Gid)) + assert.Equal(t, []string{StatusSucceed, StatusSucceed, StatusSucceed, StatusFailed}, getBranchesStatus(sagaCon.Gid)) +} func TestSagaConCommittedOngoing(t *testing.T) { sagaCon := genSagaCon(dtmimp.GetFuncName(), false, false) busi.MainSwitch.TransOutResult.SetOnce(dtmcli.ResultOngoing) diff --git a/test/saga_test.go b/test/saga_test.go index 9b6f0e427..b7afd3a6e 100644 --- a/test/saga_test.go +++ b/test/saga_test.go @@ -24,6 +24,15 @@ func TestSagaNormal(t *testing.T) { assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid)) } +func TestSagaRollback(t *testing.T) { + saga := genSaga(dtmimp.GetFuncName(), false, true) + err := saga.Submit() + assert.Nil(t, err) + waitTransProcessed(saga.Gid) + assert.Equal(t, []string{StatusSucceed, StatusSucceed, StatusSucceed, StatusFailed}, getBranchesStatus(saga.Gid)) + assert.Equal(t, StatusFailed, getTransStatus(saga.Gid)) +} + func TestSagaOngoingSucceed(t *testing.T) { saga := genSaga(dtmimp.GetFuncName(), false, false) busi.MainSwitch.TransOutResult.SetOnce(dtmcli.ResultOngoing)