Skip to content

Commit

Permalink
Merge pull request dtm-labs#195 from dtm-labs/alpha
Browse files Browse the repository at this point in the history
flash sales bench ok
  • Loading branch information
yedf2 authored Jan 24, 2022
2 parents 435d9b9 + a7055dc commit 56aea32
Show file tree
Hide file tree
Showing 50 changed files with 826 additions and 283 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ dist
.vscode
default.etcd
*/**/*.bolt

bench/bench
# Output file of unit test coverage
coverage.*
profile.*
Expand Down
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
run:
deadline: 5m
skip-dirs:
- test
# - test
# - bench

linter-settings:
Expand Down
8 changes: 4 additions & 4 deletions bench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ func main() {
logger.Infof("starting bench server")
config.MustLoadConfig("")
logger.InitLog(conf.LogLevel)
if busi.BusiConf.Driver != "" {
dtmcli.SetCurrentDBType(busi.BusiConf.Driver)
svr.PrepareBenchDB()
}
registry.WaitStoreUp()
dtmsvr.PopulateDB(false)
if os.Args[1] == "db" {
if busi.BusiConf.Driver == "mysql" {
dtmcli.SetCurrentDBType(busi.BusiConf.Driver)
svr.PrepareBenchDB()
}
busi.PopulateDB(false)
} else if os.Args[1] == "redis" || os.Args[1] == "boltdb" {

Expand Down
18 changes: 18 additions & 0 deletions bench/svr/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package svr

import (
"context"
"database/sql"
"fmt"
"os"
Expand Down Expand Up @@ -132,6 +133,8 @@ func qsAdjustBalance(uid int, amount int, c *gin.Context) error { // nolint: unp
return nil
}

var stockKey = "{a}--stock-1"

func benchAddRoute(app *gin.Engine) {
app.POST(benchAPI+"/TransIn", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
return qsAdjustBalance(dtmimp.MustAtoi(c.Query("uid")), 1, c)
Expand Down Expand Up @@ -186,4 +189,19 @@ func benchAddRoute(app *gin.Engine) {
saga.WaitResult = true
return saga.Submit()
}))
app.Any(benchAPI+"/benchFlashSalesReset", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
_, err := busi.RedisGet().FlushAll(context.Background()).Result()
logger.FatalIfError(err)
_, err = busi.RedisGet().Set(context.Background(), stockKey, "0", 86400*time.Second).Result()
logger.FatalIfError(err)
return nil
}))
app.Any(benchAPI+"/benchFlashSales", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
gid := "{a}-" + shortuuid.New()
msg := dtmcli.NewMsg(dtmutil.DefaultHTTPServer, gid).
Add("", nil)
return msg.DoAndSubmit("", func(bb *dtmcli.BranchBarrier) error {
return bb.RedisCheckAdjustAmount(busi.RedisGet(), stockKey, -1, 86400)
})
}))
}
14 changes: 14 additions & 0 deletions bench/test-flash-sales.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# !/bin/bash

set -x

export LOG_LEVEL=fatal
export STORE_DRIVER=redis
export STORE_HOST=localhost
export STORE_PORT=6379
export BUSI_REDIS=localhost:6379
./bench redis &
echo 'sleeping 3s for dtm bench to run up.' && sleep 3
curl "http://127.0.0.1:8083/api/busi_bench/benchFlashSalesReset"
ab -n 300000 -c 20 "http://127.0.0.1:8083/api/busi_bench/benchFlashSales"
pkill bench
42 changes: 0 additions & 42 deletions dtmcli/barrier.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/go-redis/redis/v8"
)

// BarrierBusiFunc type for busi func
Expand Down Expand Up @@ -115,44 +114,3 @@ func (bb *BranchBarrier) QueryPrepared(db *sql.DB) error {
}
return err
}

