Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add subcommands #912

Open
wants to merge 2 commits into
base: v4
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
381 changes: 381 additions & 0 deletions cmd/commands/cmd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,381 @@
package commands

import (
"RedisShake/internal/client"
"RedisShake/internal/config"
"RedisShake/internal/reader"
"RedisShake/internal/writer"
"bytes"
"container/heap"
"errors"
"fmt"
"github.com/google/uuid"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"os"
"strings"
)

var subCommands = []string{"aof_reader", "rdb_reader", "scan_reader", "sync_reader",
"sync_reader.sentinel", "redis_writer", "redis_writer.sentinel",
"filter", "advanced", "module"}

var buildFuncs = []func() (*viper.Viper, *cobra.Command){
buildCommandAofReader,
buildCommandRedisWriterSentinel,
buildCommandRedisWriter,
buildCommandSyncReaderSentinel,
buildCommandRdbReader,
buildCommandSyncReader,
buildCommandScanReader,
buildCommandAdvanced,
buildCommandFilter,
buildCommandModule,
}

func main() {
if len(os.Args) == 2 && (os.Args[1] == "-h" || os.Args[1] == "--help" || os.Args[1] == "help") {
ConvertArgs2Toml(true, false)
}
commandLine := strings.Join(os.Args[1:], ",")
if strings.Contains(commandLine, "reader") || strings.Contains(commandLine, "writer") ||
strings.Contains(commandLine, "filter") || strings.Contains(commandLine, "advanced") ||
strings.Contains(commandLine, "module") {
ConvertArgs2Toml(false, strings.Contains(commandLine, "--dry-run"))
}
}

func ConvertArgs2Toml(showHelp bool, dryRun bool) (string, error) {
var rootCmd = &cobra.Command{
Use: "redis-shake",
Long: `redis-shake [command_reader][flags] [command_writer][flags] [filter][flags] [advanced][flags] [module]
command_reader: aof_reader, rdb_reader, scan_reader, sync_reader, sync_reader.sentinel
command_writer: redis_writer, redis_writer.sentinel`,
}

viperMap := make(map[string]*viper.Viper)
commandMap := make(map[string]*cobra.Command)
for _, buildFunc := range buildFuncs {
vp, cmd := buildFunc()
viperMap[cmd.Use] = vp
commandMap[cmd.Use] = cmd
}

if showHelp {
rootCmd.Execute()
for _, subCmd := range subCommands {
commandMap[subCmd].Execute()
}
return "", nil
}
subCommandArgIndex := &IntPairHeap{}
for argI, arg := range os.Args {
for cmdI, cmd := range subCommands {
if arg == cmd {
heap.Push(subCommandArgIndex, Pair{cmdI, argI})
}
}
}
// eg:redis-shake redis_writer -a 127.0.0.1:6379 aof_reader -f appendonly.aof
// cmdIndex 5 0
// argIndex 0 1 2 3 4 5 6
// redis_writer 需要传入argIndex[1:4],aof_reader需要传入参数argIndex[4:7]
heap.Push(subCommandArgIndex, Pair{len(subCommands), len(os.Args)})
prePair := Pair{}
heapSize := subCommandArgIndex.Len()
for i := 0; i < heapSize; i++ {
pair := heap.Pop(subCommandArgIndex).(Pair)
argIndex := pair.Right
if argIndex == 0 {
continue
}
preArgIndex := prePair.Right
if preArgIndex != 0 && argIndex != 0 {
preCmdIndex := prePair.Left
command := commandMap[subCommands[preCmdIndex]]
command.SetArgs(os.Args[preArgIndex:argIndex])
err := command.Execute()
if err != nil {
return "", err
}
}
prePair = pair
}

description := "#this config file is generated by command: " + strings.Join(os.Args, " ")
toml ,err:= viperMap2Toml(viperMap, description)
if err != nil {
return "", err
}
if dryRun {
fmt.Println(string(toml))
return "", nil
}
homeDir, err := os.UserHomeDir()
if err != nil {
panic(any(err))
}
shakeHomeDir := homeDir + "/.redis-shake/cache/"
os.MkdirAll(shakeHomeDir, os.ModePerm)
filePath := shakeHomeDir + uuid.New().String() + ".toml"
fmt.Println("generate toml config file: " + filePath)
os.WriteFile(filePath, toml, 0644)
return filePath, nil
}

