Skip to content

Commit

Permalink
merged
Browse files Browse the repository at this point in the history
  • Loading branch information
yedf2 committed Jan 8, 2022
2 parents 9bcda55 + b70b188 commit 901decf
Show file tree
Hide file tree
Showing 40 changed files with 131 additions and 95 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,8 @@ jobs:
run: |
go mod download
- name: Run CI lint
run: make lint

- name: Run test cover
run: sh helper/test-cover.sh
run: sh helper/test-cover.sh
6 changes: 3 additions & 3 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
run:
deadline: 5m
# skip-dirs:
# - test
# - examples
skip-dirs:
- test
# - bench

linter-settings:
goconst:
Expand Down
2 changes: 1 addition & 1 deletion bench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ usage:
`

func hintAndExit() {
fmt.Printf(usage)
fmt.Print(usage)
os.Exit(0)
}

Expand Down
23 changes: 14 additions & 9 deletions bench/svr/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,11 @@ func reloadData() {
logger.Debugf("%d users inserted. used: %dms", total, time.Since(began).Milliseconds())
}

var uidCounter int32 = 0
var mode string = ""
var sqls int = 1
var uidCounter int32
var mode string
var sqls = 1

// PrepareBenchDB prepares db data for bench
func PrepareBenchDB() {
db := pdbGet()
_, err := dtmimp.DBExec(db, "drop table if exists dtm_busi.user_account_log")
Expand All @@ -95,7 +96,9 @@ func StartSvr() {
app := dtmutil.GetGinApp()
benchAddRoute(app)
logger.Debugf("bench listening at %d", benchPort)
go app.Run(fmt.Sprintf(":%s", benchPort))
go func() {
_ = app.Run(fmt.Sprintf(":%s", benchPort))
}()
}

func qsAdjustBalance(uid int, amount int, c *gin.Context) (interface{}, error) {
Expand All @@ -116,11 +119,13 @@ func qsAdjustBalance(uid int, amount int, c *gin.Context) (interface{}, error) {
if strings.Contains(mode, "barrier") {
barrier, err := dtmcli.BarrierFromQuery(c.Request.URL.Query())
logger.FatalIfError(err)
barrier.Call(txGet(), f)
err = barrier.Call(txGet(), f)
logger.FatalIfError(err)
} else {
tx := txGet()
f(tx)
err := tx.Commit()
err := f(tx)
logger.FatalIfError(err)
err = tx.Commit()
logger.FatalIfError(err)
}

Expand Down Expand Up @@ -158,7 +163,7 @@ func benchAddRoute(app *gin.Engine) {
params2 := fmt.Sprintf("?uid=%s", suid2)
logger.Debugf("mode: %s contains dtm: %t", mode, strings.Contains(mode, "dtm"))
if strings.Contains(mode, "dtm") {
saga := dtmcli.NewSaga(dtmutil.DefaultHttpServer, fmt.Sprintf("bench-%d", uid)).
saga := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, fmt.Sprintf("bench-%d", uid)).
Add(benchBusi+"/TransOut"+params, benchBusi+"/TransOutCompensate"+params, req).
Add(benchBusi+"/TransIn"+params2, benchBusi+"/TransInCompensate"+params2, req)
saga.WaitResult = true
Expand All @@ -175,7 +180,7 @@ func benchAddRoute(app *gin.Engine) {
app.Any(benchAPI+"/benchEmptyUrl", dtmutil.WrapHandler(func(c *gin.Context) (interface{}, error) {
gid := shortuuid.New()
req := gin.H{}
saga := dtmcli.NewSaga(dtmutil.DefaultHttpServer, gid).
saga := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, gid).
Add("", "", req).
Add("", "", req)
saga.WaitResult = true
Expand Down
5 changes: 3 additions & 2 deletions conf.sample.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
### dtm can be run without any config.
### all config in this file is optional. the default value is as specified in each line
### all configs can be specified from env. for example:
### Store.MaxOpenConns can also specified from env: STORE_MAX_OPEN_CONNS
### MicroService.EndPoint => MICRO_SERVICE_END_POINT
#####################################################################

# Store: # specify which engine to store trans status
Expand Down Expand Up @@ -49,4 +49,5 @@
# RetryInterval: 10 # the subtrans branch will be retried after this interval

# LogLevel: 'info' # default: info. can be debug|info|warn|error

# HttpPort: 36789
# GrpcPort: 36790
7 changes: 4 additions & 3 deletions dtmcli/barrier.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func BarrierFrom(transType, gid, branchID, op string) (*BranchBarrier, error) {
Op: op,
}
if ti.TransType == "" || ti.Gid == "" || ti.BranchID == "" || ti.Op == "" {
return nil, fmt.Errorf("invlid trans info: %v", ti)
return nil, fmt.Errorf("invalid trans info: %v", ti)
}
return ti, nil
}
Expand All @@ -67,10 +67,10 @@ func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BarrierBusiFunc) (rerr error)
defer func() {
// Logf("barrier call error is %v", rerr)
if x := recover(); x != nil {
tx.Rollback()
_ = tx.Rollback()
panic(x)
} else if rerr != nil {
tx.Rollback()
_ = tx.Rollback()
} else {
rerr = tx.Commit()
}
Expand Down Expand Up @@ -101,6 +101,7 @@ func (bb *BranchBarrier) CallWithDB(db *sql.DB, busiCall BarrierBusiFunc) error
return err
}

// QueryPrepared queries prepared data
func (bb *BranchBarrier) QueryPrepared(db *sql.DB) error {
_, err := insertBarrier(db, bb.TransType, bb.Gid, "00", "msg", "01", "rollback")
var reason string
Expand Down
6 changes: 4 additions & 2 deletions dtmcli/dtmimp/trans_xa_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ func (xc *XaClientBase) HandleCallback(gid string, branchID string, action strin
if err != nil {
return err
}
defer db.Close()
defer func() {
_ = db.Close()
}()
xaID := gid + "-" + branchID
_, err = DBExec(db, GetDBSpecial().GetXaSQL(action, xaID))
if err != nil &&
Expand All @@ -41,7 +43,7 @@ func (xc *XaClientBase) HandleLocalTrans(xa *TransBase, cb func(*sql.DB) error)
if rerr != nil {
return
}
defer func() { db.Close() }()
defer func() { _ = db.Close() }()
defer func() {
x := recover()
_, err := DBExec(db, GetDBSpecial().GetXaSQL("end", xaBranch))
Expand Down
1 change: 1 addition & 0 deletions dtmcli/dtmimp/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type DB interface {
QueryRow(query string, args ...interface{}) *sql.Row
}

// DBConf defines db config
type DBConf struct {
Driver string `yaml:"Driver"`
Host string `yaml:"Host"`
Expand Down
1 change: 1 addition & 0 deletions dtmcli/dtmimp/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ func TestTypes(t *testing.T) {
idGen := BranchIDGen{subBranchID: 99}
idGen.NewSubBranchID()
})
assert.Error(t, err)
}
4 changes: 2 additions & 2 deletions dtmcli/dtmimp/vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ var ErrFailure = errors.New("FAILURE")
// ErrOngoing error of ONGOING
var ErrOngoing = errors.New("ONGOING")

// XaSqlTimeoutMs milliseconds for Xa sql to timeout
var XaSqlTimeoutMs = 15000
// XaSQLTimeoutMs milliseconds for Xa sql to timeout
var XaSQLTimeoutMs = 15000

// MapSuccess HTTP result of SUCCESS
var MapSuccess = map[string]interface{}{"dtm_result": ResultSuccess}
Expand Down
5 changes: 3 additions & 2 deletions dtmcli/logger/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (

//var logger *zap.SugaredLogger = nil

var logger Logger = nil
var logger Logger

func init() {
InitLog("info")
InitLog(os.Getenv("LOG_LEVEL"))
}

// Logger logger interface
Expand All @@ -24,6 +24,7 @@ type Logger interface {
Errorf(format string, args ...interface{})
}

// WithLogger replaces default logger
func WithLogger(log Logger) {
logger = log
}
Expand Down
5 changes: 3 additions & 2 deletions dtmcli/logger/logger_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package logger

import (
"go.uber.org/zap"
"os"
"testing"

"go.uber.org/zap"
)

func TestInitLog(t *testing.T) {
Expand All @@ -26,4 +27,4 @@ func TestWithLogger(t *testing.T) {
Errorf("a error msg")
FatalfIf(false, "nothing")
FatalIfError(nil)
}
}
14 changes: 8 additions & 6 deletions dtmcli/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type DB = dtmimp.DB
// TransOptions transaction option
type TransOptions = dtmimp.TransOptions

// DBConf declares db configuration
type DBConf = dtmimp.DBConf

// SetCurrentDBType set currentDBType
Expand All @@ -41,16 +42,17 @@ func GetCurrentDBType() string {
return dtmimp.GetCurrentDBType()
}

// SetXaSqlTimeoutMs set XaSqlTimeoutMs
func SetXaSqlTimeoutMs(ms int) {
dtmimp.XaSqlTimeoutMs = ms
// SetXaSQLTimeoutMs set XaSQLTimeoutMs
func SetXaSQLTimeoutMs(ms int) {
dtmimp.XaSQLTimeoutMs = ms
}

// GetXaSqlTimeoutMs get XaSqlTimeoutMs
func GetXaSqlTimeoutMs() int {
return dtmimp.XaSqlTimeoutMs
// GetXaSQLTimeoutMs get XaSQLTimeoutMs
func GetXaSQLTimeoutMs() int {
return dtmimp.XaSQLTimeoutMs
}

// SetBarrierTableName sets barrier table name
func SetBarrierTableName(tablename string) {
dtmimp.BarrierTableName = tablename
}
Expand Down
4 changes: 2 additions & 2 deletions dtmcli/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestTypes(t *testing.T) {
}

func TestXaSqlTimeout(t *testing.T) {
old := GetXaSqlTimeoutMs()
SetXaSqlTimeoutMs(old)
old := GetXaSQLTimeoutMs()
SetXaSQLTimeoutMs(old)
SetBarrierTableName(dtmimp.BarrierTableName) // just cover this func
}
2 changes: 1 addition & 1 deletion dtmcli/xa.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (xc *XaClient) XaGlobalTransaction(gid string, xaFunc XaGlobalFunc) (rerr e
return xc.XaGlobalTransaction2(gid, func(x *Xa) {}, xaFunc)
}

// XaGlobalTransaction start a xa global transaction
// XaGlobalTransaction2 start a xa global transaction
func (xc *XaClient) XaGlobalTransaction2(gid string, custom func(*Xa), xaFunc XaGlobalFunc) (rerr error) {
xa := &Xa{TransBase: *dtmimp.NewTransBase(gid, "xa", xc.XaClientBase.Server, "")}
custom(xa)
Expand Down
6 changes: 3 additions & 3 deletions dtmgrpc/dtmgimp/grpc_clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ func (cb rawCodec) Marshal(v interface{}) ([]byte, error) {
func (cb rawCodec) Unmarshal(data []byte, v interface{}) error {
ba, ok := v.(*[]byte)
dtmimp.PanicIf(!ok, fmt.Errorf("please pass in *[]byte"))
for _, byte := range data {
*ba = append(*ba, byte)
}
*ba = append(*ba, data...)

return nil
}

func (cb rawCodec) Name() string { return "dtm_raw" }

var normalClients, rawClients sync.Map

// ClientInterceptors declares grpc.UnaryClientInterceptors slice
var ClientInterceptors = []grpc.UnaryClientInterceptor{}

// MustGetDtmClient 1
Expand Down
16 changes: 9 additions & 7 deletions dtmgrpc/tcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,15 @@ func TccFromGrpc(ctx context.Context) (*TccGrpc, error) {
func (t *TccGrpc) CallBranch(busiMsg proto.Message, tryURL string, confirmURL string, cancelURL string, reply interface{}) error {
branchID := t.NewSubBranchID()
bd, err := proto.Marshal(busiMsg)
_, err = dtmgimp.MustGetDtmClient(t.Dtm).RegisterBranch(context.Background(), &dtmgpb.DtmBranchRequest{
Gid: t.Gid,
TransType: t.TransType,
BranchID: branchID,
BusiPayload: bd,
Data: map[string]string{"confirm": confirmURL, "cancel": cancelURL},
})
if err == nil {
_, err = dtmgimp.MustGetDtmClient(t.Dtm).RegisterBranch(context.Background(), &dtmgpb.DtmBranchRequest{
Gid: t.Gid,
TransType: t.TransType,
BranchID: branchID,
BusiPayload: bd,
Data: map[string]string{"confirm": confirmURL, "cancel": cancelURL},
})
}
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions dtmgrpc/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func UseDriver(driverName string) error {
return dtmdriver.Use(driverName)
}

// AddUnaryInterceptor adds grpc.UnaryClientInterceptor
func AddUnaryInterceptor(interceptor grpc.UnaryClientInterceptor) {
dtmgimp.ClientInterceptors = append(dtmgimp.ClientInterceptors, interceptor)
}
18 changes: 13 additions & 5 deletions dtmsvr/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,16 @@ import (
)

const (
// DtmMetricsPort is metric port
DtmMetricsPort = 8889
Mysql = "mysql"
Redis = "redis"
BoltDb = "boltdb"
Postgres = "postgres"
// Mysql is mysql driver
Mysql = "mysql"
// Redis is redis driver
Redis = "redis"
// BoltDb is boltdb driver
BoltDb = "boltdb"
// Postgres is postgres driver
Postgres = "postgres"
)

// MicroService config type for micro service
Expand All @@ -24,6 +29,7 @@ type MicroService struct {
EndPoint string `yaml:"EndPoint"`
}

// Store defines storage relevant info
type Store struct {
Driver string `yaml:"Driver" default:"boltdb"`
Host string `yaml:"Host"`
Expand All @@ -39,10 +45,12 @@ type Store struct {
TransBranchOpTable string `yaml:"BranchTransOpTable" default:"dtm.trans_branch_op"`
}

// IsDB checks config driver is mysql or postgres
func (s *Store) IsDB() bool {
return s.Driver == dtmcli.DBTypeMysql || s.Driver == dtmcli.DBTypePostgres
}

// GetDBConf returns db conf info
func (s *Store) GetDBConf() dtmcli.DBConf {
return dtmcli.DBConf{
Driver: s.Driver,
Expand All @@ -58,7 +66,7 @@ type configType struct {
TransCronInterval int64 `yaml:"TransCronInterval" default:"3"`
TimeoutToFail int64 `yaml:"TimeoutToFail" default:"35"`
RetryInterval int64 `yaml:"RetryInterval" default:"10"`
HttpPort int64 `yaml:"HttpPort" default:"36789"`
HTTPPort int64 `yaml:"HttpPort" default:"36789"`
GrpcPort int64 `yaml:"GrpcPort" default:"36790"`
MicroService MicroService `yaml:"MicroService"`
UpdateBranchSync int64 `yaml:"UpdateBranchSync"`
Expand Down
7 changes: 5 additions & 2 deletions dtmsvr/config/config_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,12 @@ func loadFromEnvInner(prefix string, conf reflect.Value, defaultValue string) {

func toUnderscoreUpper(key string) string {
key = strings.Trim(key, "_")
matchLastCap := regexp.MustCompile("([A-Z])([A-Z][a-z])")
s2 := matchLastCap.ReplaceAllString(key, "${1}_${2}")

matchFirstCap := regexp.MustCompile("([a-z])([A-Z]+)")
s2 := matchFirstCap.ReplaceAllString(key, "${1}_${2}")
// logger.Debugf("loading from env: %s", strings.ToUpper(s2))
s2 = matchFirstCap.ReplaceAllString(s2, "${1}_${2}")
// logger.Infof("loading from env: %s", strings.ToUpper(s2))
return strings.ToUpper(s2)
}

Expand Down
Loading

0 comments on commit 901decf

Please sign in to comment.