// RedisCheckAdjustAmount check the value of key is valid and >= amount. then adjust the amount
func (bb *BranchBarrier) RedisCheckAdjustAmount(rd *redis.Client, key string, amount int, barrierExpire int) error {
bkey1 := fmt.Sprintf("%s-%s-%s-%s-%02d", key, bb.Gid, bb.BranchID, bb.Op, bb.BarrierID)
originOp := map[string]string{
BranchCancel: BranchTry,
BranchCompensate: BranchAction,
}[bb.Op]
bkey2 := fmt.Sprintf("%s-%s-%s-%s-%02d", key, bb.Gid, bb.BranchID, originOp, bb.BarrierID)
v, err := rd.Eval(rd.Context(), ` -- RedisCheckAdjustAmount
local v = redis.call('GET', KEYS[1])
local e1 = redis.call('GET', KEYS[2])
if v == false or v + ARGV[1] < 0 then
return 'FAILURE'
end
if e1 ~= false then
return
end
redis.call('SET', KEYS[2], 'op', 'EX', ARGV[3])
if ARGV[2] ~= '' then
local e2 = redis.call('GET', KEYS[3])
if e2 == false then
redis.call('SET', KEYS[3], 'rollback', 'EX', ARGV[3])
return
end
end
redis.call('INCRBY', KEYS[1], ARGV[1])
`, []string{key, bkey1, bkey2}, amount, originOp, barrierExpire).Result()
logger.Debugf("lua return v: %v err: %v", v, err)
if err == redis.Nil {
err = nil
}
if err == nil && v == ResultFailure {
err = ErrFailure
}
return err
}
73 changes: 73 additions & 0 deletions dtmcli/barrier_redis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package dtmcli

import (
"fmt"

"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/go-redis/redis/v8"
)

// RedisCheckAdjustAmount check the value of key is valid and >= amount. then adjust the amount
func (bb *BranchBarrier) RedisCheckAdjustAmount(rd *redis.Client, key string, amount int, barrierExpire int) error {
bb.BarrierID = bb.BarrierID + 1
bkey1 := fmt.Sprintf("%s-%s-%s-%02d", bb.Gid, bb.BranchID, bb.Op, bb.BarrierID)
originOp := map[string]string{
BranchCancel: BranchTry,
BranchCompensate: BranchAction,
}[bb.Op]
bkey2 := fmt.Sprintf("%s-%s-%s-%02d", bb.Gid, bb.BranchID, originOp, bb.BarrierID)
v, err := rd.Eval(rd.Context(), ` -- RedisCheckAdjustAmount
local v = redis.call('GET', KEYS[1])
local e1 = redis.call('GET', KEYS[2])
if v == false or v + ARGV[1] < 0 then
return 'FAILURE'
end
if e1 ~= false then
return
end
redis.call('SET', KEYS[2], 'op', 'EX', ARGV[3])
if ARGV[2] ~= '' then
local e2 = redis.call('GET', KEYS[3])
if e2 == false then
redis.call('SET', KEYS[3], 'rollback', 'EX', ARGV[3])
return
end
end
redis.call('INCRBY', KEYS[1], ARGV[1])
`, []string{key, bkey1, bkey2}, amount, originOp, barrierExpire).Result()
logger.Debugf("lua return v: %v err: %v", v, err)
if err == redis.Nil {
err = nil
}
if err == nil && v == ResultFailure {
err = ErrFailure
}
return err
}

// RedisQueryPrepared query prepared for redis
func (bb *BranchBarrier) RedisQueryPrepared(rd *redis.Client, barrierExpire int) error {
bkey1 := fmt.Sprintf("%s-%s-%s-%s", bb.Gid, "00", "msg", "01")
v, err := rd.Eval(rd.Context(), ` -- RedisQueryPrepared
local v = redis.call('GET', KEYS[1])
if v == false then
redis.call('SET', KEYS[1], 'rollback', 'EX', ARGV[1])
v = 'rollback'
end
if v == 'rollback' then
return 'FAILURE'
end
`, []string{bkey1}, barrierExpire).Result()
logger.Debugf("lua return v: %v err: %v", v, err)
if err == redis.Nil {
err = nil
}
if err == nil && v == ResultFailure {
err = ErrFailure
}
return err
}
6 changes: 3 additions & 3 deletions dtmcli/dtmimp/trans_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ func TransRegisterBranch(tb *TransBase, added map[string]string, operation strin
return TransCallDtm(tb, m, operation)
}