func viperMap2Toml(viperMap map[string]*viper.Viper, description string) ([]byte,error) {
var buffer bytes.Buffer
buffer.WriteString(description + "\n")
isAllConfigEmpty := true
for key, vp := range viperMap {
settings := vp.AllSettings()
if len(settings) == 0 {
continue
}
if isAllConfigEmpty {
isAllConfigEmpty = false
}
buffer.WriteString("[" + key + "]\n")
for k, v := range settings {
buffer.WriteString(fmt.Sprintf("%s = %#v\n", k, v))
}
buffer.WriteString("\n")
}
if isAllConfigEmpty{
return []byte{},errors.New("all config empty")
}
return buffer.Bytes(),nil
}

func buildCommandRedisWriterSentinel() (*viper.Viper, *cobra.Command) {
return buildCommandSentinel("redis_writer.sentinel")
}
func buildCommandSyncReaderSentinel() (*viper.Viper, *cobra.Command) {
return buildCommandSentinel("sync_reader.sentinel")
}
func buildCommandSentinel(commandName string) (*viper.Viper, *cobra.Command) {
vp := viper.New()
var opts = &client.SentinelOptions{}
bindKeys := []string{"cluster", "address", "username", "password", "tls", "off_reply"}
command := NewCommand(commandName, bindKeys, vp, opts)
flags := command.Flags()
flags.StringVarP(&opts.MasterName, "master_name", "m", "", "")
flags.StringVarP(&opts.Address, "address", "a", "", "[required]eg: 127.0.0.1:6379")
flags.StringVarP(&opts.Username, "username", "u", "", "")
flags.StringVarP(&opts.Password, "password", "p", "", "")
flags.BoolVarP(&opts.Tls, "tls", "t", false, "")
command.MarkFlagRequired("address")
command.MarkFlagsRequiredTogether("username", "password")
return vp, command
}

func buildCommandAofReader() (*viper.Viper, *cobra.Command) {
vp := viper.New()
var opts = &reader.AOFReaderOptions{}
bindKeys := []string{"filepath", "timestamp"}
command := NewCommand("aof_reader", bindKeys, vp, opts)
flags := command.Flags()
flags.StringVarP(&opts.Filepath, "filepath", "f", "/tmp/.aof", "[required]")
flags.Int64VarP(&opts.AOFTimestamp, "timestamp", "a", 0, "# subsecond")
command.MarkFlagRequired("filepath")
return vp, command
}

func buildCommandRdbReader() (*viper.Viper, *cobra.Command) {
vp := viper.New()
var opts = &reader.RdbReaderOptions{}
bindKeys := []string{"filepath"}
command := NewCommand("rdb_reader", bindKeys, vp, opts)
flags := command.Flags()
flags.StringVarP(&opts.Filepath, "filepath", "f", "/tmp/dump.rdb", "[required]")
command.MarkFlagRequired("filepath")
return vp, command
}

