Skip to content

Commit

Permalink
Merge pull request dtm-labs#174 from dtm-labs/alpha
Browse files Browse the repository at this point in the history
change failure to http 409
  • Loading branch information
yedf2 authored Jan 10, 2022
2 parents 2d5f02f + 8f5ec9a commit f944888
Show file tree
Hide file tree
Showing 29 changed files with 410 additions and 251 deletions.
4 changes: 2 additions & 2 deletions README-cn.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ go run main.go
在实际的业务中,子事务可能出现失败,例如转入的子账号被冻结导致转账失败。我们对业务代码进行修改,让TransIn的正向操作失败,然后看看结果

``` go
app.POST(qsBusiAPI+"/TransIn", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
return M{"dtm_result": "FAILURE"}, nil
app.POST(qsBusiAPI+"/TransIn", common.WrapHandler2(func(c *gin.Context) interface{} {
return dtmcli.ErrFailure
}))
```

Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ go run main.go
在实际的业务中,子事务可能出现失败,例如转入的子账号被冻结导致转账失败。我们对业务代码进行修改,让TransIn的正向操作失败,然后看看结果

``` go
app.POST(qsBusiAPI+"/TransIn", common.WrapHandler(func(c *gin.Context) (interface{}, error) {
return M{"dtm_result": "FAILURE"}, nil
app.POST(qsBusiAPI+"/TransIn", common.WrapHandler2(func(c *gin.Context) interface{} {
return dtmcli.ErrFailure
}))
```

Expand Down
27 changes: 13 additions & 14 deletions bench/svr/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ func StartSvr() {
}()
}