// TransRequestBranch TransBAse request branch result
func TransRequestBranch(t *TransBase, body interface{}, branchID string, op string, url string) (*resty.Response, error) {
// TransRequestBranch TransBase request branch result
func TransRequestBranch(t *TransBase, method string, body interface{}, branchID string, op string, url string) (*resty.Response, error) {
resp, err := RestyClient.R().
SetBody(body).
SetQueryParams(map[string]string{
Expand All @@ -118,7 +118,7 @@ func TransRequestBranch(t *TransBase, body interface{}, branchID string, op stri
"op": op,
}).
SetHeaders(t.BranchHeaders).
Post(url)
Execute(method, url)
if err == nil {
err = RespAsErrorCompatible(resp)
}
Expand Down
36 changes: 25 additions & 11 deletions dtmcli/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package dtmcli

import (
"database/sql"
"errors"

"github.com/dtm-labs/dtm/dtmcli/dtmimp"
)
Expand Down Expand Up @@ -40,22 +41,35 @@ func (s *Msg) Submit() error {
return dtmimp.TransCallDtm(&s.TransBase, s, "submit")
}

// PrepareAndSubmit one method for the entire busi->prepare->submit
func (s *Msg) PrepareAndSubmit(queryPrepared string, db *sql.DB, busiCall BarrierBusiFunc) error {
// DoAndSubmitDB short method for Do on db type. please see Do
func (s *Msg) DoAndSubmitDB(queryPrepared string, db *sql.DB, busiCall BarrierBusiFunc) error {
return s.DoAndSubmit(queryPrepared, func(bb *BranchBarrier) error {
return bb.CallWithDB(db, busiCall)
})
}

// DoAndSubmit one method for the entire prepare->busi->submit
// if busiCall return ErrFailure, then abort is called directly
// if busiCall return not nil error other than ErrFailure, then DoAndSubmit will call queryPrepared to get the result
func (s *Msg) DoAndSubmit(queryPrepared string, busiCall func(bb *BranchBarrier) error) error {
bb, err := BarrierFrom(s.TransType, s.Gid, "00", "msg") // a special barrier for msg QueryPrepared
if err == nil {
err = s.Prepare(queryPrepared)
}
if err == nil {
defer func() {
if err != nil && bb.QueryPrepared(db) == ErrFailure {
_ = dtmimp.TransCallDtm(&s.TransBase, s, "abort")
}
}()
err = bb.CallWithDB(db, busiCall)
}
if err == nil {
err = s.Submit()
errb := busiCall(bb)
if errb != nil && !errors.Is(errb, ErrFailure) {
// if busicall return an error other than failure, we will query the result
_, err = dtmimp.TransRequestBranch(&s.TransBase, "GET", nil, bb.BranchID, bb.Op, queryPrepared)
}
if errors.Is(errb, ErrFailure) || errors.Is(err, ErrFailure) {
_ = dtmimp.TransCallDtm(&s.TransBase, s, "abort")
} else if err == nil {
err = s.Submit()
}
if errb != nil {
return errb
}
}
return err
}
2 changes: 1 addition & 1 deletion dtmcli/tcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,5 @@ func (t *Tcc) CallBranch(body interface{}, tryURL string, confirmURL string, can
if err != nil {
return nil, err
}
return dtmimp.TransRequestBranch(&t.TransBase, body, branchID, BranchTry, tryURL)
return dtmimp.TransRequestBranch(&t.TransBase, "POST", body, branchID, BranchTry, tryURL)
}
2 changes: 1 addition & 1 deletion dtmcli/xa.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,5 +101,5 @@ func (xc *XaClient) XaGlobalTransaction2(gid string, custom func(*Xa), xaFunc Xa
// CallBranch call a xa branch
func (x *Xa) CallBranch(body interface{}, url string) (*resty.Response, error) {
branchID := x.NewSubBranchID()
return dtmimp.TransRequestBranch(&x.TransBase, body, branchID, BranchAction, url)
return dtmimp.TransRequestBranch(&x.TransBase, "POST", body, branchID, BranchAction, url)
}
14 changes: 14 additions & 0 deletions dtmgrpc/dtmgimp/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ import (

"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtmdriver"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/proto"
)

// GrpcServerLog 打印grpc服务端的日志
Expand Down Expand Up @@ -46,3 +49,14 @@ func GrpcClientLog(ctx context.Context, method string, req, reply interface{}, c
}
return err
}

// InvokeBranch invoke a url for trans
func InvokeBranch(t *dtmimp.TransBase, isRaw bool, msg proto.Message, url string, reply interface{}, branchID string, op string) error {
server, method, err := dtmdriver.GetDriver().ParseServerMethod(url)
if err != nil {
return err
}
ctx := TransInfo2Ctx(t.Gid, t.TransType, branchID, op, t.Dtm)
ctx = metadata.AppendToOutgoingContext(ctx, Map2Kvs(t.BranchHeaders)...)
return MustGetGrpcConn(server, isRaw).Invoke(ctx, method, msg, reply)
}
36 changes: 26 additions & 10 deletions dtmgrpc/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package dtmgrpc

import (
"database/sql"
"errors"

"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
Expand Down Expand Up @@ -43,20 +44,35 @@ func (s *MsgGrpc) Submit() error {
return dtmgimp.DtmGrpcCall(&s.TransBase, "Submit")
}

// PrepareAndSubmit one method for the entire busi->prepare->submit
func (s *MsgGrpc) PrepareAndSubmit(queryPrepared string, db *sql.DB, busiCall dtmcli.BarrierBusiFunc) error {
// DoAndSubmitDB short method for Do on db type. please see Do
func (s *MsgGrpc) DoAndSubmitDB(queryPrepared string, db *sql.DB, busiCall dtmcli.BarrierBusiFunc) error {
return s.DoAndSubmit(queryPrepared, func(bb *dtmcli.BranchBarrier) error {
return bb.CallWithDB(db, busiCall)
})
}

// DoAndSubmit one method for the entire prepare->busi->submit
// if busiCall return ErrFailure, then abort is called directly
// if busiCall return not nil error other than ErrFailure, then DoAndSubmit will call queryPrepared to get the result
func (s *MsgGrpc) DoAndSubmit(queryPrepared string, busiCall func(bb *dtmcli.BranchBarrier) error) error {
bb, err := dtmcli.BarrierFrom(s.TransType, s.Gid, "00", "msg") // a special barrier for msg QueryPrepared
if err == nil {
err = bb.CallWithDB(db, func(tx *sql.Tx) error {
err := busiCall(tx)
if err == nil {
err = s.Prepare(queryPrepared)
}
return err
})
err = s.Prepare(queryPrepared)
}
if err == nil {
err = s.Submit()
errb := busiCall(bb)
if errb != nil && !errors.Is(err, dtmcli.ErrFailure) {
err = dtmgimp.InvokeBranch(&s.TransBase, true, nil, queryPrepared, &[]byte{}, bb.BranchID, bb.Op)
err = GrpcError2DtmError(err)
}
if errors.Is(err, dtmcli.ErrFailure) || errors.Is(errb, dtmcli.ErrFailure) {
_ = dtmgimp.DtmGrpcCall(&s.TransBase, "Abort")
} else if err == nil {
err = s.Submit()
}
if errb != nil {
return errb
}
}
return err
}
Loading

0 comments on commit 56aea32

Please sign in to comment.