func buildCommandSyncReader() (*viper.Viper, *cobra.Command) {
vp := viper.New()
var opts = &reader.SyncReaderOptions{}
bindKeys := []string{"cluster", "address", "username", "password", "tls", "sync_rdb", "sync_aof", "prefer_replica", "try_diskless"}
command := NewCommand("sync_reader", bindKeys, vp, opts)
flags := command.Flags()
flags.BoolVarP(&opts.Cluster, "cluster", "c", false, "# Set to true if the source is a Redis cluster")
flags.StringVarP(&opts.Address, "address", "a", "127.0.0.1:6379", "# [required]For clusters, specify the address of any cluster node; use the master or slave address in master-slave mode")
flags.StringVarP(&opts.Username, "username", "u", "", "# Keep empty if ACL is not in use")
flags.StringVarP(&opts.Password, "password", "p", "", "# Keep empty if no authentication is required")
flags.BoolVarP(&opts.Tls, "tls", "t", false, "# Set to true to enable TLS if needed")
flags.BoolVarP(&opts.SyncRdb, "sync_rdb", "d", true, "# Set to false if RDB synchronization is not required")
flags.BoolVarP(&opts.SyncAof, "sync_aof", "o", true, "# Set to false if AOF synchronization is not required")
flags.BoolVarP(&opts.PreferReplica, "prefer_replica", "r", false, "# Set to true to sync from a replica node")
flags.BoolVarP(&opts.TryDiskless, "try_diskless", "l", false, "# Set to true for diskless sync if the source has repl-diskless-sync=yes")
command.MarkFlagRequired("address")
command.MarkFlagsRequiredTogether("username", "password")
return vp, command
}

func buildCommandScanReader() (*viper.Viper, *cobra.Command) {
vp := viper.New()
var opts = &reader.ScanReaderOptions{}
bindKeys := []string{"cluster", "address", "username", "password", "tls", "scan", "ksn", "dbs", "prefer_replica", "count"}
command := NewCommand("scan_reader", bindKeys, vp, opts)
flags := command.Flags()
flags.BoolVarP(&opts.Cluster, "cluster", "c", false, "# set to true if source is a redis cluster")
flags.StringVarP(&opts.Address, "address", "a", "127.0.0.1:6379", "# [required] when cluster is true, set address to one of the cluster node")
flags.StringVarP(&opts.Username, "username", "u", "", "# keep empty if not using ACL")
flags.StringVarP(&opts.Password, "password", "p", "", "# keep empty if no authentication is required")
flags.BoolVarP(&opts.Tls, "tls", "t", false, "")
flags.IntSliceVarP(&opts.DBS, "dbs", "d", []int{}, "# set you want to scan dbs such as [1,5,7], if you don't want to scan all")
flags.BoolVarP(&opts.Scan, "scan", "s", true, "# set to false if you don't want to scan keys")
flags.BoolVarP(&opts.KSN, "ksn", "k", false, "# set to true to enabled Redis keyspace notifications (KSN) subscription")
flags.IntVarP(&opts.Count, "count", "n", 1, "# number of keys to scan per iteration")
flags.BoolVarP(&opts.PreferReplica, "prefer_replica", "r", false, "")
command.MarkFlagRequired("address")
command.MarkFlagsRequiredTogether("username", "password")
return vp, command
}

func buildCommandRedisWriter() (*viper.Viper, *cobra.Command) {
opts := &writer.RedisWriterOptions{}
vp := viper.New()
bindKeys := []string{"cluster", "address", "username", "password", "tls", "off_reply"}
command := NewCommand("redis_writer", bindKeys, vp, opts)
flags := command.Flags()
flags.BoolVarP(&opts.Cluster, "cluster", "c", false, "# set to true if target is a redis cluster")
flags.StringVarP(&opts.Address, "address", "a", "127.0.0.1:6380", "# when cluster is true, set address to one of the cluster node")
flags.StringVarP(&opts.Username, "username", "u", "", "# keep empty if not using ACL")
flags.StringVarP(&opts.Password, "password", "p", "", "# keep empty if no authentication is required")
flags.BoolVarP(&opts.Tls, "tls", "t", false, "# turn off the server reply")
flags.BoolVarP(&opts.OffReply, "off_reply", "o", false, "# turn off the server reply")
command.MarkFlagsRequiredTogether("username", "password")
return vp, command
}

