Skip to content

Commit

Permalink
Merge pull request #10 from yannh/support-key-filter
Browse files Browse the repository at this point in the history
Add filter flag
  • Loading branch information
yannh authored Jul 12, 2020
2 parents 0094cd8 + e19c351 commit b0074ee
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 59 deletions.
15 changes: 10 additions & 5 deletions Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ Warning: like similar tools, Redis-dump-go does NOT provide Point-in-Time backup
## Run

```
$ redis-dump-go -h
Usage of ./redis-dump-go:
-db int
$ ./bin/redis-dump-go -h
Usage of ./bin/redis-dump-go:
-db uint
only dump this database (default: all databases)
-filter string
key filter to use (default "*")
-host string
Server host (default "127.0.0.1")
-n int
Expand All @@ -34,8 +36,11 @@ Usage of ./redis-dump-go:
-port int
Server port (default 6379)
-s Silent mode (disable progress bar)
$ redis-dump-go > redis-backup.resp
[==================================================] 100% [5/5]
-ttl
Preserve Keys TTL (default true)
$ ./bin/redis-dump-go > dump.resp
Database 0: 9 element dumped
Database 1: 1 element dumped
```

For password-protected Redis servers, set the shell variable REDISDUMPGO_AUTH:
Expand Down
47 changes: 29 additions & 18 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,33 @@ import (
"io"
"log"
"os"
"strings"
"sync"

"github.com/yannh/redis-dump-go/redisdump"
)

