Skip to content

Commit

Permalink
Merge pull request dtm-labs#165 from lsytj0413/refactor-bolt-to-factory
Browse files Browse the repository at this point in the history
refactor(*): migrate boltdb to factory pattern
  • Loading branch information
yedf2 authored Jan 9, 2022
2 parents d389fde + 2221bbf commit 2d5f02f
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 40 deletions.
70 changes: 36 additions & 34 deletions dtmsvr/storage/boltdb/boltdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,49 +9,51 @@ package boltdb
import (
"fmt"
"strings"
"sync"
"time"

bolt "go.etcd.io/bbolt"

"github.com/dtm-labs/dtm/dtmcli"
"github.com/dtm-labs/dtm/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/dtmcli/logger"
"github.com/dtm-labs/dtm/dtmsvr/config"
"github.com/dtm-labs/dtm/dtmsvr/storage"
"github.com/dtm-labs/dtm/dtmutil"
)

var conf = &config.Config

// Store implements storage.Store, and storage with boltdb
type Store struct {
boltDb *bolt.DB

dataExpire int64
retryInterval int64
}

var boltDb *bolt.DB
var boltOnce sync.Once
// NewStore will return the boltdb implement
// TODO: change to options
func NewStore(dataExpire int64, retryInterval int64) *Store {
s := &Store{
dataExpire: dataExpire,
retryInterval: retryInterval,
}

func boltGet() *bolt.DB {
boltOnce.Do(func() {
db, err := bolt.Open("./dtm.bolt", 0666, &bolt.Options{Timeout: 1 * time.Second})
dtmimp.E2P(err)
db, err := bolt.Open("./dtm.bolt", 0666, &bolt.Options{Timeout: 1 * time.Second})
dtmimp.E2P(err)

// NOTE: we must ensure all buckets is exists before we use it
err = initializeBuckets(db)
dtmimp.E2P(err)
// NOTE: we must ensure all buckets is exists before we use it
err = initializeBuckets(db)
dtmimp.E2P(err)

// TODO:
// 1. refactor this code
// 2. make cleanup run period, to avoid the file growup when server long-running
err = cleanupExpiredData(
time.Duration(conf.Store.DataExpire)*time.Second,
db,
)
dtmimp.E2P(err)
// TODO:
// 1. refactor this code
// 2. make cleanup run period, to avoid the file growup when server long-running
err = cleanupExpiredData(
time.Duration(dataExpire)*time.Second,
db,
)
dtmimp.E2P(err)

boltDb = db
})
return boltDb
s.boltDb = db
return s
}

func initializeBuckets(db *bolt.DB) error {
Expand Down Expand Up @@ -242,7 +244,7 @@ func (s *Store) Ping() error {
// PopulateData populates data to boltdb
func (s *Store) PopulateData(skipDrop bool) {
if !skipDrop {
err := boltGet().Update(func(t *bolt.Tx) error {
err := s.boltDb.Update(func(t *bolt.Tx) error {
dtmimp.E2P(t.DeleteBucket(bucketIndex))
dtmimp.E2P(t.DeleteBucket(bucketBranches))
dtmimp.E2P(t.DeleteBucket(bucketGlobal))
Expand All @@ -262,7 +264,7 @@ func (s *Store) PopulateData(skipDrop bool) {

// FindTransGlobalStore finds GlobalTrans data by gid
func (s *Store) FindTransGlobalStore(gid string) (trans *storage.TransGlobalStore) {
err := boltGet().View(func(t *bolt.Tx) error {
err := s.boltDb.View(func(t *bolt.Tx) error {
trans = tGetGlobal(t, gid)
return nil
})
Expand All @@ -273,7 +275,7 @@ func (s *Store) FindTransGlobalStore(gid string) (trans *storage.TransGlobalStor
// ScanTransGlobalStores lists GlobalTrans data
func (s *Store) ScanTransGlobalStores(position *string, limit int64) []storage.TransGlobalStore {
globals := []storage.TransGlobalStore{}
err := boltGet().View(func(t *bolt.Tx) error {
err := s.boltDb.View(func(t *bolt.Tx) error {
cursor := t.Bucket(bucketGlobal).Cursor()
for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
if string(k) == *position {
Expand All @@ -300,7 +302,7 @@ func (s *Store) ScanTransGlobalStores(position *string, limit int64) []storage.T
// FindBranches finds Branch data by gid
func (s *Store) FindBranches(gid string) []storage.TransBranchStore {
var branches []storage.TransBranchStore
err := boltGet().View(func(t *bolt.Tx) error {
err := s.boltDb.View(func(t *bolt.Tx) error {
branches = tGetBranches(t, gid)
return nil
})
Expand All @@ -315,7 +317,7 @@ func (s *Store) UpdateBranches(branches []storage.TransBranchStore, updates []st

// LockGlobalSaveBranches creates branches
func (s *Store) LockGlobalSaveBranches(gid string, status string, branches []storage.TransBranchStore, branchStart int) {
err := boltGet().Update(func(t *bolt.Tx) error {
err := s.boltDb.Update(func(t *bolt.Tx) error {
g := tGetGlobal(t, gid)
if g == nil {
return storage.ErrNotFound
Expand All @@ -331,7 +333,7 @@ func (s *Store) LockGlobalSaveBranches(gid string, status string, branches []sto

// MaySaveNewTrans creates a new trans
func (s *Store) MaySaveNewTrans(global *storage.TransGlobalStore, branches []storage.TransBranchStore) error {
return boltGet().Update(func(t *bolt.Tx) error {
return s.boltDb.Update(func(t *bolt.Tx) error {
g := tGetGlobal(t, global.Gid)
if g != nil {
return storage.ErrUniqueConflict
Expand All @@ -347,7 +349,7 @@ func (s *Store) MaySaveNewTrans(global *storage.TransGlobalStore, branches []sto
func (s *Store) ChangeGlobalStatus(global *storage.TransGlobalStore, newStatus string, updates []string, finished bool) {
old := global.Status
global.Status = newStatus
err := boltGet().Update(func(t *bolt.Tx) error {
err := s.boltDb.Update(func(t *bolt.Tx) error {
g := tGetGlobal(t, global.Gid)
if g == nil || g.Status != old {
return storage.ErrNotFound
Expand All @@ -367,7 +369,7 @@ func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval
global.NextCronTime = dtmutil.GetNextTime(nextCronInterval)
global.UpdateTime = dtmutil.GetNextTime(0)
global.NextCronInterval = nextCronInterval
err := boltGet().Update(func(t *bolt.Tx) error {
err := s.boltDb.Update(func(t *bolt.Tx) error {
g := tGetGlobal(t, global.Gid)
if g == nil || g.Gid != global.Gid {
return storage.ErrNotFound
Expand All @@ -384,8 +386,8 @@ func (s *Store) TouchCronTime(global *storage.TransGlobalStore, nextCronInterval
func (s *Store) LockOneGlobalTrans(expireIn time.Duration) *storage.TransGlobalStore {
var trans *storage.TransGlobalStore
min := fmt.Sprintf("%d", time.Now().Add(expireIn).Unix())
next := time.Now().Add(time.Duration(conf.RetryInterval) * time.Second)
err := boltGet().Update(func(t *bolt.Tx) error {
next := time.Now().Add(time.Duration(s.retryInterval) * time.Second)
err := s.boltDb.Update(func(t *bolt.Tx) error {
cursor := t.Bucket(bucketIndex).Cursor()
for trans == nil || trans.Status == dtmcli.StatusSucceed || trans.Status == dtmcli.StatusFailed {
k, v := cursor.First()
Expand Down
25 changes: 25 additions & 0 deletions dtmsvr/storage/registry/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package registry

import (
"sync"

"github.com/dtm-labs/dtm/dtmsvr/storage"
)

// SingletonFactory is the factory to build store in SINGLETON pattern.
type SingletonFactory struct {
once sync.Once

store storage.Store

creatorFunction func() storage.Store
}

// GetStorage implement the StorageFactory.GetStorage
func (f *SingletonFactory) GetStorage() storage.Store {
f.once.Do(func() {
f.store = f.creatorFunction()
})

return f.store
}
34 changes: 28 additions & 6 deletions dtmsvr/storage/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,38 @@ import (

var conf = &config.Config

var stores map[string]storage.Store = map[string]storage.Store{
"redis": &redis.Store{},
"mysql": &sql.Store{},
"postgres": &sql.Store{},
"boltdb": &boltdb.Store{},
// StorageFactory is factory to get storage instance.
type StorageFactory interface {
// GetStorage will return the Storage instance.
GetStorage() storage.Store
}

var storeFactorys = map[string]StorageFactory{
"boltdb": &SingletonFactory{
creatorFunction: func() storage.Store {
return boltdb.NewStore(conf.Store.DataExpire, conf.RetryInterval)
},
},
"redis": &SingletonFactory{
creatorFunction: func() storage.Store {
return &redis.Store{}
},
},
"mysql": &SingletonFactory{
creatorFunction: func() storage.Store {
return &sql.Store{}
},
},
"postgres": &SingletonFactory{
creatorFunction: func() storage.Store {
return &sql.Store{}
},
},
}

// GetStore returns storage.Store
func GetStore() storage.Store {
return stores[conf.Store.Driver]
return storeFactorys[conf.Store.Driver].GetStorage()
}

// WaitStoreUp wait for db to go up
Expand Down

0 comments on commit 2d5f02f

Please sign in to comment.