Skip to content

Commit

Permalink
fix shouldrollback
Browse files Browse the repository at this point in the history
  • Loading branch information
yedf2 committed Jan 11, 2022
1 parent 60698ac commit d355fc8
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 11 deletions.
4 changes: 3 additions & 1 deletion dtmsvr/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
package dtmsvr

import (
"errors"
"fmt"
"math/rand"
"runtime/debug"
"time"

"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger"
)
Expand All @@ -33,7 +35,7 @@ func CronTransOnce() (gid string) {
trans.WaitResult = true
branches := GetStore().FindBranches(gid)
err := trans.Process(branches)
dtmimp.E2P(err)
dtmimp.PanicIf(err != nil && !errors.Is(err, dtmcli.ErrFailure), err)
return
}

Expand Down
30 changes: 20 additions & 10 deletions dtmsvr/trans_type_saga.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,12 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
}
shouldRun := func(current int) bool {
// if !csc.Concurrent,then check the branch in previous step is succeed
if !csc.Concurrent && current >= 2 && branches[current-2].Status != dtmcli.StatusSucceed {
if !csc.Concurrent && current >= 2 && branchResults[current-2].status != dtmcli.StatusSucceed {
return false
}
// if csc.concurrent, then check the Orders. origin one step correspond to 2 step in dtmsvr
for _, pre := range csc.Orders[current/2] {
if branches[pre*2+1].Status != dtmcli.StatusSucceed {
if branchResults[pre*2+1].status != dtmcli.StatusSucceed {
return false
}
}
Expand All @@ -107,7 +107,7 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
shouldRollback := func(current int) bool {
rollbacked := func(i int) bool {
// current compensate op rollbacked or related action still prepared
return branches[i].Status == dtmcli.StatusSucceed || branches[i+1].Status == dtmcli.StatusPrepared
return branchResults[i].status == dtmcli.StatusSucceed || branchResults[i+1].status == dtmcli.StatusPrepared
}
if rollbacked(current) {
return false
Expand Down Expand Up @@ -192,7 +192,21 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
logger.Debugf("wait once for done")
}
}

prepareToCompensate := func() {
toRunActions := pickToRunActions()
for _, b := range toRunActions {
// these branches may have run. so flag them to status succeed, then run the corresponding
// compensate
branchResults[b].status = dtmcli.StatusSucceed
}
for i, b := range branchResults {
if b.op == dtmcli.BranchCompensate && b.status != dtmcli.StatusSucceed &&
branchResults[i+1].status != dtmcli.StatusPrepared {
rsCToStart++
}
}
logger.Debugf("rsCToStart: %d branchResults: %v", rsCToStart, branchResults)
}
timeLimit := time.Now().Add(time.Duration(conf.RequestTimeout+2) * time.Second)
for time.Now().Before(timeLimit) && t.Status == dtmcli.StatusSubmitted && !t.isTimeout() && rsAFailed == 0 {
toRun := pickToRunActions()
Expand All @@ -209,13 +223,9 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
if t.Status == dtmcli.StatusSubmitted && (rsAFailed > 0 || t.isTimeout()) {
t.changeStatus(dtmcli.StatusAborting)
}
for i, b := range branchResults {
if b.op == dtmcli.BranchCompensate && b.status != dtmcli.StatusSucceed &&
branchResults[i+1].status != dtmcli.StatusPrepared {
rsCToStart++
}
if t.Status == dtmcli.StatusAborting {
prepareToCompensate()
}
logger.Debugf("rsCToStart: %d", rsCToStart)
for time.Now().Before(timeLimit) && t.Status == dtmcli.StatusAborting {
toRun := pickToRunCompensates()
runBranches(toRun)
Expand Down

0 comments on commit d355fc8

Please sign in to comment.