func buildCommandFilter() (*viper.Viper, *cobra.Command) {
vp := viper.New()
var opts = &config.FilterOptions{}
bindKeys := []string{"allow_keys", "allow_key_prefix", "allow_key_suffix", "block_keys",
"block_key_prefix", "block_key_suffix", "allow_key_regex", "block_key_regex",
"allow_db", "block_db", "allow_command", "block_command",
"allow_command_group", "block_command_group", "function"}
command := NewCommand("filter", bindKeys, vp, opts)
flags := command.Flags()
flags.StringSliceVarP(&opts.AllowKeys, "allow_keys", "a", []string{},
`# Allow keys with specific prefixes or suffixes
# Examples:
# allow_keys = ["user:1001", "product:2001"]
# allow_key_prefix = ["user:", "product:"]
# allow_key_suffix = [":active", ":valid"]
# allow A collection of keys containing 11-digit mobile phone numbers
# allow_key_regex = [":\\d{11}:"]
# Leave empty to allow all keys`)
flags.StringSliceVarP(&opts.AllowKeyPrefix, "allow_key_prefix", "p", []string{}, "")
flags.StringSliceVarP(&opts.AllowKeySuffix, "allow_key_suffix", "s", []string{}, "")
flags.StringSliceVarP(&opts.AllowKeyRegex, "allow_key_regex", "e", []string{}, "")
flags.StringSliceVarP(&opts.BlockKeys, "block_keys", "k", []string{},
`# Block keys with specific prefixes or suffixes
# Examples:
# block_keys = ["temp:1001", "cache:2001"]
# block_key_prefix = ["temp:", "cache:"]
# block_key_suffix = [":tmp", ":old"]
# block test 11-digit mobile phone numbers keys
# block_key_regex = [":test:\\d{11}:"]
# Leave empty to block nothing`)
flags.StringSliceVarP(&opts.BlockKeyPrefix, "block_key_prefix", "r", []string{}, "")
flags.StringSliceVarP(&opts.BlockKeySuffix, "block_key_suffix", "i", []string{}, "")
flags.StringSliceVarP(&opts.BlockKeyRegex, "block_key_regex", "x", []string{}, "")
flags.IntSliceVarP(&opts.AllowDB, "allow_db", "w", []int{},
`# Specify allowed and blocked database numbers (e.g., allow_db = [0, 1, 2], block_db = [3, 4, 5])
# Leave empty to allow all databases`)
flags.IntSliceVarP(&opts.BlockDB, "block_db", "o", []int{}, "")
flags.StringSliceVarP(&opts.AllowCommand, "allow_command", "m", []string{},
`# Allow or block specific commands
# Examples:
# allow_command = ["GET", "SET"] # Only allow GET and SET commands
# block_command = ["DEL", "FLUSHDB"] # Block DEL and FLUSHDB commands
# Leave empty to allow all commands`)
flags.StringSliceVarP(&opts.BlockCommand, "block_command", "l", []string{}, "")
flags.StringSliceVarP(&opts.AllowCommandGroup, "allow_command_group", "g", []string{},
`# Allow or block specific command groups
# Available groups:
# SERVER, STRING, CLUSTER, CONNECTION, BITMAP, LIST, SORTED_SET,
# GENERIC, TRANSACTIONS, SCRIPTING, TAIRHASH, TAIRSTRING, TAIRZSET,
# GEO, HASH, HYPERLOGLOG, PUBSUB, SET, SENTINEL, STREAM
# Examples:
# allow_command_group = ["STRING", "HASH"] # Only allow STRING and HASH commands
# block_command_group = ["SCRIPTING", "PUBSUB"] # Block SCRIPTING and PUBSUB commands
# Leave empty to allow all command groups`)
flags.StringSliceVarP(&opts.BlockCommandGroup, "block_command_group", "u", []string{}, "")
flags.StringVarP(&opts.Function, "function", "f", "",
`# Function for custom data processing
# For best practices and examples, visit:
# https://tair-opensource.github.io/RedisShake/zh/filter/function.html`)
return vp, command
}

