Skip to content

Commit

Permalink
Move runVlogGC to x and use it in zero as well (hypermodeinc#4716)
Browse files Browse the repository at this point in the history
  • Loading branch information
animesh2049 authored Feb 6, 2020
1 parent 68ab4c4 commit 55b1693
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 37 deletions.
4 changes: 4 additions & 0 deletions dgraph/cmd/zero/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,10 @@ func run() {
x.Checkf(err, "Error while opening WAL store")
defer kv.Close()

gcCloser := y.NewCloser(1) // closer for vLogGC
go x.RunVlogGC(kv, gcCloser)
defer gcCloser.SignalAndWait()

// zero out from memory
kvOpt.EncryptionKey = nil

Expand Down
43 changes: 6 additions & 37 deletions worker/server_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/badger/v2/options"
"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgraph/ee/enc"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
Expand All @@ -37,9 +38,7 @@ type ServerState struct {

Pstore *badger.DB
WALstore *badger.DB

vlogTicker *time.Ticker // runs every 1m, check size of vlog and run GC conditionally.
mandatoryVlogTicker *time.Ticker // runs every 10m, we always run vlog GC.
gcCloser *y.Closer // closer for valueLogGC

needTs chan tsReq
}
Expand All @@ -66,34 +65,6 @@ func InitServerState() {
x.WorkerConfig.ProposedGroupId = groupId
}

func (s *ServerState) runVlogGC(store *badger.DB) {
// Get initial size on start.
_, lastVlogSize := store.Size()
const GB = int64(1 << 30)

runGC := func() {
var err error
for err == nil {
// If a GC is successful, immediately run it again.
err = store.RunValueLogGC(0.7)
}
_, lastVlogSize = store.Size()
}

for {
select {
case <-s.vlogTicker.C:
_, currentVlogSize := store.Size()
if currentVlogSize < lastVlogSize+GB {
continue
}
runGC()
case <-s.mandatoryVlogTicker.C:
runGC()
}
}
}

func setBadgerOptions(opt badger.Options) badger.Options {
opt = opt.WithSyncWrites(false).WithTruncate(true).WithLogger(&x.ToGlog{}).
WithEncryptionKey(enc.ReadEncryptionKeyFile(Config.BadgerKeyFile))
Expand Down Expand Up @@ -189,22 +160,20 @@ func (s *ServerState) initStorage() {
opt.EncryptionKey = nil
}

s.vlogTicker = time.NewTicker(1 * time.Minute)
s.mandatoryVlogTicker = time.NewTicker(10 * time.Minute)
go s.runVlogGC(s.Pstore)
go s.runVlogGC(s.WALstore)
s.gcCloser = y.NewCloser(2)
go x.RunVlogGC(s.Pstore, s.gcCloser)
go x.RunVlogGC(s.WALstore, s.gcCloser)
}

// Dispose stops and closes all the resources inside the server state.
func (s *ServerState) Dispose() {
s.gcCloser.SignalAndWait()
if err := s.Pstore.Close(); err != nil {
glog.Errorf("Error while closing postings store: %v", err)
}
if err := s.WALstore.Close(); err != nil {
glog.Errorf("Error while closing WAL store: %v", err)
}
s.vlogTicker.Stop()
s.mandatoryVlogTicker.Stop()
}

func (s *ServerState) GetTimestamp(readOnly bool) uint64 {
Expand Down
41 changes: 41 additions & 0 deletions x/x.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (
"syscall"
"time"

"github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/dgo/v2"
"github.com/dgraph-io/dgo/v2/protos/api"

Expand Down Expand Up @@ -815,3 +817,42 @@ func IsGuardian(groups []string) bool {

return false
}

// RunVlogGC runs value log gc on store. It runs GC unconditionally after every 10 minutes.
// Additionally it also runs GC if vLogSize has grown more than 1 GB in last minute.
func RunVlogGC(store *badger.DB, closer *y.Closer) {
defer closer.Done()
// Get initial size on start.
_, lastVlogSize := store.Size()
const GB = int64(1 << 30)

// Runs every 1m, checks size of vlog and runs GC conditionally.
vlogTicker := time.NewTicker(1 * time.Minute)
defer vlogTicker.Stop()
// Runs vlog GC unconditionally every 10 minutes.
mandatoryVlogTicker := time.NewTicker(10 * time.Minute)
defer mandatoryVlogTicker.Stop()

runGC := func() {
for err := error(nil); err == nil; {
// If a GC is successful, immediately run it again.
err = store.RunValueLogGC(0.7)
}
_, lastVlogSize = store.Size()
}

for {
select {
case <-closer.HasBeenClosed():
return
case <-vlogTicker.C:
_, currentVlogSize := store.Size()
if currentVlogSize < lastVlogSize+GB {
continue
}
runGC()
case <-mandatoryVlogTicker.C:
runGC()
}
}
}

0 comments on commit 55b1693

Please sign in to comment.