func drawProgressBar(to io.Writer, currentPosition, nElements, widgetSize int) {
if nElements == 0 {
return
type progressLogger struct {
stats map[uint8]int
}

func newProgressLogger() *progressLogger {
return &progressLogger{
stats: map[uint8]int{},
}
percent := currentPosition * 100 / nElements
nBars := widgetSize * percent / 100
}

bars := strings.Repeat("=", nBars)
spaces := strings.Repeat(" ", widgetSize-nBars)
fmt.Fprintf(to, "\r[%s%s] %3d%% [%d/%d]", bars, spaces, int(percent), currentPosition, nElements)
func (p *progressLogger) drawProgress(to io.Writer, db uint8, nDumped int) {
if _, ok := p.stats[db]; !ok && len(p.stats) > 0 {
// We switched database, write to a new line
fmt.Fprintf(to, "\n")
}

if currentPosition == nElements {
fmt.Fprint(to, "\n")
p.stats[db] = nDumped
if nDumped == 0 {
return
}

fmt.Fprintf(to, "\rDatabase %d: %d element dumped", db, nDumped)
}

func isFlagPassed(name string) bool {
Expand All @@ -44,7 +51,8 @@ func realMain() int {
// TODO: Number of workers & TTL as parameters
host := flag.String("host", "127.0.0.1", "Server host")
port := flag.Int("port", 6379, "Server port")
db := flag.Int("db", 0, "only dump this database (default: all databases)")
db := flag.Uint("db", 0, "only dump this database (default: all databases)")
filter := flag.String("filter", "*", "key filter to use")
nWorkers := flag.Int("n", 10, "Parallel workers")
withTTL := flag.Bool("ttl", true, "Preserve Keys TTL")
output := flag.String("output", "resp", "Output type - can be resp or commands")
Expand Down Expand Up @@ -76,27 +84,30 @@ func realMain() int {
defer func() {
close(progressNotifs)
wg.Wait()
if !(*silent) {
fmt.Fprint(os.Stderr, "\n")
}
}()

pl := newProgressLogger()
go func() {
for n := range progressNotifs {
if !(*silent) {
drawProgressBar(os.Stderr, n.Done, n.Total, 50)
pl.drawProgress(os.Stderr, n.Db, n.Done)
}
}
wg.Done()
}()

logger := log.New(os.Stdout, "", 0)
if db == nil {
if err = redisdump.DumpServer(*host, *port, redisPassword, *nWorkers, *withTTL, logger, serializer, progressNotifs); err != nil {
fmt.Println(err)
if err = redisdump.DumpServer(*host, *port, redisPassword, *filter, *nWorkers, *withTTL, logger, serializer, progressNotifs); err != nil {
fmt.Fprintf(os.Stderr, "%s", err)
return 1
}
} else {
url := redisdump.RedisURL(*host, fmt.Sprint(*port), fmt.Sprint(*db), redisPassword)
if err = redisdump.DumpDB(url, *nWorkers, *withTTL, logger, serializer, progressNotifs); err != nil {
fmt.Println(err)
if err = redisdump.DumpDB(*host, *port, redisPassword, uint8(*db), *filter, *nWorkers, *withTTL, logger, serializer, progressNotifs); err != nil {
fmt.Fprintf(os.Stderr, "%s", err)
return 1
}
}
Expand Down
59 changes: 23 additions & 36 deletions redisdump/redisdump.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,6 @@ import (
radix "github.com/mediocregopher/radix/v3"
)

func min(a, b int) int {
if a <= b {
return a
}
return b
}

func ttlToRedisCmd(k string, val int64) []string {
return []string{"EXPIREAT", k, fmt.Sprint(time.Now().Unix() + val)}
}
Expand Down Expand Up @@ -160,7 +153,8 @@ func dumpKeysWorker(client radix.Client, keyBatches <-chan []string, withTTL boo
// and can be used to provide a progress visualisation such as a progress bar.
// Done is the number of items dumped, Total is the total number of items to dump.
type ProgressNotification struct {
Done, Total int
Db uint8
Done int
}

func parseKeyspaceInfo(keyspaceInfo string) ([]uint8, error) {
Expand Down Expand Up @@ -205,14 +199,9 @@ func getDBIndexes(redisURL string) ([]uint8, error) {
return parseKeyspaceInfo(keyspaceInfo)
}

func scanKeys(client radix.Client, keyBatches chan<- []string, progressNotifications chan<- ProgressNotification) error {
func scanKeys(client radix.Client, db uint8, filter string, keyBatches chan<- []string, progressNotifications chan<- ProgressNotification) error {
keyBatchSize := 100
s := radix.NewScanner(client, radix.ScanOpts{Command: "SCAN", Count: keyBatchSize})

var dbSize int
if err := client.Do(radix.Cmd(&dbSize, "DBSIZE")); err != nil {
return err
}
s := radix.NewScanner(client, radix.ScanOpts{Command: "SCAN", Pattern: filter, Count: keyBatchSize})

nProcessed := 0
var key string
Expand All @@ -223,19 +212,31 @@ func scanKeys(client radix.Client, keyBatches chan<- []string, progressNotificat
nProcessed += len(keyBatch)
keyBatches <- keyBatch
keyBatch = nil
progressNotifications <- ProgressNotification{nProcessed, dbSize}
progressNotifications <- ProgressNotification{Db: db, Done: nProcessed}
}
}

keyBatches <- keyBatch
nProcessed += len(keyBatch)
progressNotifications <- ProgressNotification{nProcessed, dbSize}
progressNotifications <- ProgressNotification{Db: db, Done: nProcessed}

return s.Close()
}

// RedisURL builds a connect URL given a Host, port, db & password
func RedisURL(redisHost string, redisPort string, redisDB string, redisPassword string) string {
switch {
case redisDB == "":
return "redis://:" + redisPassword + "@" + redisHost + ":" + fmt.Sprint(redisPort)
case redisDB != "":
return "redis://:" + redisPassword + "@" + redisHost + ":" + fmt.Sprint(redisPort) + "/" + redisDB
}

return ""
}

// DumpDB dumps all keys from a single Redis DB
func DumpDB(redisURL string, nWorkers int, withTTL bool, logger *log.Logger, serializer func([]string) string, progress chan<- ProgressNotification) error {
func DumpDB(redisHost string, redisPort int, redisPassword string, db uint8, filter string, nWorkers int, withTTL bool, logger *log.Logger, serializer func([]string) string, progress chan<- ProgressNotification) error {
var err error

errors := make(chan error)
Expand All @@ -247,15 +248,13 @@ func DumpDB(redisURL string, nWorkers int, withTTL bool, logger *log.Logger, ser
}
}()

redisURL := RedisURL(redisHost, fmt.Sprint(redisPort), fmt.Sprint(db), redisPassword)
client, err := radix.NewPool("tcp", redisURL, nWorkers)
if err != nil {
return err
}
defer client.Close()

splitURL := strings.Split(redisURL, "/")
db := splitURL[len(splitURL)-1]

if err = client.Do(radix.Cmd(nil, "SELECT", fmt.Sprint(db))); err != nil {
return err
}
Expand All @@ -267,7 +266,7 @@ func DumpDB(redisURL string, nWorkers int, withTTL bool, logger *log.Logger, ser
go dumpKeysWorker(client, keyBatches, withTTL, logger, serializer, errors, done)
}

scanKeys(client, keyBatches, progress)
scanKeys(client, db, filter, keyBatches, progress)
close(keyBatches)

for i := 0; i < nWorkers; i++ {
Expand All @@ -277,30 +276,18 @@ func DumpDB(redisURL string, nWorkers int, withTTL bool, logger *log.Logger, ser
return nil
}

func RedisURL(redisHost string, redisPort string, redisDB string, redisPassword string) string {
switch {
case redisDB == "":
return "redis://:" + redisPassword + "@" + redisHost + ":" + fmt.Sprint(redisPort)
case redisDB != "":
return "redis://:" + redisPassword + "@" + redisHost + ":" + fmt.Sprint(redisPort) + "/" + redisDB
}

return ""
}

// DumpServer dumps all Keys from the redis server given by redisURL,
// to the Logger logger. Progress notification informations
// are regularly sent to the channel progressNotifications
func DumpServer(redisHost string, redisPort int, redisPassword string, nWorkers int, withTTL bool, logger *log.Logger, serializer func([]string) string, progress chan<- ProgressNotification) error {
func DumpServer(redisHost string, redisPort int, redisPassword string, filter string, nWorkers int, withTTL bool, logger *log.Logger, serializer func([]string) string, progress chan<- ProgressNotification) error {
url := RedisURL(redisHost, fmt.Sprint(redisPort), "", redisPassword)
dbs, err := getDBIndexes(url)
if err != nil {
return err
}

for _, db := range dbs {
url = RedisURL(redisHost, fmt.Sprint(redisPort), fmt.Sprint(db), redisPassword)
if err = DumpDB(url, nWorkers, withTTL, logger, serializer, progress); err != nil {
if err = DumpDB(redisHost, redisPort, redisPassword, db, filter, nWorkers, withTTL, logger, serializer, progress); err != nil {
return err
}
}
Expand Down

0 comments on commit b0074ee

Please sign in to comment.