Skip to content

Commit

Permalink
cmd/backups: minor cleanups, mostly progressbar
Browse files Browse the repository at this point in the history
  • Loading branch information
jzelinskie committed Dec 14, 2023
1 parent 099fa38 commit 0bb1a27
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 121 deletions.
243 changes: 133 additions & 110 deletions internal/cmd/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,54 @@ import (
"github.com/spf13/cobra"

"github.com/authzed/zed/internal/client"
"github.com/authzed/zed/internal/commands"
"github.com/authzed/zed/pkg/backupformat"
)

var backupCmd = &cobra.Command{
Use: "backup <filename>",
Short: "Create, restore, and inspect Permissions System backups",
Args: cobra.ExactArgs(1),
// Create used to be on the root, so add it here for back-compat.
RunE: backupCreateCmdFunc,
}
var (
backupCmd = &cobra.Command{
Use: "backup <filename>",
Short: "Create, restore, and inspect Permissions System backups",
Args: cobra.ExactArgs(1),
// Create used to be on the root, so add it here for back-compat.
RunE: backupCreateCmdFunc,
}

backupCreateCmd = &cobra.Command{
Use: "create <filename>",
Short: "Backup a permission system to a file",
Args: cobra.ExactArgs(1),
RunE: backupCreateCmdFunc,
}

backupRestoreCmd = &cobra.Command{
Use: "restore <filename>",
Short: "Restore a permission system from a file",
Args: commands.StdinOrExactArgs(1),
RunE: restoreCmdFunc,
}

backupParseSchemaCmd = &cobra.Command{
Use: "parse-schema <filename>",
Short: "Extract the schema from a backup file",
Args: cobra.ExactArgs(1),
RunE: backupParseSchemaCmdFunc,
}

backupParseRevisionCmd = &cobra.Command{
Use: "parse-revision <filename>",
Short: "Extract the revision from a backup file",
Args: cobra.ExactArgs(1),
RunE: backupParseRevisionCmdFunc,
}

backupParseRelsCmd = &cobra.Command{
Use: "parse-relationships <filename>",
Short: "Extract the relationships from a backup file",
Args: cobra.ExactArgs(1),
RunE: backupParseRelsCmdFunc,
}
)