func buildCommandAdvanced() (*viper.Viper, *cobra.Command) {
vp := viper.New()
var opts = &config.AdvancedOptions{}
bindKeys := []string{"dir", "ncpu", "pprof_port", "status_port",
"log_file", "log_level", "log_interval", "rdb_restore_command_behavior",
"pipeline_count_limit", "target_redis_client_max_querybuf_len", "target_redis_proto_max_bulk_len", "aws_psync",
"empty_db_before_sync"}
command := NewCommand("advanced", bindKeys, vp, opts)
flags := command.Flags()
flags.StringVarP(&opts.Dir, "dir", "d", "data", "")
flags.IntVarP(&opts.Ncpu, "ncpu", "a", 0, "# runtime.GOMAXPROCS, 0 means use runtime.NumCPU() cpu cores")
flags.IntVarP(&opts.PprofPort, "pprof_port", "f", 0, "# pprof port, 0 means disable")
flags.IntVarP(&opts.StatusPort, "status_port", "t", 0, "# status port, 0 means disable")
flags.StringVarP(&opts.LogFile, "log_file", "l", "shake.log", "")
flags.StringVarP(&opts.LogLevel, "log_level", "e", "info", "# debug, info or warn")
flags.IntVarP(&opts.LogInterval, "log_interval", "i", 5, "# in seconds")
flags.StringVarP(&opts.RDBRestoreCommandBehavior, "rdb_restore_command_behavior", "b", "panic",
`# panic, rewrite or skip
# redis-shake gets key and value from rdb file, and uses RESTORE command to
# create the key in target redis. Redis RESTORE will return a "Target key name
# is busy" error when key already exists. You can use this configuration item
# to change the default behavior of restore:
# panic: redis-shake will stop when meet "Target key name is busy" error.
# rewrite: redis-shake will replace the key with new value.
# skip: redis-shake will skip restore the key when meet "Target key name is busy" error.`)
flags.Uint64VarP(&opts.PipelineCountLimit, "target_redis_client_max_querybuf_len", "q", 1073741824,
`# This setting corresponds to the 'client-query-buffer-limit' in Redis configuration.
# The default value is typically 1GB.
# It's recommended not to modify this value unless absolutely necessary.`)
flags.Int64VarP(&opts.TargetRedisClientMaxQuerybufLen, "target_redis_proto_max_bulk_len", "x", 512_000_000,
`# This setting corresponds to the 'proto-max-bulk-len' in Redis configuration.
# It defines the maximum size of a single string element in the Redis protocol.
# The value must be 1MB or greater. Default is 512MB.
# It's recommended not to modify this value unless absolutely necessary.`)
flags.StringVarP(&opts.AwsPSync, "aws_psync", "w", "",
`# If the source is Elasticache, you can set this item. AWS ElastiCache has custom
# psync command, which can be obtained through a ticket.`)
flags.BoolVarP(&opts.EmptyDBBeforeSync, "empty_db_before_sync", "y", false,
`# destination will delete itself entire database before fetching files
# from source during full synchronization.
# This option is similar redis replicas RDB diskless load option:
# repl-diskless-load on-empty-db`)
return vp, command
}
func buildCommandModule() (*viper.Viper, *cobra.Command) {
vp := viper.New()
var opts = &config.ModuleOptions{}
bindKeys := []string{"target_mbbloom_version"}
command := NewCommand("module", bindKeys, vp, opts)
flags := command.Flags()
flags.IntVarP(&opts.TargetMBbloomVersion, "target_mbbloom_version", "v", 0, "# The data format for BF.LOADCHUNK is not compatible in different versions. v2.6.3 <=> 20603")
return vp, command
}

func NewCommand(commandName string, bindKeys []string, vp *viper.Viper, opts interface{}) *cobra.Command {
command := &cobra.Command{
Use: commandName,
Long: "\n------redis-shake " + commandName + " (description)------",
PreRun: func(cmd *cobra.Command, args []string) {
for _, name := range bindKeys {
vp.BindPFlag(name, cmd.Flags().Lookup(name))
}
vp.Unmarshal(opts)
},
Run: func(cmd *cobra.Command, args []string) {},
}
return command
}
Loading