func qsAdjustBalance(uid int, amount int, c *gin.Context) (interface{}, error) {
func qsAdjustBalance(uid int, amount int, c *gin.Context) error { // nolint: unparam
if strings.Contains(mode, "empty") || sqls == 0 {
return dtmcli.MapSuccess, nil
return nil
}
tb := dtmimp.TransBaseFromQuery(c.Request.URL.Query())
f := func(tx *sql.Tx) error {
Expand All @@ -129,32 +129,32 @@ func qsAdjustBalance(uid int, amount int, c *gin.Context) (interface{}, error) {
logger.FatalIfError(err)
}

return dtmcli.MapSuccess, nil
return nil
}

func benchAddRoute(app *gin.Engine) {
app.POST(benchAPI+"/TransIn", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) {
app.POST(benchAPI+"/TransIn", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
return qsAdjustBalance(dtmimp.MustAtoi(c.Query("uid")), 1, c)
}))
app.POST(benchAPI+"/TransInCompensate", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) {
app.POST(benchAPI+"/TransInCompensate", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
return qsAdjustBalance(dtmimp.MustAtoi(c.Query("uid")), -1, c)
}))
app.POST(benchAPI+"/TransOut", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) {
app.POST(benchAPI+"/TransOut", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
return qsAdjustBalance(dtmimp.MustAtoi(c.Query("uid")), -1, c)
}))
app.POST(benchAPI+"/TransOutCompensate", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) {
app.POST(benchAPI+"/TransOutCompensate", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
return qsAdjustBalance(dtmimp.MustAtoi(c.Query("uid")), 30, c)
}))
app.Any(benchAPI+"/reloadData", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) {
app.Any(benchAPI+"/reloadData", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
reloadData()
mode = c.Query("m")
s := c.Query("sqls")
if s != "" {
sqls = dtmimp.MustAtoi(s)
}
return nil, nil
return nil
}))
app.Any(benchAPI+"/bench", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) {
app.Any(benchAPI+"/bench", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
uid := (atomic.AddInt32(&uidCounter, 1)-1)%total + 1
suid := fmt.Sprintf("%d", uid)
suid2 := fmt.Sprintf("%d", total+1-uid)
Expand All @@ -175,16 +175,15 @@ func benchAddRoute(app *gin.Engine) {
_, err = dtmimp.RestyClient.R().SetBody(gin.H{}).SetQueryParam("uid", suid).Post(benchBusi + "/TransIn")
dtmimp.E2P(err)
}
return nil, nil
return nil
}))
app.Any(benchAPI+"/benchEmptyUrl", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) {
app.Any(benchAPI+"/benchEmptyUrl", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
gid := shortuuid.New()
req := gin.H{}
saga := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, gid).
Add("", "", req).
Add("", "", req)
saga.WaitResult = true
err := saga.Submit()
return nil, err
return saga.Submit()
}))
}
8 changes: 6 additions & 2 deletions dtmcli/dtmimp/trans_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package dtmimp
import (
"errors"
"fmt"
"net/http"
"net/url"
"strings"

Expand Down Expand Up @@ -87,7 +88,7 @@ func TransCallDtm(tb *TransBase, body interface{}, operation string) error {
if err != nil {
return err
}
if !strings.Contains(resp.String(), ResultSuccess) {
if resp.StatusCode() != http.StatusOK || strings.Contains(resp.String(), ResultFailure) {
return errors.New(resp.String())
}
return nil
Expand Down Expand Up @@ -118,5 +119,8 @@ func TransRequestBranch(t *TransBase, body interface{}, branchID string, op stri
}).
SetHeaders(t.BranchHeaders).
Post(url)
return resp, CheckResponse(resp, err)
if err == nil {
err = RespAsErrorCompatible(resp)
}
return resp, err
}
45 changes: 13 additions & 32 deletions dtmcli/dtmimp/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"encoding/json"
"errors"
"fmt"
"net/http"
"os"
"runtime"
"strconv"
Expand Down Expand Up @@ -204,37 +205,17 @@ func GetDsn(conf DBConf) string {
return dsn
}

// CheckResponse is check response, and return corresponding error by the condition of resp when err is nil. Otherwise, return err directly.
func CheckResponse(resp *resty.Response, err error) error {
if err == nil && resp != nil {
if resp.IsError() {
return errors.New(resp.String())
} else if strings.Contains(resp.String(), ResultFailure) {
return ErrFailure
} else if strings.Contains(resp.String(), ResultOngoing) {
return ErrOngoing
}
}
return err
}

// CheckResult is check result. Return err directly if err is not nil. And return corresponding error by calling CheckResponse if resp is the type of *resty.Response.
// Otherwise, return error by value of str, the string after marshal.
func CheckResult(res interface{}, err error) error {
if err != nil {
return err
// RespAsErrorCompatible translate a resty response to error
// compatible with version < v1.10
func RespAsErrorCompatible(resp *resty.Response) error {
code := resp.StatusCode()
str := resp.String()
if code == http.StatusTooEarly || strings.Contains(str, ResultOngoing) {
return fmt.Errorf("%s. %w", str, ErrOngoing)
} else if code == http.StatusConflict || strings.Contains(str, ResultFailure) {
return fmt.Errorf("%s. %w", str, ErrFailure)
} else if code != http.StatusOK {
return errors.New(str)
}
resp, ok := res.(*resty.Response)
if ok {
return CheckResponse(resp, err)
}
if res != nil {
str := MustMarshalString(res)
if strings.Contains(str, ResultFailure) {
return ErrFailure
} else if strings.Contains(str, ResultOngoing) {
return ErrOngoing
}
}
return err
return nil
}
10 changes: 10 additions & 0 deletions dtmcli/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ type TransOptions = dtmimp.TransOptions
// DBConf declares db configuration
type DBConf = dtmimp.DBConf

// String2DtmError translate string to dtm error
func String2DtmError(str string) error {
return map[string]error{
ResultFailure: ErrFailure,
ResultOngoing: ErrOngoing,
ResultSuccess: nil,
"": nil,
}[str]
}

// SetCurrentDBType set currentDBType
func SetCurrentDBType(dbType string) {
dtmimp.SetCurrentDBType(dbType)
Expand Down
4 changes: 2 additions & 2 deletions dtmcli/xa.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ func NewXaClient(server string, mysqlConf DBConf, notifyURL string, register XaR
}

// HandleCallback 处理commit/rollback的回调
func (xc *XaClient) HandleCallback(gid string, branchID string, action string) (interface{}, error) {
return MapSuccess, xc.XaClientBase.HandleCallback(gid, branchID, action)
func (xc *XaClient) HandleCallback(gid string, branchID string, action string) interface{} {
return xc.XaClientBase.HandleCallback(gid, branchID, action)
}

// XaLocalTransaction start a xa local transaction
Expand Down
15 changes: 0 additions & 15 deletions dtmgrpc/dtmgimp/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,9 @@ import (
"fmt"
"time"

"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

// GrpcServerLog 打印grpc服务端的日志
Expand Down Expand Up @@ -49,15 +46,3 @@ func GrpcClientLog(ctx context.Context, method string, req, reply interface{}, c
}
return err
}

// Result2Error 将通用的result转成grpc的error
func Result2Error(res interface{}, err error) error {
e := dtmimp.CheckResult(res, err)
if e == dtmimp.ErrFailure {
logger.Errorf("failure: res: %v, err: %v", res, e)
return status.New(codes.Aborted, dtmcli.ResultFailure).Err()
} else if e == dtmimp.ErrOngoing {
return status.New(codes.Aborted, dtmcli.ResultOngoing).Err()
}
return e
}
14 changes: 14 additions & 0 deletions dtmgrpc/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,27 @@ package dtmgrpc
import (
context "context"

"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmgrpc/dtmgimp"
"github.com/dtm-labs/dtmdriver"
grpc "google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
emptypb "google.golang.org/protobuf/types/known/emptypb"
)

// DtmError2GrpcError translate dtm error to grpc error
func DtmError2GrpcError(res interface{}) error {
e, ok := res.(error)
if ok && e == dtmimp.ErrFailure {
return status.New(codes.Aborted, dtmcli.ResultFailure).Err()
} else if ok && e == dtmimp.ErrOngoing {
return status.New(codes.FailedPrecondition, dtmcli.ResultOngoing).Err()
}
return e
}

// MustGenGid must gen a gid from grpcServer
func MustGenGid(grpcServer string) string {
dc := dtmgimp.MustGetDtmClient(grpcServer)
Expand Down
27 changes: 14 additions & 13 deletions dtmsvr/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/dtm-labs/dtm/dtmsvr/storage"
)

func svcSubmit(t *TransGlobal) (interface{}, error) {
func svcSubmit(t *TransGlobal) interface{} {
t.Status = dtmcli.StatusSubmitted
branches, err := t.saveNew()

Expand All @@ -25,35 +25,36 @@ func svcSubmit(t *TransGlobal) (interface{}, error) {
dbt.changeStatus(t.Status)
branches = GetStore().FindBranches(t.Gid)
} else if dbt.Status != dtmcli.StatusSubmitted {
return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": fmt.Sprintf("current status '%s', cannot sumbmit", dbt.Status)}, nil
return fmt.Errorf("current status '%s', cannot sumbmit. %w", dbt.Status, dtmcli.ErrFailure)
}
}
return t.Process(branches), nil
return t.Process(branches)
}

func svcPrepare(t *TransGlobal) (interface{}, error) {
func svcPrepare(t *TransGlobal) interface{} {
t.Status = dtmcli.StatusPrepared
_, err := t.saveNew()
if err == storage.ErrUniqueConflict {
dbt := GetTransGlobal(t.Gid)
if dbt.Status != dtmcli.StatusPrepared {
return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": fmt.Sprintf("current status '%s', cannot prepare", dbt.Status)}, nil
return fmt.Errorf("current status '%s', cannot prepare. %w", dbt.Status, dtmcli.ErrFailure)
}
return nil
}
return dtmcli.MapSuccess, nil
return err
}

func svcAbort(t *TransGlobal) (interface{}, error) {
func svcAbort(t *TransGlobal) interface{} {
dbt := GetTransGlobal(t.Gid)
if t.TransType != "xa" && t.TransType != "tcc" || dbt.Status != dtmcli.StatusPrepared && dbt.Status != dtmcli.StatusAborting {
return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": fmt.Sprintf("trans type: '%s' current status '%s', cannot abort", dbt.TransType, dbt.Status)}, nil
return fmt.Errorf("trans type: '%s' current status '%s', cannot abort. %w", dbt.TransType, dbt.Status, dtmcli.ErrFailure)
}
dbt.changeStatus(dtmcli.StatusAborting)
branches := GetStore().FindBranches(t.Gid)
return dbt.Process(branches), nil
return dbt.Process(branches)
}

func svcRegisterBranch(transType string, branch *TransBranch, data map[string]string) (ret interface{}, rerr error) {
func svcRegisterBranch(transType string, branch *TransBranch, data map[string]string) error {
branches := []TransBranch{*branch, *branch}
if transType == "tcc" {
for i, b := range []string{dtmcli.BranchCancel, dtmcli.BranchConfirm} {
Expand All @@ -66,7 +67,7 @@ func svcRegisterBranch(transType string, branch *TransBranch, data map[string]st
branches[1].Op = dtmcli.BranchCommit
branches[1].URL = data["url"]
} else {
return nil, fmt.Errorf("unknow trans type: %s", transType)
return fmt.Errorf("unknow trans type: %s", transType)
}

err := dtmimp.CatchP(func() {
Expand All @@ -75,9 +76,9 @@ func svcRegisterBranch(transType string, branch *TransBranch, data map[string]st
if err == storage.ErrNotFound {
msg := fmt.Sprintf("no trans with gid: %s status: %s found", branch.Gid, dtmcli.StatusPrepared)
logger.Errorf(msg)
return map[string]interface{}{"dtm_result": dtmcli.ResultFailure, "message": msg}, nil
return fmt.Errorf("message: %s %w", msg, dtmcli.ErrFailure)
}
logger.Infof("LockGlobalSaveBranches result: %v: gid: %s old status: %s branches: %s",
err, branch.Gid, dtmcli.StatusPrepared, dtmimp.MustMarshalString(branches))
return dtmimp.If(err != nil, nil, dtmcli.MapSuccess), err
return err
}
18 changes: 9 additions & 9 deletions dtmsvr/api_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"context"

"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmgrpc/dtmgimp"
"github.com/dtm-labs/dtm/dtmgrpc"
pb "github.com/dtm-labs/dtm/dtmgrpc/dtmgpb"
"google.golang.org/protobuf/types/known/emptypb"
)
Expand All @@ -25,26 +25,26 @@ func (s *dtmServer) NewGid(ctx context.Context, in *emptypb.Empty) (*pb.DtmGidRe
}

func (s *dtmServer) Submit(ctx context.Context, in *pb.DtmRequest) (*emptypb.Empty, error) {
r, err := svcSubmit(TransFromDtmRequest(ctx, in))
return &emptypb.Empty{}, dtmgimp.Result2Error(r, err)
r := svcSubmit(TransFromDtmRequest(ctx, in))
return &emptypb.Empty{}, dtmgrpc.DtmError2GrpcError(r)
}

func (s *dtmServer) Prepare(ctx context.Context, in *pb.DtmRequest) (*emptypb.Empty, error) {
r, err := svcPrepare(TransFromDtmRequest(ctx, in))
return &emptypb.Empty{}, dtmgimp.Result2Error(r, err)
r := svcPrepare(TransFromDtmRequest(ctx, in))
return &emptypb.Empty{}, dtmgrpc.DtmError2GrpcError(r)
}

func (s *dtmServer) Abort(ctx context.Context, in *pb.DtmRequest) (*emptypb.Empty, error) {
r, err := svcAbort(TransFromDtmRequest(ctx, in))
return &emptypb.Empty{}, dtmgimp.Result2Error(r, err)
r := svcAbort(TransFromDtmRequest(ctx, in))
return &emptypb.Empty{}, dtmgrpc.DtmError2GrpcError(r)
}

func (s *dtmServer) RegisterBranch(ctx context.Context, in *pb.DtmBranchRequest) (*emptypb.Empty, error) {
r, err := svcRegisterBranch(in.TransType, &TransBranch{
r := svcRegisterBranch(in.TransType, &TransBranch{
Gid: in.Gid,
BranchID: in.BranchID,
Status: dtmcli.StatusPrepared,
BinData: in.BusiPayload,
}, in.Data)
return &emptypb.Empty{}, dtmgimp.Result2Error(r, err)
return &emptypb.Empty{}, dtmgrpc.DtmError2GrpcError(r)
}
Loading

0 comments on commit f944888

Please sign in to comment.