diff --git a/.gitignore b/.gitignore index 34e44f002..07212df10 100644 --- a/.gitignore +++ b/.gitignore @@ -8,7 +8,7 @@ dist .vscode default.etcd */**/*.bolt - +bench/bench # Output file of unit test coverage coverage.* profile.* diff --git a/.golangci.yml b/.golangci.yml index 58479dcab..31cf246a9 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,7 +1,7 @@ run: deadline: 5m skip-dirs: - - test +# - test # - bench linter-settings: diff --git a/bench/main.go b/bench/main.go index 34bd8595a..f5793f94b 100644 --- a/bench/main.go +++ b/bench/main.go @@ -34,13 +34,13 @@ func main() { logger.Infof("starting bench server") config.MustLoadConfig("") logger.InitLog(conf.LogLevel) - if busi.BusiConf.Driver != "" { - dtmcli.SetCurrentDBType(busi.BusiConf.Driver) - svr.PrepareBenchDB() - } registry.WaitStoreUp() dtmsvr.PopulateDB(false) if os.Args[1] == "db" { + if busi.BusiConf.Driver == "mysql" { + dtmcli.SetCurrentDBType(busi.BusiConf.Driver) + svr.PrepareBenchDB() + } busi.PopulateDB(false) } else if os.Args[1] == "redis" || os.Args[1] == "boltdb" { diff --git a/bench/svr/http.go b/bench/svr/http.go index 6e8d7bc63..1f827d863 100644 --- a/bench/svr/http.go +++ b/bench/svr/http.go @@ -7,6 +7,7 @@ package svr import ( + "context" "database/sql" "fmt" "os" @@ -132,6 +133,8 @@ func qsAdjustBalance(uid int, amount int, c *gin.Context) error { // nolint: unp return nil } +var stockKey = "{a}--stock-1" + func benchAddRoute(app *gin.Engine) { app.POST(benchAPI+"/TransIn", dtmutil.WrapHandler2(func(c *gin.Context) interface{} { return qsAdjustBalance(dtmimp.MustAtoi(c.Query("uid")), 1, c) @@ -186,4 +189,19 @@ func benchAddRoute(app *gin.Engine) { saga.WaitResult = true return saga.Submit() })) + app.Any(benchAPI+"/benchFlashSalesReset", dtmutil.WrapHandler2(func(c *gin.Context) interface{} { + _, err := busi.RedisGet().FlushAll(context.Background()).Result() + logger.FatalIfError(err) + _, err = busi.RedisGet().Set(context.Background(), stockKey, "0", 86400*time.Second).Result() + logger.FatalIfError(err) + return nil + })) + app.Any(benchAPI+"/benchFlashSales", dtmutil.WrapHandler2(func(c *gin.Context) interface{} { + gid := "{a}-" + shortuuid.New() + msg := dtmcli.NewMsg(dtmutil.DefaultHTTPServer, gid). + Add("", nil) + return msg.DoAndSubmit("", func(bb *dtmcli.BranchBarrier) error { + return bb.RedisCheckAdjustAmount(busi.RedisGet(), stockKey, -1, 86400) + }) + })) } diff --git a/bench/test-flash-sales.sh b/bench/test-flash-sales.sh new file mode 100755 index 000000000..b108040e6 --- /dev/null +++ b/bench/test-flash-sales.sh @@ -0,0 +1,14 @@ +# !/bin/bash + +set -x + +export LOG_LEVEL=fatal +export STORE_DRIVER=redis +export STORE_HOST=localhost +export STORE_PORT=6379 +export BUSI_REDIS=localhost:6379 +./bench redis & +echo 'sleeping 3s for dtm bench to run up.' && sleep 3 +curl "http://127.0.0.1:8083/api/busi_bench/benchFlashSalesReset" +ab -n 300000 -c 20 "http://127.0.0.1:8083/api/busi_bench/benchFlashSales" +pkill bench diff --git a/dtmcli/barrier.go b/dtmcli/barrier.go index c2b66d1e9..497ab630a 100644 --- a/dtmcli/barrier.go +++ b/dtmcli/barrier.go @@ -13,7 +13,6 @@ import ( "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/logger" - "github.com/go-redis/redis/v8" ) // BarrierBusiFunc type for busi func @@ -115,44 +114,3 @@ func (bb *BranchBarrier) QueryPrepared(db *sql.DB) error { } return err } - -// RedisCheckAdjustAmount check the value of key is valid and >= amount. then adjust the amount -func (bb *BranchBarrier) RedisCheckAdjustAmount(rd *redis.Client, key string, amount int, barrierExpire int) error { - bkey1 := fmt.Sprintf("%s-%s-%s-%s-%02d", key, bb.Gid, bb.BranchID, bb.Op, bb.BarrierID) - originOp := map[string]string{ - BranchCancel: BranchTry, - BranchCompensate: BranchAction, - }[bb.Op] - bkey2 := fmt.Sprintf("%s-%s-%s-%s-%02d", key, bb.Gid, bb.BranchID, originOp, bb.BarrierID) - v, err := rd.Eval(rd.Context(), ` -- RedisCheckAdjustAmount -local v = redis.call('GET', KEYS[1]) -local e1 = redis.call('GET', KEYS[2]) - -if v == false or v + ARGV[1] < 0 then - return 'FAILURE' -end - -if e1 ~= false then - return -end - -redis.call('SET', KEYS[2], 'op', 'EX', ARGV[3]) - -if ARGV[2] ~= '' then - local e2 = redis.call('GET', KEYS[3]) - if e2 == false then - redis.call('SET', KEYS[3], 'rollback', 'EX', ARGV[3]) - return - end -end -redis.call('INCRBY', KEYS[1], ARGV[1]) -`, []string{key, bkey1, bkey2}, amount, originOp, barrierExpire).Result() - logger.Debugf("lua return v: %v err: %v", v, err) - if err == redis.Nil { - err = nil - } - if err == nil && v == ResultFailure { - err = ErrFailure - } - return err -} diff --git a/dtmcli/barrier_redis.go b/dtmcli/barrier_redis.go new file mode 100644 index 000000000..9babfdfaa --- /dev/null +++ b/dtmcli/barrier_redis.go @@ -0,0 +1,73 @@ +package dtmcli + +import ( + "fmt" + + "github.com/dtm-labs/dtm/dtmcli/logger" + "github.com/go-redis/redis/v8" +) + +// RedisCheckAdjustAmount check the value of key is valid and >= amount. then adjust the amount +func (bb *BranchBarrier) RedisCheckAdjustAmount(rd *redis.Client, key string, amount int, barrierExpire int) error { + bb.BarrierID = bb.BarrierID + 1 + bkey1 := fmt.Sprintf("%s-%s-%s-%02d", bb.Gid, bb.BranchID, bb.Op, bb.BarrierID) + originOp := map[string]string{ + BranchCancel: BranchTry, + BranchCompensate: BranchAction, + }[bb.Op] + bkey2 := fmt.Sprintf("%s-%s-%s-%02d", bb.Gid, bb.BranchID, originOp, bb.BarrierID) + v, err := rd.Eval(rd.Context(), ` -- RedisCheckAdjustAmount +local v = redis.call('GET', KEYS[1]) +local e1 = redis.call('GET', KEYS[2]) + +if v == false or v + ARGV[1] < 0 then + return 'FAILURE' +end + +if e1 ~= false then + return +end + +redis.call('SET', KEYS[2], 'op', 'EX', ARGV[3]) + +if ARGV[2] ~= '' then + local e2 = redis.call('GET', KEYS[3]) + if e2 == false then + redis.call('SET', KEYS[3], 'rollback', 'EX', ARGV[3]) + return + end +end +redis.call('INCRBY', KEYS[1], ARGV[1]) +`, []string{key, bkey1, bkey2}, amount, originOp, barrierExpire).Result() + logger.Debugf("lua return v: %v err: %v", v, err) + if err == redis.Nil { + err = nil + } + if err == nil && v == ResultFailure { + err = ErrFailure + } + return err +} + +// RedisQueryPrepared query prepared for redis +func (bb *BranchBarrier) RedisQueryPrepared(rd *redis.Client, barrierExpire int) error { + bkey1 := fmt.Sprintf("%s-%s-%s-%s", bb.Gid, "00", "msg", "01") + v, err := rd.Eval(rd.Context(), ` -- RedisQueryPrepared +local v = redis.call('GET', KEYS[1]) +if v == false then + redis.call('SET', KEYS[1], 'rollback', 'EX', ARGV[1]) + v = 'rollback' +end +if v == 'rollback' then + return 'FAILURE' +end +`, []string{bkey1}, barrierExpire).Result() + logger.Debugf("lua return v: %v err: %v", v, err) + if err == redis.Nil { + err = nil + } + if err == nil && v == ResultFailure { + err = ErrFailure + } + return err +} diff --git a/dtmcli/dtmimp/trans_base.go b/dtmcli/dtmimp/trans_base.go index ad0b9d1b7..eb0967d2b 100644 --- a/dtmcli/dtmimp/trans_base.go +++ b/dtmcli/dtmimp/trans_base.go @@ -106,8 +106,8 @@ func TransRegisterBranch(tb *TransBase, added map[string]string, operation strin return TransCallDtm(tb, m, operation) } -// TransRequestBranch TransBAse request branch result -func TransRequestBranch(t *TransBase, body interface{}, branchID string, op string, url string) (*resty.Response, error) { +// TransRequestBranch TransBase request branch result +func TransRequestBranch(t *TransBase, method string, body interface{}, branchID string, op string, url string) (*resty.Response, error) { resp, err := RestyClient.R(). SetBody(body). SetQueryParams(map[string]string{ @@ -118,7 +118,7 @@ func TransRequestBranch(t *TransBase, body interface{}, branchID string, op stri "op": op, }). SetHeaders(t.BranchHeaders). - Post(url) + Execute(method, url) if err == nil { err = RespAsErrorCompatible(resp) } diff --git a/dtmcli/msg.go b/dtmcli/msg.go index 6fba8059d..f590df4ef 100644 --- a/dtmcli/msg.go +++ b/dtmcli/msg.go @@ -8,6 +8,7 @@ package dtmcli import ( "database/sql" + "errors" "github.com/dtm-labs/dtm/dtmcli/dtmimp" ) @@ -40,22 +41,35 @@ func (s *Msg) Submit() error { return dtmimp.TransCallDtm(&s.TransBase, s, "submit") } -// PrepareAndSubmit one method for the entire busi->prepare->submit -func (s *Msg) PrepareAndSubmit(queryPrepared string, db *sql.DB, busiCall BarrierBusiFunc) error { +// DoAndSubmitDB short method for Do on db type. please see Do +func (s *Msg) DoAndSubmitDB(queryPrepared string, db *sql.DB, busiCall BarrierBusiFunc) error { + return s.DoAndSubmit(queryPrepared, func(bb *BranchBarrier) error { + return bb.CallWithDB(db, busiCall) + }) +} + +// DoAndSubmit one method for the entire prepare->busi->submit +// if busiCall return ErrFailure, then abort is called directly +// if busiCall return not nil error other than ErrFailure, then DoAndSubmit will call queryPrepared to get the result +func (s *Msg) DoAndSubmit(queryPrepared string, busiCall func(bb *BranchBarrier) error) error { bb, err := BarrierFrom(s.TransType, s.Gid, "00", "msg") // a special barrier for msg QueryPrepared if err == nil { err = s.Prepare(queryPrepared) } if err == nil { - defer func() { - if err != nil && bb.QueryPrepared(db) == ErrFailure { - _ = dtmimp.TransCallDtm(&s.TransBase, s, "abort") - } - }() - err = bb.CallWithDB(db, busiCall) - } - if err == nil { - err = s.Submit() + errb := busiCall(bb) + if errb != nil && !errors.Is(errb, ErrFailure) { + // if busicall return an error other than failure, we will query the result + _, err = dtmimp.TransRequestBranch(&s.TransBase, "GET", nil, bb.BranchID, bb.Op, queryPrepared) + } + if errors.Is(errb, ErrFailure) || errors.Is(err, ErrFailure) { + _ = dtmimp.TransCallDtm(&s.TransBase, s, "abort") + } else if err == nil { + err = s.Submit() + } + if errb != nil { + return errb + } } return err } diff --git a/dtmcli/tcc.go b/dtmcli/tcc.go index d61a7fe33..67bcbad97 100644 --- a/dtmcli/tcc.go +++ b/dtmcli/tcc.go @@ -75,5 +75,5 @@ func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, can if err != nil { return nil, err } - return dtmimp.TransRequestBranch(&t.TransBase, body, branchID, BranchTry, tryURL) + return dtmimp.TransRequestBranch(&t.TransBase, "POST", body, branchID, BranchTry, tryURL) } diff --git a/dtmcli/xa.go b/dtmcli/xa.go index 2b48ecbd9..505837dd4 100644 --- a/dtmcli/xa.go +++ b/dtmcli/xa.go @@ -101,5 +101,5 @@ func (xc *XaClient) XaGlobalTransaction2(gid string, custom func(*Xa), xaFunc Xa // CallBranch call a xa branch func (x *Xa) CallBranch(body interface{}, url string) (*resty.Response, error) { branchID := x.NewSubBranchID() - return dtmimp.TransRequestBranch(&x.TransBase, body, branchID, BranchAction, url) + return dtmimp.TransRequestBranch(&x.TransBase, "POST", body, branchID, BranchAction, url) } diff --git a/dtmgrpc/dtmgimp/types.go b/dtmgrpc/dtmgimp/types.go index 535e9240e..d19cd694d 100644 --- a/dtmgrpc/dtmgimp/types.go +++ b/dtmgrpc/dtmgimp/types.go @@ -13,7 +13,10 @@ import ( "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/logger" + "github.com/dtm-labs/dtmdriver" "google.golang.org/grpc" + "google.golang.org/grpc/metadata" + "google.golang.org/protobuf/proto" ) // GrpcServerLog 打印grpc服务端的日志 @@ -46,3 +49,14 @@ func GrpcClientLog(ctx context.Context, method string, req, reply interface{}, c } return err } + +// InvokeBranch invoke a url for trans +func InvokeBranch(t *dtmimp.TransBase, isRaw bool, msg proto.Message, url string, reply interface{}, branchID string, op string) error { + server, method, err := dtmdriver.GetDriver().ParseServerMethod(url) + if err != nil { + return err + } + ctx := TransInfo2Ctx(t.Gid, t.TransType, branchID, op, t.Dtm) + ctx = metadata.AppendToOutgoingContext(ctx, Map2Kvs(t.BranchHeaders)...) + return MustGetGrpcConn(server, isRaw).Invoke(ctx, method, msg, reply) +} diff --git a/dtmgrpc/msg.go b/dtmgrpc/msg.go index cbb88425f..2a94a5969 100644 --- a/dtmgrpc/msg.go +++ b/dtmgrpc/msg.go @@ -8,6 +8,7 @@ package dtmgrpc import ( "database/sql" + "errors" "github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli/dtmimp" @@ -43,20 +44,35 @@ func (s *MsgGrpc) Submit() error { return dtmgimp.DtmGrpcCall(&s.TransBase, "Submit") } -// PrepareAndSubmit one method for the entire busi->prepare->submit -func (s *MsgGrpc) PrepareAndSubmit(queryPrepared string, db *sql.DB, busiCall dtmcli.BarrierBusiFunc) error { +// DoAndSubmitDB short method for Do on db type. please see Do +func (s *MsgGrpc) DoAndSubmitDB(queryPrepared string, db *sql.DB, busiCall dtmcli.BarrierBusiFunc) error { + return s.DoAndSubmit(queryPrepared, func(bb *dtmcli.BranchBarrier) error { + return bb.CallWithDB(db, busiCall) + }) +} + +// DoAndSubmit one method for the entire prepare->busi->submit +// if busiCall return ErrFailure, then abort is called directly +// if busiCall return not nil error other than ErrFailure, then DoAndSubmit will call queryPrepared to get the result +func (s *MsgGrpc) DoAndSubmit(queryPrepared string, busiCall func(bb *dtmcli.BranchBarrier) error) error { bb, err := dtmcli.BarrierFrom(s.TransType, s.Gid, "00", "msg") // a special barrier for msg QueryPrepared if err == nil { - err = bb.CallWithDB(db, func(tx *sql.Tx) error { - err := busiCall(tx) - if err == nil { - err = s.Prepare(queryPrepared) - } - return err - }) + err = s.Prepare(queryPrepared) } if err == nil { - err = s.Submit() + errb := busiCall(bb) + if errb != nil && !errors.Is(err, dtmcli.ErrFailure) { + err = dtmgimp.InvokeBranch(&s.TransBase, true, nil, queryPrepared, &[]byte{}, bb.BranchID, bb.Op) + err = GrpcError2DtmError(err) + } + if errors.Is(err, dtmcli.ErrFailure) || errors.Is(errb, dtmcli.ErrFailure) { + _ = dtmgimp.DtmGrpcCall(&s.TransBase, "Abort") + } else if err == nil { + err = s.Submit() + } + if errb != nil { + return errb + } } return err } diff --git a/dtmgrpc/tcc.go b/dtmgrpc/tcc.go index bd1b336fb..54a9e0519 100644 --- a/dtmgrpc/tcc.go +++ b/dtmgrpc/tcc.go @@ -13,8 +13,6 @@ import ( "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmgrpc/dtmgimp" "github.com/dtm-labs/dtm/dtmgrpc/dtmgpb" - "github.com/dtm-labs/dtmdriver" - "google.golang.org/grpc/metadata" "google.golang.org/protobuf/proto" ) @@ -87,11 +85,5 @@ func (t *TccGrpc) CallBranch(busiMsg proto.Message, tryURL string, confirmURL st if err != nil { return err } - server, method, err := dtmdriver.GetDriver().ParseServerMethod(tryURL) - if err != nil { - return err - } - ctx := dtmgimp.TransInfo2Ctx(t.Gid, t.TransType, branchID, "try", t.Dtm) - ctx = metadata.AppendToOutgoingContext(ctx, dtmgimp.Map2Kvs(t.BranchHeaders)...) - return dtmgimp.MustGetGrpcConn(server, false).Invoke(ctx, method, busiMsg, reply) + return dtmgimp.InvokeBranch(&t.TransBase, false, busiMsg, tryURL, reply, branchID, "try") } diff --git a/dtmgrpc/type.go b/dtmgrpc/type.go index 194ff5919..f92d8300b 100644 --- a/dtmgrpc/type.go +++ b/dtmgrpc/type.go @@ -30,6 +30,21 @@ func DtmError2GrpcError(res interface{}) error { return e } +// GrpcError2DtmError translate grpc error to dtm error +func GrpcError2DtmError(err error) error { + st, ok := status.FromError(err) + if ok && st.Code() == codes.Aborted { + // version lower then v1.10, will specify Ongoing in code Aborted + if st.Message() == dtmcli.ResultOngoing { + return dtmcli.ErrOngoing + } + return dtmcli.ErrFailure + } else if ok && st.Code() == codes.FailedPrecondition { + return dtmcli.ErrOngoing + } + return err +} + // MustGenGid must gen a gid from grpcServer func MustGenGid(grpcServer string) string { dc := dtmgimp.MustGetDtmClient(grpcServer) diff --git a/dtmgrpc/xa.go b/dtmgrpc/xa.go index 3cfd8bfb0..a7bbebb57 100644 --- a/dtmgrpc/xa.go +++ b/dtmgrpc/xa.go @@ -15,7 +15,6 @@ import ( "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmgrpc/dtmgimp" "github.com/dtm-labs/dtm/dtmgrpc/dtmgpb" - "github.com/dtm-labs/dtmdriver" grpc "google.golang.org/grpc" "google.golang.org/protobuf/proto" emptypb "google.golang.org/protobuf/types/known/emptypb" @@ -119,12 +118,5 @@ func (xc *XaGrpcClient) XaGlobalTransaction2(gid string, custom func(*XaGrpc), x // CallBranch call a xa branch func (x *XaGrpc) CallBranch(msg proto.Message, url string, reply interface{}) error { - server, method, err := dtmdriver.GetDriver().ParseServerMethod(url) - if err != nil { - return err - } - err = dtmgimp.MustGetGrpcConn(server, false).Invoke( - dtmgimp.TransInfo2Ctx(x.Gid, x.TransType, x.NewSubBranchID(), "action", x.Dtm), method, msg, reply) - return err - + return dtmgimp.InvokeBranch(&x.TransBase, false, msg, url, reply, x.NewSubBranchID(), "action") } diff --git a/dtmsvr/trans_status.go b/dtmsvr/trans_status.go index c70e5f295..4f1aa313f 100644 --- a/dtmsvr/trans_status.go +++ b/dtmsvr/trans_status.go @@ -15,11 +15,10 @@ import ( "github.com/dtm-labs/dtm/dtmcli" "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmcli/logger" + "github.com/dtm-labs/dtm/dtmgrpc" "github.com/dtm-labs/dtm/dtmgrpc/dtmgimp" "github.com/dtm-labs/dtmdriver" - "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" - "google.golang.org/grpc/status" ) func (t *TransGlobal) touchCronTime(ctype cronType) { @@ -92,17 +91,7 @@ func (t *TransGlobal) getURLResult(url string, branchID, op string, branchPayloa if err == nil { return nil } - st, ok := status.FromError(err) - if ok && st.Code() == codes.Aborted { - // version lower then v1.10, will specify Ongoing in code Aborted - if st.Message() == dtmcli.ResultOngoing { - return dtmcli.ErrOngoing - } - return dtmcli.ErrFailure - } else if ok && st.Code() == codes.FailedPrecondition { - return dtmcli.ErrOngoing - } - return err + return dtmgrpc.GrpcError2DtmError(err) } dtmimp.PanicIf(!strings.HasPrefix(url, "http"), fmt.Errorf("bad url for http: %s", url)) resp, err := dtmimp.RestyClient.R().SetBody(string(branchPayload)). diff --git a/helper/sync-dtmcli.sh b/helper/sync-dtmcli.sh index dd70a5aba..451dc14fa 100755 --- a/helper/sync-dtmcli.sh +++ b/helper/sync-dtmcli.sh @@ -13,7 +13,7 @@ fi cd ../dtmcli cp -rf ../dtm/dtmcli/* ./ -rm -f *_test.go +rm -f *_test.go logger/*.log sed -i '' -e 's/dtm-labs\/dtm\//dtm-labs\//g' *.go */**.go go mod tidy go build || exit 1 diff --git a/test/busi/barrier.go b/test/busi/barrier.go index 9d8c30f55..81e78970c 100644 --- a/test/busi/barrier.go +++ b/test/busi/barrier.go @@ -11,6 +11,8 @@ import ( "database/sql" "github.com/dtm-labs/dtm/dtmcli" + "github.com/dtm-labs/dtm/dtmgrpc" + "github.com/dtm-labs/dtm/dtmsvr/config" "github.com/dtm-labs/dtm/dtmutil" "github.com/gin-gonic/gin" emptypb "google.golang.org/protobuf/types/known/emptypb" @@ -71,24 +73,24 @@ func init() { }) })) app.POST(BusiAPI+"/SagaRedisTransIn", dtmutil.WrapHandler2(func(c *gin.Context) interface{} { - return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), getRedisAccountKey(TransInUID), reqFrom(c).Amount, 7*86400) + return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransInUID), reqFrom(c).Amount, 7*86400) })) app.POST(BusiAPI+"/SagaRedisTransInCom", dtmutil.WrapHandler2(func(c *gin.Context) interface{} { - return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), getRedisAccountKey(TransInUID), -reqFrom(c).Amount, 7*86400) + return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransInUID), -reqFrom(c).Amount, 7*86400) })) app.POST(BusiAPI+"/SagaRedisTransOut", dtmutil.WrapHandler2(func(c *gin.Context) interface{} { - return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), getRedisAccountKey(TransOutUID), -reqFrom(c).Amount, 7*86400) + return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransOutUID), -reqFrom(c).Amount, 7*86400) })) app.POST(BusiAPI+"/SagaRedisTransOutCom", dtmutil.WrapHandler2(func(c *gin.Context) interface{} { - return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), getRedisAccountKey(TransOutUID), reqFrom(c).Amount, 7*86400) + return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransOutUID), reqFrom(c).Amount, 7*86400) })) app.POST(BusiAPI+"/TccBTransOutTry", dtmutil.WrapHandler2(func(c *gin.Context) interface{} { req := reqFrom(c) if req.TransOutResult != "" { return dtmcli.String2DtmError(req.TransOutResult) } - if req.Store == "redis" { - return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), getRedisAccountKey(TransOutUID), req.Amount, 7*86400) + if req.Store == config.Redis { + return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransOutUID), req.Amount, 7*86400) } return MustBarrierFromGin(c).Call(txGet(), func(tx *sql.Tx) error { @@ -96,7 +98,7 @@ func init() { }) })) app.POST(BusiAPI+"/TccBTransOutConfirm", dtmutil.WrapHandler2(func(c *gin.Context) interface{} { - if reqFrom(c).Store == "redis" { + if reqFrom(c).Store == config.Redis { return nil } return MustBarrierFromGin(c).Call(txGet(), func(tx *sql.Tx) error { @@ -111,7 +113,7 @@ func init() { func TccBarrierTransOutCancel(c *gin.Context) interface{} { req := reqFrom(c) if req.Store == "redis" { - return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), getRedisAccountKey(TransOutUID), -req.Amount, 7*86400) + return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransOutUID), -req.Amount, 7*86400) } return MustBarrierFromGin(c).Call(txGet(), func(tx *sql.Tx) error { return tccAdjustTrading(tx, TransOutUID, reqFrom(c).Amount) @@ -146,7 +148,34 @@ func (s *busiServer) TransOutRevertBSaga(ctx context.Context, in *BusiReq) (*emp }) } +func (s *busiServer) TransInRedis(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) { + barrier := MustBarrierFromGrpc(ctx) + return &emptypb.Empty{}, barrier.RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransInUID), int(in.Amount), 86400) +} + +func (s *busiServer) TransOutRedis(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) { + barrier := MustBarrierFromGrpc(ctx) + return &emptypb.Empty{}, barrier.RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransOutUID), int(-in.Amount), 86400) +} + +func (s *busiServer) TransInRevertRedis(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) { + barrier := MustBarrierFromGrpc(ctx) + return &emptypb.Empty{}, barrier.RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransInUID), -int(in.Amount), 86400) +} + +func (s *busiServer) TransOutRevertRedis(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) { + barrier := MustBarrierFromGrpc(ctx) + return &emptypb.Empty{}, barrier.RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransOutUID), int(in.Amount), 86400) +} + func (s *busiServer) QueryPreparedB(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) { barrier := MustBarrierFromGrpc(ctx) - return &emptypb.Empty{}, barrier.QueryPrepared(dbGet().ToSQLDB()) + err := barrier.QueryPrepared(dbGet().ToSQLDB()) + return &emptypb.Empty{}, dtmgrpc.DtmError2GrpcError(err) +} + +func (s *busiServer) QueryPreparedRedis(ctx context.Context, in *BusiReq) (*emptypb.Empty, error) { + barrier := MustBarrierFromGrpc(ctx) + err := barrier.RedisQueryPrepared(RedisGet(), 86400) + return &emptypb.Empty{}, dtmgrpc.DtmError2GrpcError(err) } diff --git a/test/busi/base_grpc.go b/test/busi/base_grpc.go index 63099e1b9..682c37dc1 100644 --- a/test/busi/base_grpc.go +++ b/test/busi/base_grpc.go @@ -27,13 +27,13 @@ import ( ) // BusiGrpc busi service grpc address -var BusiGrpc string = fmt.Sprintf("localhost:%d", BusiGrpcPort) +var BusiGrpc = fmt.Sprintf("localhost:%d", BusiGrpcPort) // DtmClient grpc client for dtm -var DtmClient dtmgpb.DtmClient = nil +var DtmClient dtmgpb.DtmClient // XaGrpcClient XA client connection -var XaGrpcClient *dtmgrpc.XaGrpcClient = nil +var XaGrpcClient *dtmgrpc.XaGrpcClient func init() { setupFuncs["XaGrpcSetup"] = func(app *gin.Engine) { diff --git a/test/busi/base_http.go b/test/busi/base_http.go index 0feee1e89..5822aefb6 100644 --- a/test/busi/base_http.go +++ b/test/busi/base_http.go @@ -35,14 +35,17 @@ type setupFunc func(*gin.Engine) var setupFuncs = map[string]setupFunc{} // Busi busi service url prefix -var Busi string = fmt.Sprintf("http://localhost:%d%s", BusiPort, BusiAPI) +var Busi = fmt.Sprintf("http://localhost:%d%s", BusiPort, BusiAPI) -var XaClient *dtmcli.XaClient = nil +// XaClient 1 +var XaClient *dtmcli.XaClient +// SleepCancelHandler 1 type SleepCancelHandler func(c *gin.Context) interface{} -var sleepCancelHandler SleepCancelHandler = nil +var sleepCancelHandler SleepCancelHandler +// SetSleepCancelHandler 1 func SetSleepCancelHandler(handler SleepCancelHandler) { sleepCancelHandler = handler } @@ -74,8 +77,9 @@ func BaseAppStartup() *gin.Engine { v(app) } logger.Debugf("Starting busi at: %d", BusiPort) - go app.Run(fmt.Sprintf(":%d", BusiPort)) - + go func() { + _ = app.Run(fmt.Sprintf(":%d", BusiPort)) + }() return app } @@ -128,6 +132,11 @@ func BaseAddRoute(app *gin.Engine) { db := dbGet().ToSQLDB() return bb.QueryPrepared(db) })) + app.GET(BusiAPI+"/RedisQueryPrepared", dtmutil.WrapHandler2(func(c *gin.Context) interface{} { + logger.Debugf("%s RedisQueryPrepared", c.Query("gid")) + bb := MustBarrierFromGin(c) + return bb.RedisQueryPrepared(RedisGet(), 86400) + })) app.POST(BusiAPI+"/TransInXa", dtmutil.WrapHandler2(func(c *gin.Context) interface{} { return XaClient.XaLocalTransaction(c.Request.URL.Query(), func(db *sql.DB, xa *dtmcli.Xa) error { return SagaAdjustBalance(db, TransInUID, reqFrom(c).Amount, reqFrom(c).TransInResult) @@ -154,7 +163,7 @@ func BaseAddRoute(app *gin.Engine) { if reqFrom(c).TransOutResult == dtmcli.ResultFailure { return dtmcli.ErrFailure } - var dia gorm.Dialector = nil + var dia gorm.Dialector if dtmcli.GetCurrentDBType() == dtmcli.DBTypeMysql { dia = mysql.New(mysql.Config{Conn: db}) } else if dtmcli.GetCurrentDBType() == dtmcli.DBTypePostgres { diff --git a/test/busi/base_types.go b/test/busi/base_types.go index 38b332dcb..c20ba891c 100644 --- a/test/busi/base_types.go +++ b/test/busi/base_types.go @@ -15,6 +15,7 @@ import ( "github.com/gin-gonic/gin" ) +// BusiConf 1 var BusiConf = dtmcli.DBConf{ Driver: "mysql", Host: "localhost", @@ -22,20 +23,23 @@ var BusiConf = dtmcli.DBConf{ User: "root", } +// UserAccount 1 type UserAccount struct { - UserId int + UserID int Balance string TradingBalance string } +// TableName 1 func (*UserAccount) TableName() string { return "dtm_busi.user_account" } -func GetBalanceByUid(uid int, store string) int { +// GetBalanceByUID 1 +func GetBalanceByUID(uid int, store string) int { if store == "redis" { rd := RedisGet() - accA, err := rd.Get(rd.Context(), getRedisAccountKey(uid)).Result() + accA, err := rd.Get(rd.Context(), GetRedisAccountKey(uid)).Result() dtmimp.E2P(err) return dtmimp.MustAtoi(accA) } @@ -127,6 +131,7 @@ type mainSwitchType struct { // MainSwitch controls busi success or fail var MainSwitch mainSwitchType -func getRedisAccountKey(uid int) string { +// GetRedisAccountKey return redis key for uid +func GetRedisAccountKey(uid int) string { return fmt.Sprintf("{a}-redis-account-key-%d", uid) } diff --git a/test/busi/busi.go b/test/busi/busi.go index 3fda2f246..623de066a 100644 --- a/test/busi/busi.go +++ b/test/busi/busi.go @@ -13,7 +13,10 @@ import ( status "google.golang.org/grpc/status" ) +// TransOutUID 1 const TransOutUID = 1 + +// TransInUID 2 const TransInUID = 2 func handleGrpcBusiness(in *BusiReq, result1 string, result2 string, busi string) error { @@ -58,6 +61,7 @@ func sagaGrpcAdjustBalance(db dtmcli.DB, uid int, amount int64, result string) e return err } +// SagaAdjustBalance 1 func SagaAdjustBalance(db dtmcli.DB, uid int, amount int, result string) error { if strings.Contains(result, dtmcli.ResultFailure) { return dtmcli.ErrFailure diff --git a/test/busi/busi.pb.go b/test/busi/busi.pb.go index 759886fcb..09b8d85a4 100644 --- a/test/busi/busi.pb.go +++ b/test/busi/busi.pb.go @@ -148,7 +148,7 @@ var file_test_busi_busi_proto_rawDesc = []byte{ 0x6e, 0x73, 0x49, 0x6e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x22, 0x25, 0x0a, 0x09, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x65, 0x32, 0x8d, 0x09, 0x0a, 0x04, 0x42, 0x75, 0x73, 0x69, 0x12, 0x32, 0x0a, 0x07, 0x54, 0x72, + 0x65, 0x32, 0xbe, 0x0b, 0x0a, 0x04, 0x42, 0x75, 0x73, 0x69, 0x12, 0x32, 0x0a, 0x07, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x49, 0x6e, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x33, @@ -214,15 +214,34 @@ var file_test_busi_busi_proto_rawDesc = []byte{ 0x75, 0x74, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x4e, 0x6f, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, - 0x79, 0x22, 0x00, 0x12, 0x31, 0x0a, 0x0d, 0x51, 0x75, 0x65, 0x72, 0x79, 0x50, 0x72, 0x65, 0x70, - 0x61, 0x72, 0x65, 0x64, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, - 0x52, 0x65, 0x71, 0x1a, 0x0f, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, - 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x39, 0x0a, 0x0e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x50, - 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x64, 0x42, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, - 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, - 0x00, 0x42, 0x08, 0x5a, 0x06, 0x2e, 0x2f, 0x62, 0x75, 0x73, 0x69, 0x62, 0x06, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x33, + 0x79, 0x22, 0x00, 0x12, 0x37, 0x0a, 0x0c, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x49, 0x6e, 0x52, 0x65, + 0x64, 0x69, 0x73, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, + 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x38, 0x0a, 0x0d, + 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75, 0x74, 0x52, 0x65, 0x64, 0x69, 0x73, 0x12, 0x0d, 0x2e, + 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, + 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, + 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3d, 0x0a, 0x12, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x49, + 0x6e, 0x52, 0x65, 0x76, 0x65, 0x72, 0x74, 0x52, 0x65, 0x64, 0x69, 0x73, 0x12, 0x0d, 0x2e, 0x62, + 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, + 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, + 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3e, 0x0a, 0x13, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x4f, 0x75, + 0x74, 0x52, 0x65, 0x76, 0x65, 0x72, 0x74, 0x52, 0x65, 0x64, 0x69, 0x73, 0x12, 0x0d, 0x2e, 0x62, + 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, + 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, + 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x31, 0x0a, 0x0d, 0x51, 0x75, 0x65, 0x72, 0x79, 0x50, 0x72, + 0x65, 0x70, 0x61, 0x72, 0x65, 0x64, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, + 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x0f, 0x2e, 0x62, 0x75, 0x73, 0x69, 0x2e, 0x42, 0x75, 0x73, + 0x69, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x39, 0x0a, 0x0e, 0x51, 0x75, 0x65, 0x72, + 0x79, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x64, 0x42, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, + 0x69, 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, + 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, + 0x79, 0x22, 0x00, 0x12, 0x3d, 0x0a, 0x12, 0x51, 0x75, 0x65, 0x72, 0x79, 0x50, 0x72, 0x65, 0x70, + 0x61, 0x72, 0x65, 0x64, 0x52, 0x65, 0x64, 0x69, 0x73, 0x12, 0x0d, 0x2e, 0x62, 0x75, 0x73, 0x69, + 0x2e, 0x42, 0x75, 0x73, 0x69, 0x52, 0x65, 0x71, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, + 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, + 0x22, 0x00, 0x42, 0x08, 0x5a, 0x06, 0x2e, 0x2f, 0x62, 0x75, 0x73, 0x69, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -262,30 +281,40 @@ var file_test_busi_busi_proto_depIdxs = []int32{ 0, // 15: busi.Busi.TransOutRevertBSaga:input_type -> busi.BusiReq 0, // 16: busi.Busi.TransOutHeaderYes:input_type -> busi.BusiReq 0, // 17: busi.Busi.TransOutHeaderNo:input_type -> busi.BusiReq - 0, // 18: busi.Busi.QueryPrepared:input_type -> busi.BusiReq - 0, // 19: busi.Busi.QueryPreparedB:input_type -> busi.BusiReq - 2, // 20: busi.Busi.TransIn:output_type -> google.protobuf.Empty - 2, // 21: busi.Busi.TransOut:output_type -> google.protobuf.Empty - 2, // 22: busi.Busi.TransInRevert:output_type -> google.protobuf.Empty - 2, // 23: busi.Busi.TransOutRevert:output_type -> google.protobuf.Empty - 2, // 24: busi.Busi.TransInConfirm:output_type -> google.protobuf.Empty - 2, // 25: busi.Busi.TransOutConfirm:output_type -> google.protobuf.Empty - 2, // 26: busi.Busi.XaNotify:output_type -> google.protobuf.Empty - 2, // 27: busi.Busi.TransInXa:output_type -> google.protobuf.Empty - 2, // 28: busi.Busi.TransOutXa:output_type -> google.protobuf.Empty - 2, // 29: busi.Busi.TransInTcc:output_type -> google.protobuf.Empty - 2, // 30: busi.Busi.TransOutTcc:output_type -> google.protobuf.Empty - 2, // 31: busi.Busi.TransInTccNested:output_type -> google.protobuf.Empty - 2, // 32: busi.Busi.TransInBSaga:output_type -> google.protobuf.Empty - 2, // 33: busi.Busi.TransOutBSaga:output_type -> google.protobuf.Empty - 2, // 34: busi.Busi.TransInRevertBSaga:output_type -> google.protobuf.Empty - 2, // 35: busi.Busi.TransOutRevertBSaga:output_type -> google.protobuf.Empty - 2, // 36: busi.Busi.TransOutHeaderYes:output_type -> google.protobuf.Empty - 2, // 37: busi.Busi.TransOutHeaderNo:output_type -> google.protobuf.Empty - 1, // 38: busi.Busi.QueryPrepared:output_type -> busi.BusiReply - 2, // 39: busi.Busi.QueryPreparedB:output_type -> google.protobuf.Empty - 20, // [20:40] is the sub-list for method output_type - 0, // [0:20] is the sub-list for method input_type + 0, // 18: busi.Busi.TransInRedis:input_type -> busi.BusiReq + 0, // 19: busi.Busi.TransOutRedis:input_type -> busi.BusiReq + 0, // 20: busi.Busi.TransInRevertRedis:input_type -> busi.BusiReq + 0, // 21: busi.Busi.TransOutRevertRedis:input_type -> busi.BusiReq + 0, // 22: busi.Busi.QueryPrepared:input_type -> busi.BusiReq + 0, // 23: busi.Busi.QueryPreparedB:input_type -> busi.BusiReq + 0, // 24: busi.Busi.QueryPreparedRedis:input_type -> busi.BusiReq + 2, // 25: busi.Busi.TransIn:output_type -> google.protobuf.Empty + 2, // 26: busi.Busi.TransOut:output_type -> google.protobuf.Empty + 2, // 27: busi.Busi.TransInRevert:output_type -> google.protobuf.Empty + 2, // 28: busi.Busi.TransOutRevert:output_type -> google.protobuf.Empty + 2, // 29: busi.Busi.TransInConfirm:output_type -> google.protobuf.Empty + 2, // 30: busi.Busi.TransOutConfirm:output_type -> google.protobuf.Empty + 2, // 31: busi.Busi.XaNotify:output_type -> google.protobuf.Empty + 2, // 32: busi.Busi.TransInXa:output_type -> google.protobuf.Empty + 2, // 33: busi.Busi.TransOutXa:output_type -> google.protobuf.Empty + 2, // 34: busi.Busi.TransInTcc:output_type -> google.protobuf.Empty + 2, // 35: busi.Busi.TransOutTcc:output_type -> google.protobuf.Empty + 2, // 36: busi.Busi.TransInTccNested:output_type -> google.protobuf.Empty + 2, // 37: busi.Busi.TransInBSaga:output_type -> google.protobuf.Empty + 2, // 38: busi.Busi.TransOutBSaga:output_type -> google.protobuf.Empty + 2, // 39: busi.Busi.TransInRevertBSaga:output_type -> google.protobuf.Empty + 2, // 40: busi.Busi.TransOutRevertBSaga:output_type -> google.protobuf.Empty + 2, // 41: busi.Busi.TransOutHeaderYes:output_type -> google.protobuf.Empty + 2, // 42: busi.Busi.TransOutHeaderNo:output_type -> google.protobuf.Empty + 2, // 43: busi.Busi.TransInRedis:output_type -> google.protobuf.Empty + 2, // 44: busi.Busi.TransOutRedis:output_type -> google.protobuf.Empty + 2, // 45: busi.Busi.TransInRevertRedis:output_type -> google.protobuf.Empty + 2, // 46: busi.Busi.TransOutRevertRedis:output_type -> google.protobuf.Empty + 1, // 47: busi.Busi.QueryPrepared:output_type -> busi.BusiReply + 2, // 48: busi.Busi.QueryPreparedB:output_type -> google.protobuf.Empty + 2, // 49: busi.Busi.QueryPreparedRedis:output_type -> google.protobuf.Empty + 25, // [25:50] is the sub-list for method output_type + 0, // [0:25] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name 0, // [0:0] is the sub-list for extension extendee 0, // [0:0] is the sub-list for field type_name diff --git a/test/busi/busi.proto b/test/busi/busi.proto index c94d8df04..72a92b653 100644 --- a/test/busi/busi.proto +++ b/test/busi/busi.proto @@ -37,7 +37,14 @@ service Busi { rpc TransOutRevertBSaga(BusiReq) returns (google.protobuf.Empty) {} rpc TransOutHeaderYes(BusiReq) returns (google.protobuf.Empty) {} rpc TransOutHeaderNo(BusiReq) returns (google.protobuf.Empty) {} + + rpc TransInRedis(BusiReq) returns (google.protobuf.Empty) {} + rpc TransOutRedis(BusiReq) returns (google.protobuf.Empty) {} + rpc TransInRevertRedis(BusiReq) returns (google.protobuf.Empty) {} + rpc TransOutRevertRedis(BusiReq) returns (google.protobuf.Empty) {} + rpc QueryPrepared(BusiReq) returns (BusiReply) {} rpc QueryPreparedB(BusiReq) returns (google.protobuf.Empty) {} + rpc QueryPreparedRedis(BusiReq) returns (google.protobuf.Empty) {} } diff --git a/test/busi/busi_grpc.pb.go b/test/busi/busi_grpc.pb.go index f7db85485..9235453f2 100644 --- a/test/busi/busi_grpc.pb.go +++ b/test/busi/busi_grpc.pb.go @@ -37,8 +37,13 @@ type BusiClient interface { TransOutRevertBSaga(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) TransOutHeaderYes(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) TransOutHeaderNo(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) + TransInRedis(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) + TransOutRedis(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) + TransInRevertRedis(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) + TransOutRevertRedis(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) QueryPrepared(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*BusiReply, error) QueryPreparedB(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) + QueryPreparedRedis(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) } type busiClient struct { @@ -211,6 +216,42 @@ func (c *busiClient) TransOutHeaderNo(ctx context.Context, in *BusiReq, opts ... return out, nil } +func (c *busiClient) TransInRedis(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) { + out := new(emptypb.Empty) + err := c.cc.Invoke(ctx, "/busi.Busi/TransInRedis", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *busiClient) TransOutRedis(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) { + out := new(emptypb.Empty) + err := c.cc.Invoke(ctx, "/busi.Busi/TransOutRedis", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *busiClient) TransInRevertRedis(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) { + out := new(emptypb.Empty) + err := c.cc.Invoke(ctx, "/busi.Busi/TransInRevertRedis", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *busiClient) TransOutRevertRedis(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) { + out := new(emptypb.Empty) + err := c.cc.Invoke(ctx, "/busi.Busi/TransOutRevertRedis", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *busiClient) QueryPrepared(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*BusiReply, error) { out := new(BusiReply) err := c.cc.Invoke(ctx, "/busi.Busi/QueryPrepared", in, out, opts...) @@ -229,6 +270,15 @@ func (c *busiClient) QueryPreparedB(ctx context.Context, in *BusiReq, opts ...gr return out, nil } +func (c *busiClient) QueryPreparedRedis(ctx context.Context, in *BusiReq, opts ...grpc.CallOption) (*emptypb.Empty, error) { + out := new(emptypb.Empty) + err := c.cc.Invoke(ctx, "/busi.Busi/QueryPreparedRedis", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // BusiServer is the server API for Busi service. // All implementations must embed UnimplementedBusiServer // for forward compatibility @@ -251,8 +301,13 @@ type BusiServer interface { TransOutRevertBSaga(context.Context, *BusiReq) (*emptypb.Empty, error) TransOutHeaderYes(context.Context, *BusiReq) (*emptypb.Empty, error) TransOutHeaderNo(context.Context, *BusiReq) (*emptypb.Empty, error) + TransInRedis(context.Context, *BusiReq) (*emptypb.Empty, error) + TransOutRedis(context.Context, *BusiReq) (*emptypb.Empty, error) + TransInRevertRedis(context.Context, *BusiReq) (*emptypb.Empty, error) + TransOutRevertRedis(context.Context, *BusiReq) (*emptypb.Empty, error) QueryPrepared(context.Context, *BusiReq) (*BusiReply, error) QueryPreparedB(context.Context, *BusiReq) (*emptypb.Empty, error) + QueryPreparedRedis(context.Context, *BusiReq) (*emptypb.Empty, error) mustEmbedUnimplementedBusiServer() } @@ -314,12 +369,27 @@ func (UnimplementedBusiServer) TransOutHeaderYes(context.Context, *BusiReq) (*em func (UnimplementedBusiServer) TransOutHeaderNo(context.Context, *BusiReq) (*emptypb.Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method TransOutHeaderNo not implemented") } +func (UnimplementedBusiServer) TransInRedis(context.Context, *BusiReq) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method TransInRedis not implemented") +} +func (UnimplementedBusiServer) TransOutRedis(context.Context, *BusiReq) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method TransOutRedis not implemented") +} +func (UnimplementedBusiServer) TransInRevertRedis(context.Context, *BusiReq) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method TransInRevertRedis not implemented") +} +func (UnimplementedBusiServer) TransOutRevertRedis(context.Context, *BusiReq) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method TransOutRevertRedis not implemented") +} func (UnimplementedBusiServer) QueryPrepared(context.Context, *BusiReq) (*BusiReply, error) { return nil, status.Errorf(codes.Unimplemented, "method QueryPrepared not implemented") } func (UnimplementedBusiServer) QueryPreparedB(context.Context, *BusiReq) (*emptypb.Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method QueryPreparedB not implemented") } +func (UnimplementedBusiServer) QueryPreparedRedis(context.Context, *BusiReq) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method QueryPreparedRedis not implemented") +} func (UnimplementedBusiServer) mustEmbedUnimplementedBusiServer() {} // UnsafeBusiServer may be embedded to opt out of forward compatibility for this service. @@ -657,6 +727,78 @@ func _Busi_TransOutHeaderNo_Handler(srv interface{}, ctx context.Context, dec fu return interceptor(ctx, in, info, handler) } +func _Busi_TransInRedis_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(BusiReq) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(BusiServer).TransInRedis(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/busi.Busi/TransInRedis", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(BusiServer).TransInRedis(ctx, req.(*BusiReq)) + } + return interceptor(ctx, in, info, handler) +} + +func _Busi_TransOutRedis_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(BusiReq) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(BusiServer).TransOutRedis(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/busi.Busi/TransOutRedis", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(BusiServer).TransOutRedis(ctx, req.(*BusiReq)) + } + return interceptor(ctx, in, info, handler) +} + +func _Busi_TransInRevertRedis_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(BusiReq) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(BusiServer).TransInRevertRedis(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/busi.Busi/TransInRevertRedis", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(BusiServer).TransInRevertRedis(ctx, req.(*BusiReq)) + } + return interceptor(ctx, in, info, handler) +} + +func _Busi_TransOutRevertRedis_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(BusiReq) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(BusiServer).TransOutRevertRedis(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/busi.Busi/TransOutRevertRedis", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(BusiServer).TransOutRevertRedis(ctx, req.(*BusiReq)) + } + return interceptor(ctx, in, info, handler) +} + func _Busi_QueryPrepared_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(BusiReq) if err := dec(in); err != nil { @@ -693,6 +835,24 @@ func _Busi_QueryPreparedB_Handler(srv interface{}, ctx context.Context, dec func return interceptor(ctx, in, info, handler) } +func _Busi_QueryPreparedRedis_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(BusiReq) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(BusiServer).QueryPreparedRedis(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/busi.Busi/QueryPreparedRedis", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(BusiServer).QueryPreparedRedis(ctx, req.(*BusiReq)) + } + return interceptor(ctx, in, info, handler) +} + // Busi_ServiceDesc is the grpc.ServiceDesc for Busi service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -772,6 +932,22 @@ var Busi_ServiceDesc = grpc.ServiceDesc{ MethodName: "TransOutHeaderNo", Handler: _Busi_TransOutHeaderNo_Handler, }, + { + MethodName: "TransInRedis", + Handler: _Busi_TransInRedis_Handler, + }, + { + MethodName: "TransOutRedis", + Handler: _Busi_TransOutRedis_Handler, + }, + { + MethodName: "TransInRevertRedis", + Handler: _Busi_TransInRevertRedis_Handler, + }, + { + MethodName: "TransOutRevertRedis", + Handler: _Busi_TransOutRevertRedis_Handler, + }, { MethodName: "QueryPrepared", Handler: _Busi_QueryPrepared_Handler, @@ -780,6 +956,10 @@ var Busi_ServiceDesc = grpc.ServiceDesc{ MethodName: "QueryPreparedB", Handler: _Busi_QueryPreparedB_Handler, }, + { + MethodName: "QueryPreparedRedis", + Handler: _Busi_QueryPreparedRedis_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "test/busi/busi.proto", diff --git a/test/busi/quick_start.go b/test/busi/quick_start.go index da939ec34..a581dc6d1 100644 --- a/test/busi/quick_start.go +++ b/test/busi/quick_start.go @@ -24,14 +24,18 @@ const qsBusiPort = 8082 var qsBusi = fmt.Sprintf("http://localhost:%d%s", qsBusiPort, qsBusiAPI) +// QsStartSvr quick start: start server func QsStartSvr() { app := dtmutil.GetGinApp() qsAddRoute(app) logger.Infof("quick start examples listening at %d", qsBusiPort) - go app.Run(fmt.Sprintf(":%d", qsBusiPort)) + go func() { + _ = app.Run(fmt.Sprintf(":%d", qsBusiPort)) + }() time.Sleep(100 * time.Millisecond) } +// QsFireRequest quick start: fire request func QsFireRequest() string { req := &gin.H{"amount": 30} // 微服务的载荷 // DtmServer为DTM服务的地址 diff --git a/test/busi/startup.go b/test/busi/startup.go index 2b40a5ccd..7e0294950 100644 --- a/test/busi/startup.go +++ b/test/busi/startup.go @@ -24,4 +24,5 @@ func PopulateDB(skipDrop bool) { dtmutil.RunSQLScript(BusiConf, file, skipDrop) _, err := RedisGet().FlushAll(context.Background()).Result() // redis barrier need clear dtmimp.E2P(err) + SetRedisBothAccount(10000, 10000) } diff --git a/test/busi/utils.go b/test/busi/utils.go index 9441d07fb..86615e4d9 100644 --- a/test/busi/utils.go +++ b/test/busi/utils.go @@ -5,6 +5,7 @@ import ( "database/sql" "encoding/json" "fmt" + "os" "strings" sync "sync" "time" @@ -79,8 +80,8 @@ func SetGrpcHeaderForHeadersYes(ctx context.Context, method string, req, reply i return invoker(ctx, method, req, reply, cc, opts...) } -// SetHttpHeaderForHeadersYes interceptor to set head for HeadersYes -func SetHttpHeaderForHeadersYes(c *resty.Client, r *resty.Request) error { +// SetHTTPHeaderForHeadersYes interceptor to set head for HeadersYes +func SetHTTPHeaderForHeadersYes(c *resty.Client, r *resty.Request) error { if b, ok := r.Body.(*dtmcli.Saga); ok && strings.HasSuffix(b.Gid, "HeadersYes") { logger.Debugf("set test_header for url: %s", r.URL) r.SetHeader("test_header", "yes") @@ -121,11 +122,12 @@ var ( once sync.Once ) +// RedisGet 1 func RedisGet() *redis.Client { once.Do(func() { logger.Debugf("connecting to client redis") rdb = redis.NewClient(&redis.Options{ - Addr: "localhost:6379", + Addr: dtmimp.OrString(os.Getenv("BUSI_REDIS"), "localhost:6379"), Username: "root", Password: "", }) @@ -133,10 +135,11 @@ func RedisGet() *redis.Client { return rdb } +// SetRedisBothAccount 1 func SetRedisBothAccount(accountA int, accountB int) { rd := RedisGet() - _, err := rd.Set(rd.Context(), getRedisAccountKey(TransOutUID), accountA, 0).Result() + _, err := rd.Set(rd.Context(), GetRedisAccountKey(TransOutUID), accountA, 0).Result() dtmimp.E2P(err) - _, err = rd.Set(rd.Context(), getRedisAccountKey(TransInUID), accountB, 0).Result() + _, err = rd.Set(rd.Context(), GetRedisAccountKey(TransInUID), accountB, 0).Result() dtmimp.E2P(err) } diff --git a/test/dtmsvr_test.go b/test/dtmsvr_test.go index 6078e5b8e..345bebeb0 100644 --- a/test/dtmsvr_test.go +++ b/test/dtmsvr_test.go @@ -12,6 +12,7 @@ import ( "github.com/dtm-labs/dtm/dtmcli/dtmimp" "github.com/dtm-labs/dtm/dtmsvr" + "github.com/dtm-labs/dtm/dtmsvr/config" "github.com/dtm-labs/dtm/dtmutil" "github.com/dtm-labs/dtm/test/busi" "github.com/stretchr/testify/assert" @@ -34,13 +35,8 @@ func getBranchesStatus(gid string) []string { return status } -func assertSucceed(t *testing.T, gid string) { - waitTransProcessed(gid) - assert.Equal(t, StatusSucceed, getTransStatus(gid)) -} - func TestUpdateBranchAsync(t *testing.T) { - if conf.Store.Driver != "mysql" { + if conf.Store.Driver != config.Mysql { return } conf.UpdateBranchSync = 0 diff --git a/test/main_test.go b/test/main_test.go index 69d29cf7a..4f17d73e8 100644 --- a/test/main_test.go +++ b/test/main_test.go @@ -37,7 +37,7 @@ func TestMain(m *testing.M) { conf.UpdateBranchSync = 1 dtmgrpc.AddUnaryInterceptor(busi.SetGrpcHeaderForHeadersYes) - dtmcli.GetRestyClient().OnBeforeRequest(busi.SetHttpHeaderForHeadersYes) + dtmcli.GetRestyClient().OnBeforeRequest(busi.SetHTTPHeaderForHeadersYes) dtmcli.GetRestyClient().OnAfterResponse(func(c *resty.Client, resp *resty.Response) error { return nil }) tenv := os.Getenv("TEST_STORE") @@ -68,4 +68,5 @@ func TestMain(m *testing.M) { close(dtmsvr.TransProcessedTestChan) gid, more := <-dtmsvr.TransProcessedTestChan logger.FatalfIf(more, "extra gid: %s in test chan", gid) + os.Exit(0) } diff --git a/test/msg_barrier_redis_test.go b/test/msg_barrier_redis_test.go new file mode 100644 index 000000000..5a5f4db53 --- /dev/null +++ b/test/msg_barrier_redis_test.go @@ -0,0 +1,82 @@ +package test + +import ( + "errors" + "testing" + + "github.com/dtm-labs/dtm/dtmcli" + "github.com/dtm-labs/dtm/dtmcli/dtmimp" + "github.com/dtm-labs/dtm/test/busi" + "github.com/stretchr/testify/assert" +) + +func TestMsgRedisDo(t *testing.T) { + before := getBeforeBalances("redis") + gid := dtmimp.GetFuncName() + req := busi.GenTransReq(30, false, false) + msg := dtmcli.NewMsg(DtmServer, gid). + Add(busi.Busi+"/SagaRedisTransIn", req) + err := msg.DoAndSubmit(Busi+"/RedisQueryPrepared", func(bb *dtmcli.BranchBarrier) error { + return bb.RedisCheckAdjustAmount(busi.RedisGet(), busi.GetRedisAccountKey(busi.TransOutUID), -30, 86400) + }) + assert.Nil(t, err) + waitTransProcessed(msg.Gid) + assert.Equal(t, []string{StatusSucceed}, getBranchesStatus(msg.Gid)) + assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid)) + assertNotSameBalance(t, before, "redis") +} + +func TestMsgRedisDoBusiFailed(t *testing.T) { + before := getBeforeBalances("redis") + gid := dtmimp.GetFuncName() + req := busi.GenTransReq(30, false, false) + msg := dtmcli.NewMsg(DtmServer, gid). + Add(busi.Busi+"/SagaRedisTransIn", req) + err := msg.DoAndSubmit(Busi+"/RedisQueryPrepared", func(bb *dtmcli.BranchBarrier) error { + return errors.New("an error") + }) + assert.Error(t, err) + assertSameBalance(t, before, "redis") +} + +func TestMsgRedisDoPrepareFailed(t *testing.T) { + before := getBeforeBalances("redis") + gid := dtmimp.GetFuncName() + req := busi.GenTransReq(30, false, false) + msg := dtmcli.NewMsg(DtmServer+"not-exists", gid). + Add(busi.Busi+"/SagaRedisTransIn", req) + err := msg.DoAndSubmit(Busi+"/RedisQueryPrepared", func(bb *dtmcli.BranchBarrier) error { + return bb.RedisCheckAdjustAmount(busi.RedisGet(), busi.GetRedisAccountKey(busi.TransOutUID), -30, 86400) + }) + assert.Error(t, err) + assertSameBalance(t, before, "redis") +} + +func TestMsgRedisDoCommitFailed(t *testing.T) { + before := getBeforeBalances("redis") + gid := dtmimp.GetFuncName() + req := busi.GenTransReq(30, false, false) + msg := dtmcli.NewMsg(DtmServer, gid). + Add(busi.Busi+"/SagaRedisTransIn", req) + err := msg.DoAndSubmit(Busi+"/RedisQueryPrepared", func(bb *dtmcli.BranchBarrier) error { + return errors.New("after commit error") + }) + assert.Error(t, err) + assertSameBalance(t, before, "redis") +} + +func TestMsgRedisDoCommitAfterFailed(t *testing.T) { + before := getBeforeBalances("redis") + gid := dtmimp.GetFuncName() + req := busi.GenTransReq(30, false, false) + msg := dtmcli.NewMsg(DtmServer, gid). + Add(busi.Busi+"/SagaRedisTransIn", req) + err := msg.DoAndSubmit(Busi+"/RedisQueryPrepared", func(bb *dtmcli.BranchBarrier) error { + err := bb.RedisCheckAdjustAmount(busi.RedisGet(), busi.GetRedisAccountKey(busi.TransOutUID), -30, 86400) + dtmimp.E2P(err) + return errors.New("an error") + }) + assert.Error(t, err) + waitTransProcessed(gid) + assertNotSameBalance(t, before, "redis") +} diff --git a/test/msg_barrier_test.go b/test/msg_barrier_test.go index cd274fbe7..150b7a008 100644 --- a/test/msg_barrier_test.go +++ b/test/msg_barrier_test.go @@ -20,7 +20,7 @@ func TestMsgPrepareAndSubmit(t *testing.T) { req := busi.GenTransReq(30, false, false) msg := dtmcli.NewMsg(DtmServer, gid). Add(busi.Busi+"/SagaBTransIn", req) - err := msg.PrepareAndSubmit(Busi+"/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error { + err := msg.DoAndSubmitDB(Busi+"/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error { return busi.SagaAdjustBalance(tx, busi.TransOutUID, -req.Amount, "SUCCESS") }) assert.Nil(t, err) @@ -36,7 +36,7 @@ func TestMsgPrepareAndSubmitBusiFailed(t *testing.T) { req := busi.GenTransReq(30, false, false) msg := dtmcli.NewMsg(DtmServer, gid). Add(busi.Busi+"/SagaBTransIn", req) - err := msg.PrepareAndSubmit(Busi+"/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error { + err := msg.DoAndSubmitDB(Busi+"/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error { return errors.New("an error") }) assert.Error(t, err) @@ -49,7 +49,7 @@ func TestMsgPrepareAndSubmitPrepareFailed(t *testing.T) { req := busi.GenTransReq(30, false, false) msg := dtmcli.NewMsg(DtmServer+"not-exists", gid). Add(busi.Busi+"/SagaBTransIn", req) - err := msg.PrepareAndSubmit(Busi+"/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error { + err := msg.DoAndSubmitDB(Busi+"/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error { return busi.SagaAdjustBalance(tx, busi.TransOutUID, -req.Amount, "SUCCESS") }) assert.Error(t, err) @@ -66,7 +66,7 @@ func TestMsgPrepareAndSubmitCommitFailed(t *testing.T) { msg := dtmcli.NewMsg(DtmServer, gid). Add(busi.Busi+"/SagaBTransIn", req) var g *monkey.PatchGuard - err := msg.PrepareAndSubmit(Busi+"/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error { + err := msg.DoAndSubmitDB(Busi+"/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error { g = monkey.PatchInstanceMethod(reflect.TypeOf(tx), "Commit", func(tx *sql.Tx) error { logger.Debugf("tx.Commit rollback and return error in test") _ = tx.Rollback() @@ -76,7 +76,6 @@ func TestMsgPrepareAndSubmitCommitFailed(t *testing.T) { }) g.Unpatch() assert.Error(t, err) - cronTransOnceForwardNow(180) assertSameBalance(t, before, "mysql") } @@ -90,7 +89,7 @@ func TestMsgPrepareAndSubmitCommitAfterFailed(t *testing.T) { msg := dtmcli.NewMsg(DtmServer, gid). Add(busi.Busi+"/SagaBTransIn", req) var guard *monkey.PatchGuard - err := msg.PrepareAndSubmit(Busi+"/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error { + err := msg.DoAndSubmitDB(Busi+"/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error { err := busi.SagaAdjustBalance(tx, busi.TransOutUID, -req.Amount, "SUCCESS") guard = monkey.PatchInstanceMethod(reflect.TypeOf(tx), "Commit", func(tx *sql.Tx) error { guard.Unpatch() @@ -100,6 +99,6 @@ func TestMsgPrepareAndSubmitCommitAfterFailed(t *testing.T) { return err }) assert.Error(t, err) - cronTransOnceForwardNow(180) + waitTransProcessed(gid) assertNotSameBalance(t, before, "mysql") } diff --git a/test/msg_grpc_barrier_redis_test.go b/test/msg_grpc_barrier_redis_test.go new file mode 100644 index 000000000..c3b2d1fa8 --- /dev/null +++ b/test/msg_grpc_barrier_redis_test.go @@ -0,0 +1,83 @@ +package test + +import ( + "errors" + "testing" + + "github.com/dtm-labs/dtm/dtmcli" + "github.com/dtm-labs/dtm/dtmcli/dtmimp" + "github.com/dtm-labs/dtm/dtmgrpc" + "github.com/dtm-labs/dtm/test/busi" + "github.com/stretchr/testify/assert" +) + +func TestMsgGrpcRedisDo(t *testing.T) { + before := getBeforeBalances("redis") + gid := dtmimp.GetFuncName() + req := busi.GenBusiReq(30, false, false) + msg := dtmgrpc.NewMsgGrpc(DtmGrpcServer, gid). + Add(busi.BusiGrpc+"/busi.Busi/TransInRedis", req) + err := msg.DoAndSubmit(busi.BusiGrpc+"/busi.Busi/QueryPreparedRedis", func(bb *dtmcli.BranchBarrier) error { + return bb.RedisCheckAdjustAmount(busi.RedisGet(), busi.GetRedisAccountKey(busi.TransOutUID), -30, 86400) + }) + assert.Nil(t, err) + waitTransProcessed(msg.Gid) + assert.Equal(t, []string{StatusSucceed}, getBranchesStatus(msg.Gid)) + assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid)) + assertNotSameBalance(t, before, "redis") +} + +func TestMsgGrpcRedisDoBusiFailed(t *testing.T) { + before := getBeforeBalances("redis") + gid := dtmimp.GetFuncName() + req := busi.GenBusiReq(30, false, false) + msg := dtmgrpc.NewMsgGrpc(DtmGrpcServer, gid). + Add(busi.BusiGrpc+"/busi.Busi/TransInRedis", req) + err := msg.DoAndSubmit(busi.BusiGrpc+"/busi.Busi/QueryPreparedRedis", func(bb *dtmcli.BranchBarrier) error { + return errors.New("an error") + }) + assert.Error(t, err) + assertSameBalance(t, before, "redis") +} + +func TestMsgGrpcRedisDoPrepareFailed(t *testing.T) { + before := getBeforeBalances("redis") + gid := dtmimp.GetFuncName() + req := busi.GenBusiReq(30, false, false) + msg := dtmgrpc.NewMsgGrpc(DtmGrpcServer+"not-exists", gid). + Add(busi.BusiGrpc+"/busi.Busi/TransInRedis", req) + err := msg.DoAndSubmit(busi.BusiGrpc+"/busi.Busi/QueryPreparedRedis", func(bb *dtmcli.BranchBarrier) error { + return bb.RedisCheckAdjustAmount(busi.RedisGet(), busi.GetRedisAccountKey(busi.TransOutUID), -30, 86400) + }) + assert.Error(t, err) + assertSameBalance(t, before, "redis") +} + +func TestMsgGrpcRedisDoCommitFailed(t *testing.T) { + before := getBeforeBalances("redis") + gid := dtmimp.GetFuncName() + req := busi.GenBusiReq(30, false, false) + msg := dtmgrpc.NewMsgGrpc(DtmGrpcServer, gid). + Add(busi.BusiGrpc+"/busi.Busi/TransInRedis", req) + err := msg.DoAndSubmit(busi.BusiGrpc+"/busi.Busi/QueryPreparedRedis", func(bb *dtmcli.BranchBarrier) error { + return errors.New("after commit error") + }) + assert.Error(t, err) + assertSameBalance(t, before, "redis") +} + +func TestMsgGrpcRedisDoCommitAfterFailed(t *testing.T) { + before := getBeforeBalances("redis") + gid := dtmimp.GetFuncName() + req := busi.GenBusiReq(30, false, false) + msg := dtmgrpc.NewMsgGrpc(DtmGrpcServer, gid). + Add(busi.BusiGrpc+"/busi.Busi/TransInRedis", req) + err := msg.DoAndSubmit(busi.BusiGrpc+"/busi.Busi/QueryPreparedRedis", func(bb *dtmcli.BranchBarrier) error { + err := bb.RedisCheckAdjustAmount(busi.RedisGet(), busi.GetRedisAccountKey(busi.TransOutUID), -30, 86400) + dtmimp.E2P(err) + return errors.New("an error") + }) + assert.Error(t, err) + waitTransProcessed(gid) + assertNotSameBalance(t, before, "redis") +} diff --git a/test/msg_grpc_barrier_test.go b/test/msg_grpc_barrier_test.go index 50edd6ce5..b850f7223 100644 --- a/test/msg_grpc_barrier_test.go +++ b/test/msg_grpc_barrier_test.go @@ -8,6 +8,7 @@ import ( "bou.ke/monkey" "github.com/dtm-labs/dtm/dtmcli/dtmimp" + "github.com/dtm-labs/dtm/dtmcli/logger" "github.com/dtm-labs/dtm/dtmgrpc" "github.com/dtm-labs/dtm/test/busi" "github.com/stretchr/testify/assert" @@ -19,7 +20,7 @@ func TestMsgGrpcPrepareAndSubmit(t *testing.T) { req := busi.GenBusiReq(30, false, false) msg := dtmgrpc.NewMsgGrpc(DtmGrpcServer, gid). Add(busi.BusiGrpc+"/busi.Busi/TransInBSaga", req) - err := msg.PrepareAndSubmit(busi.BusiGrpc+"/busi.Busi/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error { + err := msg.DoAndSubmitDB(busi.BusiGrpc+"/busi.Busi/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error { return busi.SagaAdjustBalance(tx, busi.TransOutUID, -int(req.Amount), "SUCCESS") }) assert.Nil(t, err) @@ -39,7 +40,7 @@ func TestMsgGrpcPrepareAndSubmitCommitAfterFailed(t *testing.T) { msg := dtmgrpc.NewMsgGrpc(DtmGrpcServer, gid). Add(busi.BusiGrpc+"/busi.Busi/TransInBSaga", req) var guard *monkey.PatchGuard - err := msg.PrepareAndSubmit(busi.BusiGrpc+"/busi.Busi/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error { + err := msg.DoAndSubmitDB(busi.BusiGrpc+"/busi.Busi/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error { err := busi.SagaAdjustBalance(tx, busi.TransOutUID, -int(req.Amount), "SUCCESS") guard = monkey.PatchInstanceMethod(reflect.TypeOf(tx), "Commit", func(tx *sql.Tx) error { guard.Unpatch() @@ -49,6 +50,29 @@ func TestMsgGrpcPrepareAndSubmitCommitAfterFailed(t *testing.T) { return err }) assert.Error(t, err) - cronTransOnceForwardNow(180) + waitTransProcessed(gid) assertNotSameBalance(t, before, "mysql") } + +func TestMsgGrpcPrepareAndSubmitCommitFailed(t *testing.T) { + if conf.Store.IsDB() { // cannot patch tx.Commit, because Prepare also do Commit + return + } + before := getBeforeBalances("mysql") + gid := dtmimp.GetFuncName() + req := busi.GenBusiReq(30, false, false) + msg := dtmgrpc.NewMsgGrpc(DtmGrpcServer, gid). + Add(busi.Busi+"/SagaBTransIn", req) + var g *monkey.PatchGuard + err := msg.DoAndSubmitDB(busi.BusiGrpc+"/busi.Busi/QueryPreparedB", dbGet().ToSQLDB(), func(tx *sql.Tx) error { + g = monkey.PatchInstanceMethod(reflect.TypeOf(tx), "Commit", func(tx *sql.Tx) error { + logger.Debugf("tx.Commit rollback and return error in test") + _ = tx.Rollback() + return errors.New("test error for patch") + }) + return busi.SagaAdjustBalance(tx, busi.TransOutUID, -int(req.Amount), "SUCCESS") + }) + g.Unpatch() + assert.Error(t, err) + assertSameBalance(t, before, "mysql") +} diff --git a/test/msg_grpc_test.go b/test/msg_grpc_test.go index 47019031b..05f399710 100644 --- a/test/msg_grpc_test.go +++ b/test/msg_grpc_test.go @@ -28,31 +28,32 @@ func TestMsgGrpcNormal(t *testing.T) { } func TestMsgGrpcTimeoutSuccess(t *testing.T) { - msg := genGrpcMsg(dtmimp.GetFuncName()) + gid := dtmimp.GetFuncName() + msg := genGrpcMsg(gid) err := msg.Prepare("") assert.Nil(t, err) busi.MainSwitch.QueryPreparedResult.SetOnce(dtmcli.ResultOngoing) - cronTransOnceForwardNow(180) + cronTransOnceForwardNow(t, gid, 180) assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid)) busi.MainSwitch.TransOutResult.SetOnce(dtmcli.ResultOngoing) - cronTransOnceForwardNow(180) + cronTransOnceForwardNow(t, gid, 180) assert.Equal(t, StatusSubmitted, getTransStatus(msg.Gid)) assert.Equal(t, []string{StatusPrepared, StatusPrepared}, getBranchesStatus(msg.Gid)) - g := cronTransOnce() - assert.Equal(t, msg.Gid, g) + cronTransOnce(t, gid) assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid)) assert.Equal(t, []string{StatusSucceed, StatusSucceed}, getBranchesStatus(msg.Gid)) } func TestMsgGrpcTimeoutFailed(t *testing.T) { - msg := genGrpcMsg(dtmimp.GetFuncName()) + gid := dtmimp.GetFuncName() + msg := genGrpcMsg(gid) msg.Prepare("") assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid)) busi.MainSwitch.QueryPreparedResult.SetOnce(dtmcli.ResultOngoing) - cronTransOnceForwardNow(180) + cronTransOnceForwardNow(t, gid, 180) assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid)) busi.MainSwitch.QueryPreparedResult.SetOnce(dtmcli.ResultFailure) - cronTransOnceForwardNow(180) + cronTransOnceForwardNow(t, gid, 180) assert.Equal(t, StatusFailed, getTransStatus(msg.Gid)) assert.Equal(t, []string{StatusPrepared, StatusPrepared}, getBranchesStatus(msg.Gid)) } diff --git a/test/msg_options_test.go b/test/msg_options_test.go index 1f60810bd..3d1ded860 100644 --- a/test/msg_options_test.go +++ b/test/msg_options_test.go @@ -16,38 +16,38 @@ import ( ) func TestMsgOptionsTimeout(t *testing.T) { - msg := genMsg(dtmimp.GetFuncName()) + gid := dtmimp.GetFuncName() + msg := genMsg(gid) msg.Prepare("") - g := cronTransOnce() - assert.Equal(t, msg.Gid, g) + cronTransOnce(t, gid) assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid)) - cronTransOnceForwardNow(60) + cronTransOnceForwardNow(t, gid, 60) assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid)) } func TestMsgOptionsTimeoutCustom(t *testing.T) { - msg := genMsg(dtmimp.GetFuncName()) + gid := dtmimp.GetFuncName() + msg := genMsg(gid) msg.TimeoutToFail = 120 msg.Prepare("") - g := cronTransOnce() - assert.Equal(t, msg.Gid, g) + cronTransOnce(t, gid) assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid)) - cronTransOnceForwardNow(60) + cronTransOnceForwardNow(t, gid, 60) assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid)) - cronTransOnceForwardNow(180) + cronTransOnceForwardNow(t, gid, 180) assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid)) } func TestMsgOptionsTimeoutFailed(t *testing.T) { - msg := genMsg(dtmimp.GetFuncName()) + gid := dtmimp.GetFuncName() + msg := genMsg(gid) msg.TimeoutToFail = 120 msg.Prepare("") - g := cronTransOnce() - assert.Equal(t, msg.Gid, g) + cronTransOnce(t, gid) assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid)) - cronTransOnceForwardNow(60) + cronTransOnceForwardNow(t, gid, 60) assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid)) busi.MainSwitch.QueryPreparedResult.SetOnce(dtmcli.ResultFailure) - cronTransOnceForwardNow(180) + cronTransOnceForwardNow(t, gid, 180) assert.Equal(t, StatusFailed, getTransStatus(msg.Gid)) } diff --git a/test/msg_test.go b/test/msg_test.go index 8f26ad879..5f629d5f3 100644 --- a/test/msg_test.go +++ b/test/msg_test.go @@ -26,30 +26,31 @@ func TestMsgNormal(t *testing.T) { } func TestMsgTimeoutSuccess(t *testing.T) { - msg := genMsg(dtmimp.GetFuncName()) + gid := dtmimp.GetFuncName() + msg := genMsg(gid) msg.Prepare("") assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid)) busi.MainSwitch.QueryPreparedResult.SetOnce(dtmcli.ResultOngoing) - cronTransOnceForwardNow(180) + cronTransOnceForwardNow(t, gid, 180) assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid)) busi.MainSwitch.TransInResult.SetOnce(dtmcli.ResultOngoing) - cronTransOnceForwardNow(180) + cronTransOnceForwardNow(t, gid, 180) assert.Equal(t, StatusSubmitted, getTransStatus(msg.Gid)) - g := cronTransOnce() - assert.Equal(t, msg.Gid, g) + cronTransOnce(t, gid) assert.Equal(t, []string{StatusSucceed, StatusSucceed}, getBranchesStatus(msg.Gid)) assert.Equal(t, StatusSucceed, getTransStatus(msg.Gid)) } func TestMsgTimeoutFailed(t *testing.T) { - msg := genMsg(dtmimp.GetFuncName()) + gid := dtmimp.GetFuncName() + msg := genMsg(gid) msg.Prepare("") assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid)) busi.MainSwitch.QueryPreparedResult.SetOnce(dtmcli.ResultOngoing) - cronTransOnceForwardNow(360) + cronTransOnceForwardNow(t, gid, 360) assert.Equal(t, StatusPrepared, getTransStatus(msg.Gid)) busi.MainSwitch.QueryPreparedResult.SetOnce(dtmcli.ResultFailure) - cronTransOnceForwardNow(180) + cronTransOnceForwardNow(t, gid, 180) assert.Equal(t, []string{StatusPrepared, StatusPrepared}, getBranchesStatus(msg.Gid)) assert.Equal(t, StatusFailed, getTransStatus(msg.Gid)) } diff --git a/test/saga_concurrent_test.go b/test/saga_concurrent_test.go index 5c33770f6..f130a8bf8 100644 --- a/test/saga_concurrent_test.go +++ b/test/saga_concurrent_test.go @@ -28,14 +28,14 @@ func TestSagaConNormal(t *testing.T) { } func TestSagaConRollbackNormal(t *testing.T) { - sagaCon := genSagaCon(dtmimp.GetFuncName(), true, false) + gid := dtmimp.GetFuncName() + sagaCon := genSagaCon(gid, true, false) busi.MainSwitch.TransOutRevertResult.SetOnce(dtmcli.ResultOngoing) err := sagaCon.Submit() assert.Nil(t, err) waitTransProcessed(sagaCon.Gid) assert.Equal(t, StatusAborting, getTransStatus(sagaCon.Gid)) - g := cronTransOnce() - assert.Equal(t, sagaCon.Gid, g) + cronTransOnce(t, gid) assert.Equal(t, StatusFailed, getTransStatus(sagaCon.Gid)) // TODO should fix this // assert.Equal(t, []string{StatusSucceed, StatusFailed, StatusSucceed, StatusSucceed}, getBranchesStatus(sagaCon.Gid)) @@ -61,15 +61,15 @@ func TestSagaConRollbackOrder2(t *testing.T) { assert.Equal(t, []string{StatusSucceed, StatusSucceed, StatusSucceed, StatusFailed}, getBranchesStatus(sagaCon.Gid)) } func TestSagaConCommittedOngoing(t *testing.T) { - sagaCon := genSagaCon(dtmimp.GetFuncName(), false, false) + gid := dtmimp.GetFuncName() + sagaCon := genSagaCon(gid, false, false) busi.MainSwitch.TransOutResult.SetOnce(dtmcli.ResultOngoing) sagaCon.Submit() waitTransProcessed(sagaCon.Gid) assert.Equal(t, []string{StatusPrepared, StatusPrepared, StatusPrepared, StatusSucceed}, getBranchesStatus(sagaCon.Gid)) assert.Equal(t, StatusSubmitted, getTransStatus(sagaCon.Gid)) - g := cronTransOnce() - assert.Equal(t, sagaCon.Gid, g) + cronTransOnce(t, gid) assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(sagaCon.Gid)) assert.Equal(t, StatusSucceed, getTransStatus(sagaCon.Gid)) } diff --git a/test/saga_grpc_test.go b/test/saga_grpc_test.go index 74fd82790..ee181ceaf 100644 --- a/test/saga_grpc_test.go +++ b/test/saga_grpc_test.go @@ -26,13 +26,13 @@ func TestSagaGrpcNormal(t *testing.T) { } func TestSagaGrpcRollback(t *testing.T) { - saga := genSagaGrpc(dtmimp.GetFuncName(), false, true) + gid := dtmimp.GetFuncName() + saga := genSagaGrpc(gid, false, true) busi.MainSwitch.TransOutRevertResult.SetOnce(dtmcli.ResultOngoing) saga.Submit() waitTransProcessed(saga.Gid) assert.Equal(t, StatusAborting, getTransStatus(saga.Gid)) - g := cronTransOnce() - assert.Equal(t, saga.Gid, g) + cronTransOnce(t, gid) assert.Equal(t, StatusFailed, getTransStatus(saga.Gid)) assert.Equal(t, []string{StatusSucceed, StatusSucceed, StatusSucceed, StatusFailed}, getBranchesStatus(saga.Gid)) } @@ -57,20 +57,21 @@ func TestSagaGrpcCurrentOrder(t *testing.T) { } func TestSagaGrpcCommittedOngoing(t *testing.T) { - saga := genSagaGrpc(dtmimp.GetFuncName(), false, false) + gid := dtmimp.GetFuncName() + saga := genSagaGrpc(gid, false, false) busi.MainSwitch.TransOutResult.SetOnce(dtmcli.ResultOngoing) saga.Submit() waitTransProcessed(saga.Gid) assert.Equal(t, StatusSubmitted, getTransStatus(saga.Gid)) assert.Equal(t, []string{StatusPrepared, StatusPrepared, StatusPrepared, StatusPrepared}, getBranchesStatus(saga.Gid)) - g := cronTransOnce() - assert.Equal(t, saga.Gid, g) + cronTransOnce(t, gid) assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid)) assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid)) } func TestSagaGrpcNormalWait(t *testing.T) { - saga := genSagaGrpc(dtmimp.GetFuncName(), false, false) + gid := dtmimp.GetFuncName() + saga := genSagaGrpc(gid, false, false) saga.WaitResult = true saga.Submit() assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid)) @@ -89,6 +90,7 @@ func TestSagaGrpcEmptyUrl(t *testing.T) { assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid)) } +//nolint: unparam func genSagaGrpc(gid string, outFailed bool, inFailed bool) *dtmgrpc.SagaGrpc { saga := dtmgrpc.NewSagaGrpc(dtmutil.DefaultGrpcServer, gid) req := busi.GenBusiReq(30, outFailed, inFailed) @@ -118,8 +120,7 @@ func TestSagaGrpcCronPassthroughHeadersYes(t *testing.T) { assert.Nil(t, err) waitTransProcessed(gidYes) assert.Equal(t, StatusSubmitted, getTransStatus(gidYes)) - g := cronTransOnce() - assert.Equal(t, gidYes, g) + cronTransOnce(t, gidYes) assert.Equal(t, StatusSucceed, getTransStatus(gidYes)) } @@ -158,7 +159,6 @@ func TestSagaGrpcCronHeaders(t *testing.T) { assert.Nil(t, err) waitTransProcessed(gidYes) assert.Equal(t, StatusSubmitted, getTransStatus(gidYes)) - g := cronTransOnce() - assert.Equal(t, gidYes, g) + cronTransOnce(t, gidYes) assert.Equal(t, StatusSucceed, getTransStatus(gidYes)) } diff --git a/test/saga_options_test.go b/test/saga_options_test.go index 783823cc9..090074f3d 100644 --- a/test/saga_options_test.go +++ b/test/saga_options_test.go @@ -17,19 +17,20 @@ import ( ) func TestSagaOptionsRetryOngoing(t *testing.T) { + gid := dtmimp.GetFuncName() saga := genSaga1(dtmimp.GetFuncName(), false, false) saga.RetryInterval = 150 // CronForwardDuration is larger than RetryInterval busi.MainSwitch.TransOutResult.SetOnce(dtmcli.ResultOngoing) err := saga.Submit() assert.Nil(t, err) waitTransProcessed(saga.Gid) - g := cronTransOnce() - assert.Equal(t, saga.Gid, g) + cronTransOnce(t, gid) assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid)) assert.Equal(t, []string{StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid)) } func TestSagaOptionsRetryError(t *testing.T) { + gid := dtmimp.GetFuncName() saga := genSaga1(dtmimp.GetFuncName(), false, false) saga.RetryInterval = 150 // CronForwardDuration is less than 2*RetryInterval busi.MainSwitch.TransOutResult.SetOnce("ERROR") @@ -38,22 +39,21 @@ func TestSagaOptionsRetryError(t *testing.T) { waitTransProcessed(saga.Gid) assert.Equal(t, StatusSubmitted, getTransStatus(saga.Gid)) assert.Equal(t, []string{StatusPrepared, StatusPrepared}, getBranchesStatus(saga.Gid)) - g := cronTransOnce() - assert.Equal(t, "", g) - g = cronTransOnceForwardCron(360) - assert.Equal(t, saga.Gid, g) + cronTransOnce(t, "") + cronTransOnceForwardCron(t, gid, 360) assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid)) assert.Equal(t, []string{StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid)) } func TestSagaOptionsTimeout(t *testing.T) { + gid := dtmimp.GetFuncName() saga := genSaga(dtmimp.GetFuncName(), false, false) saga.TimeoutToFail = 1800 busi.MainSwitch.TransOutResult.SetOnce(dtmcli.ResultOngoing) saga.Submit() waitTransProcessed(saga.Gid) assert.Equal(t, StatusSubmitted, getTransStatus(saga.Gid)) - cronTransOnceForwardNow(3600) + cronTransOnceForwardNow(t, gid, 3600) assert.Equal(t, StatusFailed, getTransStatus(saga.Gid)) } @@ -68,6 +68,7 @@ func TestSagaOptionsNormalWait(t *testing.T) { } func TestSagaOptionsCommittedOngoingWait(t *testing.T) { + gid := dtmimp.GetFuncName() saga := genSaga(dtmimp.GetFuncName(), false, false) busi.MainSwitch.TransOutResult.SetOnce(dtmcli.ResultOngoing) saga.WaitResult = true @@ -76,8 +77,7 @@ func TestSagaOptionsCommittedOngoingWait(t *testing.T) { assert.Equal(t, []string{StatusPrepared, StatusPrepared, StatusPrepared, StatusPrepared}, getBranchesStatus(saga.Gid)) assert.Equal(t, StatusSubmitted, getTransStatus(saga.Gid)) waitTransProcessed(saga.Gid) - g := cronTransOnce() - assert.Equal(t, saga.Gid, g) + cronTransOnce(t, gid) assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid)) assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid)) } @@ -113,8 +113,7 @@ func TestSagaCronPassthroughHeadersYes(t *testing.T) { assert.Nil(t, err) waitTransProcessed(gidYes) assert.Equal(t, StatusSubmitted, getTransStatus(gidYes)) - g := cronTransOnce() - assert.Equal(t, gidYes, g) + cronTransOnce(t, gidYes) assert.Equal(t, StatusSucceed, getTransStatus(gidYes)) } @@ -153,7 +152,6 @@ func TestSagaHeadersYes1(t *testing.T) { assert.Nil(t, err) waitTransProcessed(gidYes) assert.Equal(t, StatusSubmitted, getTransStatus(gidYes)) - g := cronTransOnce() - assert.Equal(t, gidYes, g) + cronTransOnce(t, gidYes) assert.Equal(t, StatusSucceed, getTransStatus(gidYes)) } diff --git a/test/saga_test.go b/test/saga_test.go index b7afd3a6e..d7b9ad7dd 100644 --- a/test/saga_test.go +++ b/test/saga_test.go @@ -34,27 +34,27 @@ func TestSagaRollback(t *testing.T) { } func TestSagaOngoingSucceed(t *testing.T) { + gid := dtmimp.GetFuncName() saga := genSaga(dtmimp.GetFuncName(), false, false) busi.MainSwitch.TransOutResult.SetOnce(dtmcli.ResultOngoing) saga.Submit() waitTransProcessed(saga.Gid) assert.Equal(t, []string{StatusPrepared, StatusPrepared, StatusPrepared, StatusPrepared}, getBranchesStatus(saga.Gid)) assert.Equal(t, StatusSubmitted, getTransStatus(saga.Gid)) - g := cronTransOnce() - assert.Equal(t, saga.Gid, g) + cronTransOnce(t, gid) assert.Equal(t, []string{StatusPrepared, StatusSucceed, StatusPrepared, StatusSucceed}, getBranchesStatus(saga.Gid)) assert.Equal(t, StatusSucceed, getTransStatus(saga.Gid)) } func TestSagaFailed(t *testing.T) { + gid := dtmimp.GetFuncName() saga := genSaga(dtmimp.GetFuncName(), false, true) busi.MainSwitch.TransOutRevertResult.SetOnce("ERROR") err := saga.Submit() assert.Nil(t, err) waitTransProcessed(saga.Gid) assert.Equal(t, StatusAborting, getTransStatus(saga.Gid)) - g := cronTransOnce() - assert.Equal(t, saga.Gid, g) + cronTransOnce(t, gid) assert.Equal(t, StatusFailed, getTransStatus(saga.Gid)) assert.Equal(t, []string{StatusSucceed, StatusSucceed, StatusSucceed, StatusFailed}, getBranchesStatus(saga.Gid)) } diff --git a/test/tcc_barrier_test.go b/test/tcc_barrier_test.go index 8a1ae57f0..57692b390 100644 --- a/test/tcc_barrier_test.go +++ b/test/tcc_barrier_test.go @@ -97,7 +97,7 @@ func runTestTccBarrierDisorder(t *testing.T, store string) { logger.Debugf("cron to timeout and then call cancelled twice") cron := func() { - cronTransOnceForwardNow(300) + cronTransOnceForwardNow(t, gid, 300) logger.Debugf("cronFinished write") cronFinished <- "1" logger.Debugf("cronFinished after write") diff --git a/test/tcc_grpc_cover_test.go b/test/tcc_grpc_cover_test.go index 192d491be..a667e7234 100644 --- a/test/tcc_grpc_cover_test.go +++ b/test/tcc_grpc_cover_test.go @@ -47,6 +47,5 @@ func TestTccGrpcCoverCallBranch(t *testing.T) { return err }) assert.Error(t, err) - g := cronTransOnceForwardNow(300) - assert.Equal(t, gid, g) + cronTransOnceForwardNow(t, gid, 300) } diff --git a/test/tcc_grpc_test.go b/test/tcc_grpc_test.go index b3a816934..5a7c336e1 100644 --- a/test/tcc_grpc_test.go +++ b/test/tcc_grpc_test.go @@ -49,8 +49,7 @@ func TestTccGrpcRollback(t *testing.T) { assert.Error(t, err) waitTransProcessed(gid) assert.Equal(t, StatusAborting, getTransStatus(gid)) - g2 := cronTransOnce() - assert.Equal(t, gid, g2) + cronTransOnce(t, gid) assert.Equal(t, StatusFailed, getTransStatus(gid)) assert.Equal(t, []string{StatusSucceed, StatusPrepared, StatusSucceed, StatusPrepared}, getBranchesStatus(gid)) } diff --git a/test/tcc_old_test.go b/test/tcc_old_test.go index 9a55611bc..5aca2e14b 100644 --- a/test/tcc_old_test.go +++ b/test/tcc_old_test.go @@ -37,8 +37,7 @@ func TestTccOldRollback(t *testing.T) { assert.Error(t, err) waitTransProcessed(gid) assert.Equal(t, StatusAborting, getTransStatus(gid)) - g := cronTransOnce() - assert.Equal(t, gid, g) + cronTransOnce(t, gid) assert.Equal(t, StatusFailed, getTransStatus(gid)) assert.Equal(t, []string{StatusSucceed, StatusPrepared, StatusSucceed, StatusPrepared}, getBranchesStatus(gid)) } @@ -52,7 +51,7 @@ func TestTccOldTimeout(t *testing.T) { _, err := tcc.CallBranch(req, Busi+"/TransOutOld", Busi+"/TransOutConfirmOld", Busi+"/TransOutRevertOld") assert.Nil(t, err) go func() { - cronTransOnceForwardNow(300) + cronTransOnceForwardNow(t, gid, 300) timeoutChan <- 0 }() <-timeoutChan diff --git a/test/tcc_test.go b/test/tcc_test.go index 8264d4314..60fccd2a6 100644 --- a/test/tcc_test.go +++ b/test/tcc_test.go @@ -43,8 +43,7 @@ func TestTccRollback(t *testing.T) { assert.Error(t, err) waitTransProcessed(gid) assert.Equal(t, StatusAborting, getTransStatus(gid)) - g := cronTransOnce() - assert.Equal(t, gid, g) + cronTransOnce(t, gid) assert.Equal(t, StatusFailed, getTransStatus(gid)) assert.Equal(t, []string{StatusSucceed, StatusPrepared, StatusSucceed, StatusPrepared}, getBranchesStatus(gid)) } @@ -58,7 +57,7 @@ func TestTccTimeout(t *testing.T) { _, err := tcc.CallBranch(req, Busi+"/TransOut", Busi+"/TransOutConfirm", Busi+"/TransOutRevert") assert.Nil(t, err) go func() { - cronTransOnceForwardNow(300) + cronTransOnceForwardNow(t, gid, 300) timeoutChan <- 0 }() <-timeoutChan diff --git a/test/types.go b/test/types.go index adff1a662..2d1d5452d 100644 --- a/test/types.go +++ b/test/types.go @@ -38,12 +38,12 @@ func waitTransProcessed(gid string) { } } -func cronTransOnce() string { - gid := dtmsvr.CronTransOnce() +func cronTransOnce(t *testing.T, gid string) { + gid2 := dtmsvr.CronTransOnce() + assert.Equal(t, gid, gid2) if dtmsvr.TransProcessedTestChan != nil && gid != "" { waitTransProcessed(gid) } - return gid } var e2p = dtmimp.E2P @@ -54,20 +54,18 @@ type TransGlobal = dtmsvr.TransGlobal // TransBranch alias type TransBranch = dtmsvr.TransBranch -func cronTransOnceForwardNow(seconds int) string { +func cronTransOnceForwardNow(t *testing.T, gid string, seconds int) { old := dtmsvr.NowForwardDuration dtmsvr.NowForwardDuration = time.Duration(seconds) * time.Second - gid := cronTransOnce() + cronTransOnce(t, gid) dtmsvr.NowForwardDuration = old - return gid } -func cronTransOnceForwardCron(seconds int) string { +func cronTransOnceForwardCron(t *testing.T, gid string, seconds int) { old := dtmsvr.CronForwardDuration dtmsvr.CronForwardDuration = time.Duration(seconds) * time.Second - gid := cronTransOnce() + cronTransOnce(t, gid) dtmsvr.CronForwardDuration = old - return gid } const ( @@ -84,21 +82,21 @@ const ( ) func getBeforeBalances(store string) []int { - b1 := busi.GetBalanceByUid(busi.TransOutUID, store) - b2 := busi.GetBalanceByUid(busi.TransInUID, store) + b1 := busi.GetBalanceByUID(busi.TransOutUID, store) + b2 := busi.GetBalanceByUID(busi.TransInUID, store) return []int{b1, b2} } func assertSameBalance(t *testing.T, before []int, store string) { - b1 := busi.GetBalanceByUid(busi.TransOutUID, store) - b2 := busi.GetBalanceByUid(busi.TransInUID, store) + b1 := busi.GetBalanceByUID(busi.TransOutUID, store) + b2 := busi.GetBalanceByUID(busi.TransInUID, store) assert.Equal(t, before[0], b1) assert.Equal(t, before[1], b2) } func assertNotSameBalance(t *testing.T, before []int, store string) { - b1 := busi.GetBalanceByUid(busi.TransOutUID, store) - b2 := busi.GetBalanceByUid(busi.TransInUID, store) + b1 := busi.GetBalanceByUID(busi.TransOutUID, store) + b2 := busi.GetBalanceByUID(busi.TransInUID, store) assert.NotEqual(t, before[0], b1) assert.Equal(t, before[0]+before[1], b1+b2) } diff --git a/test/xa_cover_test.go b/test/xa_cover_test.go index 0103539ab..673a58bf2 100644 --- a/test/xa_cover_test.go +++ b/test/xa_cover_test.go @@ -20,15 +20,14 @@ func TestXaCoverDBError(t *testing.T) { getXc().Conf.Driver = "no-driver" _, err = xa.CallBranch(req, busi.Busi+"/TransInXa") assert.Error(t, err) - getXc().Conf.Driver = oldDriver // make abort succeed return nil, err }) assert.Error(t, err) - getXc().Conf.Driver = "no-driver" // make xa rollback failed waitTransProcessed(gid) getXc().Conf.Driver = oldDriver - cronTransOnceForwardNow(500) // rollback succeeded here + cronTransOnceForwardNow(t, gid, 500) // rollback succeeded here assert.Equal(t, StatusFailed, getTransStatus(gid)) + assert.Equal(t, []string{StatusSucceed, StatusPrepared}, getBranchesStatus(gid)) } func TestXaCoverDTMError(t *testing.T) { diff --git a/test/xa_test.go b/test/xa_test.go index fcb460c2c..b27634027 100644 --- a/test/xa_test.go +++ b/test/xa_test.go @@ -89,7 +89,7 @@ func TestXaTimeout(t *testing.T) { timeoutChan := make(chan int, 1) err := getXc().XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) { go func() { - cronTransOnceForwardNow(300) + cronTransOnceForwardNow(t, gid, 300) timeoutChan <- 0 }() <-timeoutChan @@ -105,10 +105,10 @@ func TestXaNotTimeout(t *testing.T) { timeoutChan := make(chan int, 1) err := getXc().XaGlobalTransaction(gid, func(xa *dtmcli.Xa) (*resty.Response, error) { go func() { - cronTransOnceForwardNow(0) // not timeout, + cronTransOnceForwardNow(t, gid, 0) // not timeout, timeoutChan <- 0 }() - _ = <-timeoutChan + <-timeoutChan req := busi.GenTransReq(30, false, false) _, err := xa.CallBranch(req, busi.Busi+"/TransOutXa") assert.Nil(t, err) @@ -118,8 +118,7 @@ func TestXaNotTimeout(t *testing.T) { assert.Nil(t, err) waitTransProcessed(gid) assert.Equal(t, StatusSubmitted, getTransStatus(gid)) - g := cronTransOnce() - assert.Equal(t, gid, g) + cronTransOnce(t, gid) assert.Equal(t, StatusSucceed, getTransStatus(gid)) assert.Equal(t, []string{StatusPrepared, StatusSucceed}, getBranchesStatus(gid)) }