func registerBackupCmd(rootCmd *cobra.Command) {
rootCmd.AddCommand(backupCmd)
Expand All @@ -42,7 +80,7 @@ func registerBackupCmd(rootCmd *cobra.Command) {

backupCmd.AddCommand(backupRestoreCmd)
backupRestoreCmd.Flags().Int("batch-size", 1_000, "restore relationship write batch size")
backupRestoreCmd.Flags().Int("batches-per-transaction", 10, "number of batches per transaction")
backupRestoreCmd.Flags().Int64("batches-per-transaction", 10, "number of batches per transaction")
backupRestoreCmd.Flags().String("prefix-filter", "", "include only schema and relationships with a given prefix")
backupRestoreCmd.Flags().Bool("rewrite-legacy", false, "potentially modify the schema to exclude legacy/broken syntax")

Expand All @@ -64,13 +102,6 @@ func registerBackupCmd(rootCmd *cobra.Command) {
backupParseRelsCmd.Flags().String("prefix-filter", "", "Include only relationships with a given prefix")
}

var backupCreateCmd = &cobra.Command{
Use: "create <filename>",
Short: "Backup a permission system to a file",
Args: cobra.ExactArgs(1),
RunE: backupCreateCmdFunc,
}

func createBackupFile(filename string) (*os.File, error) {
if filename == "-" {
log.Trace().Str("filename", "- (stdout)").Send()
Expand Down Expand Up @@ -152,11 +183,37 @@ func hasRelPrefix(rel *v1.Relationship, prefix string) bool {
strings.HasPrefix(rel.Subject.Object.ObjectType, prefix)
}

func backupCreateCmdFunc(cmd *cobra.Command, args []string) error {
func relProgressBar(description string) *progressbar.ProgressBar {
bar := progressbar.NewOptions(-1,
progressbar.OptionSetWidth(10),
progressbar.OptionSetRenderBlankState(true),
progressbar.OptionSetVisibility(false),
)
if isatty.IsTerminal(os.Stderr.Fd()) {
bar = progressbar.NewOptions64(-1,
progressbar.OptionSetDescription(description),
progressbar.OptionSetWriter(os.Stderr),
progressbar.OptionSetWidth(10),
progressbar.OptionThrottle(65*time.Millisecond),
progressbar.OptionShowCount(),
progressbar.OptionShowIts(),
progressbar.OptionSetItsString("relationship"),
progressbar.OptionOnCompletion(func() { fmt.Fprint(os.Stderr, "\n") }),
progressbar.OptionSpinnerType(14),
progressbar.OptionFullWidth(),
progressbar.OptionSetRenderBlankState(true),
)
}
return bar
}

func backupCreateCmdFunc(cmd *cobra.Command, args []string) (err error) {
f, err := createBackupFile(args[0])
if err != nil {
return err
}
defer func(e *error) { *e = errors.Join((*e), f.Close()) }(&err)
defer func(e *error) { *e = errors.Join((*e), f.Sync()) }(&err)

client, err := client.NewClient(cmd)
if err != nil {
Expand Down Expand Up @@ -188,18 +245,11 @@ func backupCreateCmdFunc(cmd *cobra.Command, args []string) error {
}
}

var hasProgressbar bool
var relWriter io.Writer = f
if isatty.IsTerminal(os.Stderr.Fd()) {
bar := progressbar.DefaultBytes(-1, "backing up")
relWriter = io.MultiWriter(bar, f)
hasProgressbar = true
}

encoder, err := backupformat.NewEncoder(relWriter, schema, schemaResp.ReadAt)
encoder, err := backupformat.NewEncoder(f, schema, schemaResp.ReadAt)
if err != nil {
return fmt.Errorf("error creating backup file encoder: %w", err)
}
defer func(e *error) { *e = errors.Join((*e), encoder.Close()) }(&err)

relationshipStream, err := client.BulkExportRelationships(ctx, &v1.BulkExportRelationshipsRequest{
Consistency: &v1.Consistency{
Expand All @@ -214,7 +264,8 @@ func backupCreateCmdFunc(cmd *cobra.Command, args []string) error {

relationshipReadStart := time.Now()

var processed uint
bar := relProgressBar("processing backup")
var relsEncoded, relsProcessed uint
for {
if err := ctx.Err(); err != nil {
return fmt.Errorf("aborted backup: %w", err)
Expand All @@ -229,54 +280,41 @@ func backupCreateCmdFunc(cmd *cobra.Command, args []string) error {
}

for _, rel := range relsResp.Relationships {
if !hasRelPrefix(rel, prefixFilter) {
continue
}
if hasRelPrefix(rel, prefixFilter) {
if err := encoder.Append(rel); err != nil {
return fmt.Errorf("error storing relationship: %w", err)
}
relsEncoded++

if err := encoder.Append(rel); err != nil {
return fmt.Errorf("error storing relationship: %w", err)
if relsEncoded%100_000 == 0 && !isatty.IsTerminal(os.Stderr.Fd()) {
log.Trace().
Uint("encoded", relsEncoded).
Uint("processed", relsProcessed).
Msg("backup progress")
}
}
processed++

if processed%100_000 == 0 && !hasProgressbar {
log.Trace().Uint("relationships", processed).Msg("relationships stored")
relsProcessed++
if err := bar.Add(1); err != nil {
return fmt.Errorf("error incrementing progress bar: %w", err)
}
}
}

totalTime := time.Since(relationshipReadStart)
relsPerSec := float64(processed) / totalTime.Seconds()

if err := bar.Finish(); err != nil {
return fmt.Errorf("error finalizing progress bar: %w", err)
}

log.Info().
Uint("relationships", processed).
Uint("encoded", relsEncoded).
Uint("processed", relsProcessed).
Uint64("perSecond", perSec(uint64(relsProcessed), totalTime)).
Stringer("duration", totalTime).
Float64("perSecond", relsPerSec).
Msg("finished backup")

if err := encoder.Close(); err != nil {
return fmt.Errorf("error closing backup encoder: %w", err)
}

if f != os.Stdout {
if err := f.Sync(); err != nil {
return fmt.Errorf("error syncing backup file: %w", err)
}
}

if err := f.Close(); err != nil {
return fmt.Errorf("error closing backup file: %w", err)
}

return nil
}

var backupRestoreCmd = &cobra.Command{
Use: "restore <filename>",
Short: "Restore a permission system from a file",
Args: cobra.MaximumNArgs(1),
RunE: restoreCmdFunc,
}

func openRestoreFile(filename string) (*os.File, int64, error) {
if filename == "" {
log.Trace().Str("filename", "(stdin)").Send()
Expand Down Expand Up @@ -304,23 +342,17 @@ func restoreCmdFunc(cmd *cobra.Command, args []string) error {
filename = args[0]
}

f, fSize, err := openRestoreFile(filename)
f, _, err := openRestoreFile(filename)
if err != nil {
return err
}
defer func(e *error) { *e = errors.Join((*e), f.Close()) }(&err)

var hasProgressbar bool
var restoreReader io.Reader = f
if isatty.IsTerminal(os.Stderr.Fd()) {
bar := progressbar.DefaultBytes(fSize, "restoring")
restoreReader = io.TeeReader(f, bar)
hasProgressbar = true
}

decoder, err := backupformat.NewDecoder(restoreReader)
decoder, err := backupformat.NewDecoder(f)
if err != nil {
return fmt.Errorf("error creating restore file decoder: %w", err)
}
defer func(e *error) { *e = errors.Join((*e), decoder.Close()) }(&err)

if loadedToken := decoder.ZedToken(); loadedToken != nil {
log.Debug().Str("revision", loadedToken.Token).Msg("parsed revision")
Expand Down Expand Up @@ -365,11 +397,11 @@ func restoreCmdFunc(cmd *cobra.Command, args []string) error {
}

batchSize := cobrautil.MustGetInt(cmd, "batch-size")
batchesPerTransaction := cobrautil.MustGetInt(cmd, "batches-per-transaction")
batchesPerTransaction := cobrautil.MustGetInt64(cmd, "batches-per-transaction")

batch := make([]*v1.Relationship, 0, batchSize)
var written uint64
var batchesWritten int
var written, batchesWritten int64
bar := relProgressBar("restoring from backup")
for rel, err := decoder.Next(); rel != nil && err == nil; rel, err = decoder.Next() {
if err := ctx.Err(); err != nil {
return fmt.Errorf("aborted restore: %w", err)
Expand All @@ -391,18 +423,24 @@ func restoreCmdFunc(cmd *cobra.Command, args []string) error {

// Reset the relationships in the batch
batch = batch[:0]

batchesWritten++

if batchesWritten%batchesPerTransaction == 0 {
resp, err := relationshipWriter.CloseAndRecv()
if err != nil {
return fmt.Errorf("error finalizing write of %d batches: %w", batchesPerTransaction, err)
}
if !hasProgressbar {
log.Debug().Uint64("relationships", written).Msg("relationships written")
written += int64(resp.NumLoaded)
if err := bar.Set64(written); err != nil {
return fmt.Errorf("error incrementing progress bar: %w", err)
}

if !isatty.IsTerminal(os.Stderr.Fd()) {
log.Trace().
Int64("batches", batchesWritten).
Int64("relationships", written).
Msg("restore progress")
}
written += resp.NumLoaded

relationshipWriter, err = client.BulkImportRelationships(ctx)
if err != nil {
Expand All @@ -424,34 +462,33 @@ func restoreCmdFunc(cmd *cobra.Command, args []string) error {
if err != nil {
return fmt.Errorf("error finalizing last write: %w", err)
}

written += resp.NumLoaded

batchesWritten++
written += int64(resp.NumLoaded)
if err := bar.Set64(written); err != nil {
return fmt.Errorf("error incrementing progress bar: %w", err)
}
totalTime := time.Since(relationshipWriteStart)
relsPerSec := float64(written) / totalTime.Seconds()

if err := bar.Finish(); err != nil {
return fmt.Errorf("error finalizing progress bar: %w", err)
}

log.Info().
Uint64("relationships", written).
Int64("batches", batchesWritten).
Int64("relationships", written).
Uint64("perSecond", perSec(uint64(written), totalTime)).
Stringer("duration", totalTime).
Float64("perSecond", relsPerSec).
Msg("finished restore")

if err := decoder.Close(); err != nil {
return fmt.Errorf("error closing restore encoder: %w", err)
}

if err := f.Close(); err != nil {
return fmt.Errorf("error closing restore file: %w", err)
}

return nil
}

var backupParseSchemaCmd = &cobra.Command{
Use: "parse-schema <filename>",
Short: "Extract the schema from a backup file",
Args: cobra.ExactArgs(1),
RunE: backupParseSchemaCmdFunc,
func perSec(i uint64, d time.Duration) uint64 {
secs := uint64(d.Seconds())
if secs == 0 {
return i
}
return i / secs
}

func backupParseSchemaCmdFunc(cmd *cobra.Command, args []string) error {
Expand Down Expand Up @@ -490,13 +527,6 @@ func backupParseSchemaCmdFunc(cmd *cobra.Command, args []string) error {
return nil
}

var backupParseRevisionCmd = &cobra.Command{
Use: "parse-revision <filename>",
Short: "Extract the revision from a backup file",
Args: cobra.ExactArgs(1),
RunE: backupParseRevisionCmdFunc,
}

func backupParseRevisionCmdFunc(_ *cobra.Command, args []string) error {
filename := "" // Default to stdin.
if len(args) > 0 {
Expand All @@ -522,13 +552,6 @@ func backupParseRevisionCmdFunc(_ *cobra.Command, args []string) error {
return nil
}

var backupParseRelsCmd = &cobra.Command{
Use: "parse-relationships <filename>",
Short: "Extract the relationships from a backup file",
Args: cobra.ExactArgs(1),
RunE: backupParseRelsCmdFunc,
}

func backupParseRelsCmdFunc(cmd *cobra.Command, args []string) error {
filename := "" // Default to stdin.
if len(args) > 0 {
Expand Down
Loading

0 comments on commit 0bb1a27

Please sign in to comment.