Skip to content

Commit

Permalink
Merge pull request dtm-labs#175 from dtm-labs/alpha
Browse files Browse the repository at this point in the history
saga rollback order keeped
  • Loading branch information
yedf2 authored Jan 11, 2022
2 parents eb13952 + d355fc8 commit 5daca6d
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 32 deletions.
4 changes: 3 additions & 1 deletion dtmsvr/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
package dtmsvr

import (
"errors"
"fmt"
"math/rand"
"runtime/debug"
"time"

"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger"
)
Expand All @@ -33,7 +35,7 @@ func CronTransOnce() (gid string) {
trans.WaitResult = true
branches := GetStore().FindBranches(gid)
err := trans.Process(branches)
dtmimp.E2P(err)
dtmimp.PanicIf(err != nil && !errors.Is(err, dtmcli.ErrFailure), err)
return
}

Expand Down
101 changes: 71 additions & 30 deletions dtmsvr/trans_type_saga.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func (t *transSagaProcessor) GenBranches() []TransBranch {
type cSagaCustom struct {
Orders map[int][]int `json:"orders"`
Concurrent bool `json:"concurrent"`
cOrders map[int][]int
}

type branchResult struct {
Expand All @@ -64,9 +65,14 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
}
n := len(branches)

csc := cSagaCustom{Orders: map[int][]int{}}
csc := cSagaCustom{Orders: map[int][]int{}, cOrders: map[int][]int{}}
if t.CustomData != "" {
dtmimp.MustUnmarshalString(t.CustomData, &csc)
for k, v := range csc.Orders {
for _, b := range v {
csc.cOrders[b] = append(csc.cOrders[b], k)
}
}
}
if csc.Concurrent || t.TimeoutToFail > 0 { // when saga is not normal, update branch sync
t.updateBranchSync = true
Expand All @@ -83,22 +89,41 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
rsAFailed++
}
}
branchResults[i] = branchResult{status: branches[i].Status, op: branches[i].Op}
branchResults[i] = branchResult{index: i, status: branches[i].Status, op: branches[i].Op}
}
isPreconditionsSucceed := func(current int) bool {
shouldRun := func(current int) bool {
// if !csc.Concurrent,then check the branch in previous step is succeed
if !csc.Concurrent && current >= 2 && branches[current-2].Status != dtmcli.StatusSucceed {
if !csc.Concurrent && current >= 2 && branchResults[current-2].status != dtmcli.StatusSucceed {
return false
}
// if csc.concurrent, then check the Orders. origin one step correspond to 2 step in dtmsvr
for _, pre := range csc.Orders[current/2] {
if branches[pre*2+1].Status != dtmcli.StatusSucceed {
if branchResults[pre*2+1].status != dtmcli.StatusSucceed {
return false
}
}
return true
}
shouldRollback := func(current int) bool {
rollbacked := func(i int) bool {
// current compensate op rollbacked or related action still prepared
return branchResults[i].status == dtmcli.StatusSucceed || branchResults[i+1].status == dtmcli.StatusPrepared
}
if rollbacked(current) {
return false
}
// if !csc.Concurrent,then check the branch in next step is rollbacked
if !csc.Concurrent && current < n-2 && !rollbacked(current+2) {
return false
}
// if csc.concurrent, then check the cOrders. origin one step correspond to 2 step in dtmsvr
for _, next := range csc.cOrders[current/2] {
if !rollbacked(2 * next) {
return false
}
}
return true
}

resultChan := make(chan branchResult, n)
asyncExecBranch := func(i int) {
var err error
Expand All @@ -115,14 +140,24 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
}
pickToRunActions := func() []int {
toRun := []int{}
for current := 0; current < n; current++ {
for current := 1; current < n; current += 2 {
br := &branchResults[current]
if br.op == dtmcli.BranchAction && !br.started && isPreconditionsSucceed(current) &&
br.status == dtmcli.StatusPrepared {
if !br.started && br.status == dtmcli.StatusPrepared && shouldRun(current) {
toRun = append(toRun, current)
}
}
logger.Debugf("toRun picked for action is: %v", toRun)
logger.Debugf("toRun picked for action is: %v branchResults: %v compensate orders: %v", toRun, branchResults, csc.cOrders)
return toRun
}
pickToRunCompensates := func() []int {
toRun := []int{}
for current := n - 2; current >= 0; current -= 2 {
br := &branchResults[current]
if !br.started && br.status == dtmcli.StatusPrepared && shouldRollback(current) {
toRun = append(toRun, current)
}
}
logger.Debugf("toRun picked for compensate is: %v branchResults: %v compensate orders: %v", toRun, branchResults, csc.cOrders)
return toRun
}
runBranches := func(toRun []int) {
Expand All @@ -134,20 +169,6 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
go asyncExecBranch(b)
}
}
pickAndRunCompensates := func(toRunActions []int) {
for _, b := range toRunActions {
// these branches may have run. so flag them to status succeed, then run the corresponding
// compensate
branchResults[b].status = dtmcli.StatusSucceed
}
for i, b := range branchResults {
if b.op == dtmcli.BranchCompensate && b.status != dtmcli.StatusSucceed &&
branchResults[i+1].status != dtmcli.StatusPrepared {
rsCToStart++
go asyncExecBranch(i)
}
}
}
waitDoneOnce := func() {
select {
case r := <-resultChan:
Expand All @@ -171,8 +192,23 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
logger.Debugf("wait once for done")
}
}

for t.Status == dtmcli.StatusSubmitted && !t.isTimeout() && rsAFailed == 0 && rsADone != rsAToStart {
prepareToCompensate := func() {
toRunActions := pickToRunActions()
for _, b := range toRunActions {
// these branches may have run. so flag them to status succeed, then run the corresponding
// compensate
branchResults[b].status = dtmcli.StatusSucceed
}
for i, b := range branchResults {
if b.op == dtmcli.BranchCompensate && b.status != dtmcli.StatusSucceed &&
branchResults[i+1].status != dtmcli.StatusPrepared {
rsCToStart++
}
}
logger.Debugf("rsCToStart: %d branchResults: %v", rsCToStart, branchResults)
}
timeLimit := time.Now().Add(time.Duration(conf.RequestTimeout+2) * time.Second)
for time.Now().Before(timeLimit) && t.Status == dtmcli.StatusSubmitted && !t.isTimeout() && rsAFailed == 0 {
toRun := pickToRunActions()
runBranches(toRun)
if rsADone == rsAStarted { // no branch is running, so break
Expand All @@ -188,11 +224,16 @@ func (t *transSagaProcessor) ProcessOnce(branches []TransBranch) error {
t.changeStatus(dtmcli.StatusAborting)
}
if t.Status == dtmcli.StatusAborting {
toRun := pickToRunActions()
pickAndRunCompensates(toRun)
for rsCDone != rsCToStart {
waitDoneOnce()
prepareToCompensate()
}
for time.Now().Before(timeLimit) && t.Status == dtmcli.StatusAborting {
toRun := pickToRunCompensates()
runBranches(toRun)
if rsCDone == rsCToStart { // no branch is running, so break
break
}
logger.Debugf("rsCDone: %d rsCToStart: %d", rsCDone, rsCToStart)
waitDoneOnce()
}
if t.Status == dtmcli.StatusAborting && rsCToStart == rsCSucceed {
t.changeStatus(dtmcli.StatusFailed)
Expand Down
2 changes: 1 addition & 1 deletion test/busi/busi.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func handleGrpcBusiness(in *BusiReq, result1 string, result2 string, busi string
} else if res == dtmcli.ResultFailure {
return status.New(codes.Aborted, dtmcli.ResultFailure).Err()
} else if res == dtmcli.ResultOngoing {
return status.New(codes.Aborted, dtmcli.ResultOngoing).Err()
return status.New(codes.FailedPrecondition, dtmcli.ResultOngoing).Err()
}
return status.New(codes.Internal, fmt.Sprintf("unknow result %s", res)).Err()
}
Expand Down
9 changes: 9 additions & 0 deletions test/saga_concurrent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ func TestSagaConRollbackOrder(t *testing.T) {
assert.Equal(t, []string{StatusSucceed, StatusFailed, StatusPrepared, StatusPrepared}, getBranchesStatus(sagaCon.Gid))
}

func TestSagaConRollbackOrder2(t *testing.T) {
sagaCon := genSagaCon(dtmimp.GetFuncName(), false, true)
sagaCon.AddBranchOrder(1, []int{0})
err := sagaCon.Submit()
assert.Nil(t, err)
waitTransProcessed(sagaCon.Gid)
assert.Equal(t, StatusFailed, getTransStatus(sagaCon.Gid))
assert.Equal(t, []string{StatusSucceed, StatusSucceed, StatusSucceed, StatusFailed}, getBranchesStatus(sagaCon.Gid))
}
func TestSagaConCommittedOngoing(t *testing.T) {
sagaCon := genSagaCon(dtmimp.GetFuncName(), false, false)
busi.MainSwitch.TransOutResult.SetOnce(dtmcli.ResultOngoing)
Expand Down
9 changes: 9 additions & 0 deletions test/saga_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@ func TestSagaNormal(t *testing.T) {
assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid))
}

func TestSagaRollback(t *testing.T) {
saga := genSaga(dtmimp.GetFuncName(), false, true)
err := saga.Submit()
assert.Nil(t, err)
waitTransProcessed(saga.Gid)
assert.Equal(t, []string{StatusSucceed, StatusSucceed, StatusSucceed, StatusFailed}, getBranchesStatus(saga.Gid))
assert.Equal(t, StatusFailed, getTransStatus(saga.Gid))
}

func TestSagaOngoingSucceed(t *testing.T) {
saga := genSaga(dtmimp.GetFuncName(), false, false)
busi.MainSwitch.TransOutResult.SetOnce(dtmcli.ResultOngoing)
Expand Down

0 comments on commit 5daca6d

Please sign in to comment.