Skip to content

Commit

Permalink
mirror:Add the printing of event time (#4953)
Browse files Browse the repository at this point in the history
* Add the printing of event time

* Update mirror-main.go

---------

Co-authored-by: Klaus Post <[email protected]>
  • Loading branch information
dormanze and klauspost authored Sep 16, 2024
1 parent 41aca25 commit 37095da
Showing 1 changed file with 41 additions and 19 deletions.
60 changes: 41 additions & 19 deletions cmd/mirror-main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"sync"
"time"

"github.com/dustin/go-humanize"

"github.com/fatih/color"
"github.com/minio/cli"
json "github.com/minio/colorjson"
Expand Down Expand Up @@ -277,17 +279,31 @@ type mirrorJob struct {

// mirrorMessage container for file mirror messages
type mirrorMessage struct {
Status string `json:"status"`
Source string `json:"source"`
Target string `json:"target"`
Size int64 `json:"size"`
TotalCount int64 `json:"totalCount"`
TotalSize int64 `json:"totalSize"`
Status string `json:"status"`
Source string `json:"source"`
Target string `json:"target"`
Size int64 `json:"size"`
TotalCount int64 `json:"totalCount"`
TotalSize int64 `json:"totalSize"`
EventTime string `json:"eventTime"`
EventType notification.EventType `json:"eventType"`
}

// String colorized mirror message
func (m mirrorMessage) String() string {
return console.Colorize("Mirror", fmt.Sprintf("`%s` -> `%s`", m.Source, m.Target))
var msg string
if m.EventTime != "" {
msg = console.Colorize("Time", fmt.Sprintf("[%s] ", m.EventTime))
}
if m.EventType == notification.ObjectRemovedDelete {
return msg + "Removed " + console.Colorize("Removed", fmt.Sprintf("`%s`", m.Target))
}
if m.EventTime == "" {
return console.Colorize("Mirror", fmt.Sprintf("`%s` -> `%s`", m.Source, m.Target))
}
msg += console.Colorize("Size", fmt.Sprintf("%6s ", humanize.IBytes(uint64(m.Size))))
msg += console.Colorize("Mirror", fmt.Sprintf("`%s` -> `%s`", m.Source, m.Target))
return msg
}

// JSON jsonified mirror message
Expand Down Expand Up @@ -345,7 +361,7 @@ func (mj *mirrorJob) doDeleteBucket(ctx context.Context, sURLs URLs) URLs {
}

// doRemove - removes files on target.
func (mj *mirrorJob) doRemove(ctx context.Context, sURLs URLs) URLs {
func (mj *mirrorJob) doRemove(ctx context.Context, sURLs URLs, event EventInfo) URLs {
if mj.opts.isFake {
return sURLs.WithError(nil)
}
Expand Down Expand Up @@ -375,13 +391,21 @@ func (mj *mirrorJob) doRemove(ctx context.Context, sURLs URLs) URLs {
}
return sURLs.WithError(result.Err)
}
targetPath := filepath.ToSlash(filepath.Join(sURLs.TargetAlias, sURLs.TargetContent.URL.Path))
mj.status.PrintMsg(mirrorMessage{
Target: targetPath,
TotalCount: sURLs.TotalCount,
TotalSize: sURLs.TotalSize,
EventTime: event.Time,
EventType: event.Type,
})
}

return sURLs.WithError(nil)
}

// doMirror - Mirror an object to multiple destination. URLs status contains a copy of sURLs and error if any.
func (mj *mirrorJob) doMirrorWatch(ctx context.Context, targetPath string, tgtSSE encrypt.ServerSide, sURLs URLs) URLs {
func (mj *mirrorJob) doMirrorWatch(ctx context.Context, targetPath string, tgtSSE encrypt.ServerSide, sURLs URLs, event EventInfo) URLs {
shouldQueue := false
if !mj.opts.isOverwrite && !mj.opts.activeActive {
targetClient, err := newClient(targetPath)
Expand All @@ -405,7 +429,7 @@ func (mj *mirrorJob) doMirrorWatch(ctx context.Context, targetPath string, tgtSS
mj.status.AddCounts(1)
sURLs.TotalSize = mj.status.Get()
sURLs.TotalCount = mj.status.GetCounts()
return mj.doMirror(ctx, sURLs)
return mj.doMirror(ctx, sURLs, event)
}
return sURLs.WithError(probe.NewError(ObjectAlreadyExists{}))
}
Expand All @@ -428,7 +452,7 @@ func convertSizeToTag(size int64) string {
}

// doMirror - Mirror an object to multiple destination. URLs status contains a copy of sURLs and error if any.
func (mj *mirrorJob) doMirror(ctx context.Context, sURLs URLs) URLs {
func (mj *mirrorJob) doMirror(ctx context.Context, sURLs URLs, event EventInfo) URLs {
if sURLs.Error != nil { // Erroneous sURLs passed.
return sURLs.WithError(sURLs.Error.Trace())
}
Expand Down Expand Up @@ -481,6 +505,8 @@ func (mj *mirrorJob) doMirror(ctx context.Context, sURLs URLs) URLs {
Size: length,
TotalCount: sURLs.TotalCount,
TotalSize: sURLs.TotalSize,
EventTime: event.Time,
EventType: event.Type,
})
}
sURLs.MD5 = mj.opts.md5
Expand Down Expand Up @@ -589,10 +615,6 @@ func (mj *mirrorJob) monitorMirrorStatus(cancel context.CancelFunc) (errDuringMi

if sURLs.SourceContent != nil {
mirrorTotalUploadedBytes.Add(float64(sURLs.SourceContent.Size))
} else if sURLs.TargetContent != nil {
// Construct user facing message and path.
targetPath := filepath.ToSlash(filepath.Join(sURLs.TargetAlias, sURLs.TargetContent.URL.Path))
mj.status.PrintMsg(rmMessage{Key: targetPath})
}
}

Expand Down Expand Up @@ -687,7 +709,7 @@ func (mj *mirrorJob) watchMirrorEvents(ctx context.Context, events []EventInfo)
continue
}
mj.parallel.queueTask(func() URLs {
return mj.doMirrorWatch(ctx, targetPath, tgtSSE, mirrorURL)
return mj.doMirrorWatch(ctx, targetPath, tgtSSE, mirrorURL, event)
}, mirrorURL.SourceContent.Size)
} else if event.Type == notification.ObjectRemovedDelete {
if targetAlias != "" && strings.Contains(event.UserAgent, uaMirrorAppName+":"+targetAlias) {
Expand All @@ -707,7 +729,7 @@ func (mj *mirrorJob) watchMirrorEvents(ctx context.Context, events []EventInfo)
mirrorURL.TotalSize = mj.status.Get()
if mirrorURL.TargetContent != nil && (mj.opts.isRemove || mj.opts.activeActive) {
mj.parallel.queueTask(func() URLs {
return mj.doRemove(ctx, mirrorURL)
return mj.doRemove(ctx, mirrorURL, event)
}, 0)
}
} else if event.Type == notification.BucketCreatedAll {
Expand Down Expand Up @@ -807,11 +829,11 @@ func (mj *mirrorJob) startMirror(ctx context.Context) {

if sURLs.SourceContent != nil {
mj.parallel.queueTask(func() URLs {
return mj.doMirror(ctx, sURLs)
return mj.doMirror(ctx, sURLs, EventInfo{})
}, sURLs.SourceContent.Size)
} else if sURLs.TargetContent != nil && mj.opts.isRemove {
mj.parallel.queueTask(func() URLs {
return mj.doRemove(ctx, sURLs)
return mj.doRemove(ctx, sURLs, EventInfo{})
}, 0)
}
case <-ctx.Done():
Expand Down

0 comments on commit 37095da

Please sign in to comment.