Skip to content

Commit

Permalink
adjust
Browse files Browse the repository at this point in the history
  • Loading branch information
zhijian-pro committed Jan 17, 2025
1 parent 8a9ddbd commit 9d166fb
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 43 deletions.
12 changes: 6 additions & 6 deletions pkg/sync/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ type Stat struct {
}

func updateStats(r *Stat) {
Copied.IncrInt64(r.Copied)
CopiedBytes.IncrInt64(r.CopiedBytes)
copied.IncrInt64(r.Copied)
copiedBytes.IncrInt64(r.CopiedBytes)
if checked != nil {
checked.IncrInt64(r.Checked)
checkedBytes.IncrInt64(r.CheckedBytes)
Expand Down Expand Up @@ -96,8 +96,8 @@ func sendStats(addr string) {
var r Stat
r.Skipped = skipped.Current()
r.SkippedBytes = skippedBytes.Current()
r.Copied = Copied.Current()
r.CopiedBytes = CopiedBytes.Current()
r.Copied = copied.Current()
r.CopiedBytes = copiedBytes.Current()
if checked != nil {
r.Checked = checked.Current()
r.CheckedBytes = checkedBytes.Current()
Expand All @@ -119,8 +119,8 @@ func sendStats(addr string) {
} else {
skipped.IncrInt64(-r.Skipped)
skippedBytes.IncrInt64(-r.SkippedBytes)
Copied.IncrInt64(-r.Copied)
CopiedBytes.IncrInt64(-r.CopiedBytes)
copied.IncrInt64(-r.Copied)
copiedBytes.IncrInt64(-r.CopiedBytes)
if checked != nil {
checked.IncrInt64(-r.Checked)
checkedBytes.IncrInt64(-r.CheckedBytes)
Expand Down
4 changes: 2 additions & 2 deletions pkg/sync/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ func (r *parallelDownloader) Read(b []byte) (int, error) {
r.Unlock()
<-r.concurrent
}
if CopiedBytes != nil {
CopiedBytes.IncrInt64(int64(n))
if copiedBytes != nil {
copiedBytes.IncrInt64(int64(n))
}
return n, nil
}
Expand Down
59 changes: 33 additions & 26 deletions pkg/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ const (
var (
handled *utils.Bar
pending *utils.Bar
Copied, CopiedBytes *utils.Bar
copied, copiedBytes *utils.Bar
checked, checkedBytes *utils.Bar
skipped, skippedBytes *utils.Bar
deleted, failed *utils.Bar
Concurrent chan int
concurrent chan int
limiter *ratelimit.Bucket
)
var crcTable = crc32.MakeTable(crc32.Castagnoli)
Expand Down Expand Up @@ -259,9 +259,9 @@ func calPartChksum(objStor object.ObjectStorage, key string, abort chan struct{}
select {
case <-abort:
return 0, fmt.Errorf("aborted")
case Concurrent <- 1:
case concurrent <- 1:
defer func() {
<-Concurrent
<-concurrent
}()
}
in, err := objStor.Get(key, offset, length)
Expand Down Expand Up @@ -330,9 +330,9 @@ func compObjPartBinary(src, dst object.ObjectStorage, key string, abort chan str
select {
case <-abort:
return fmt.Errorf("aborted")
case Concurrent <- 1:
case concurrent <- 1:
defer func() {
<-Concurrent
<-concurrent
}()
}
in, err := src.Get(key, offset, length)
Expand Down Expand Up @@ -468,7 +468,7 @@ func doCopySingle(src, dst object.ObjectStorage, key string, size int64, calChks
if size > maxBlock && !inMap(dst, readInMem) && !inMap(src, fastStreamRead) {
var err error
var in io.Reader
downer := newParallelDownloader(src, key, size, downloadBufSize, Concurrent)
downer := newParallelDownloader(src, key, size, downloadBufSize, concurrent)
defer downer.Close()
if inMap(dst, streamWrite) {
in = downer
Expand Down Expand Up @@ -496,7 +496,7 @@ func doCopySingle(src, dst object.ObjectStorage, key string, size int64, calChks
if err != nil {
if _, e := src.Head(key); os.IsNotExist(e) {
logger.Debugf("Head src %s: %s", key, err)
Copied.IncrInt64(-1)
copied.IncrInt64(-1)
err = nil
}
}
Expand All @@ -506,9 +506,9 @@ func doCopySingle(src, dst object.ObjectStorage, key string, size int64, calChks
}

func doCopySingle0(src, dst object.ObjectStorage, key string, size int64, calChksum bool) (uint32, error) {
Concurrent <- 1
concurrent <- 1
defer func() {
<-Concurrent
<-concurrent
}()
var in io.ReadCloser
var err error
Expand All @@ -527,7 +527,7 @@ func doCopySingle0(src, dst object.ObjectStorage, key string, size int64, calChk
if err != nil {
if _, e := src.Head(key); os.IsNotExist(e) {
logger.Debugf("Head src %s: %s", key, err)
Copied.IncrInt64(-1)
copied.IncrInt64(-1)
err = nil
}
return 0, err
Expand All @@ -548,7 +548,7 @@ func (w *withProgress) Read(b []byte) (int, error) {
limiter.Wait(int64(len(b)))
}
n, err := w.r.Read(b)
CopiedBytes.IncrInt64(int64(n))
copiedBytes.IncrInt64(int64(n))
return n, err
}

Expand Down Expand Up @@ -583,7 +583,7 @@ func doUploadPart(src, dst object.ObjectStorage, srckey string, off, size int64,
return nil, 0, fmt.Errorf("part %d: %s", num, err)
}
logger.Debugf("Copied data of %s part %d in %s", key, num, time.Since(start))
CopiedBytes.IncrInt64(sz)
copiedBytes.IncrInt64(sz)
return part, chksum, nil
}

Expand All @@ -603,9 +603,9 @@ func doCopyRange(src, dst object.ObjectStorage, key string, off, size int64, upl
select {
case <-abort:
return nil, 0, fmt.Errorf("aborted")
case Concurrent <- 1:
case concurrent <- 1:
defer func() {
<-Concurrent
<-concurrent
}()
}

Expand Down Expand Up @@ -726,6 +726,13 @@ func doCopyMultiple(src, dst object.ObjectStorage, key string, size int64, uploa
return chksum, nil
}

func InitForCopyData() {
concurrent = make(chan int, 10)
progress := utils.NewProgress(true)
copied = progress.AddCountSpinner("Copied objects")
copiedBytes = progress.AddByteSpinner("Copied bytes")
}

func CopyData(src, dst object.ObjectStorage, key string, size int64, calChksum bool) (uint32, error) {
start := time.Now()
var err error
Expand Down Expand Up @@ -775,7 +782,7 @@ func worker(tasks <-chan object.Object, src, dst object.ObjectStorage, config *C
} else {
copyPerms(dst, obj, config)
}
Copied.Increment()
copied.Increment()
case markChecksum:
if config.Dry {
logger.Debugf("Will compare checksum for %s", key)
Expand All @@ -793,7 +800,7 @@ func worker(tasks <-chan object.Object, src, dst object.ObjectStorage, config *C
if o, e := dst.Head(key); e == nil {
if needCopyPerms(obj, o) {
copyPerms(dst, obj, config)
Copied.Increment()
copied.Increment()
} else {
skipped.Increment()
skippedBytes.IncrInt64(obj.Size())
Expand All @@ -813,8 +820,8 @@ func worker(tasks <-chan object.Object, src, dst object.ObjectStorage, config *C
default:
if config.Dry {
logger.Debugf("Will copy %s (%d bytes)", obj.Key(), obj.Size())
Copied.Increment()
CopiedBytes.IncrInt64(obj.Size())
copied.Increment()
copiedBytes.IncrInt64(obj.Size())
break
}
var err error
Expand Down Expand Up @@ -842,7 +849,7 @@ func worker(tasks <-chan object.Object, src, dst object.ObjectStorage, config *C
if config.Perms {
copyPerms(dst, obj, config)
}
Copied.Increment()
copied.Increment()
} else if errors.Is(err, utils.ErrSkipped) {
skipped.Increment()
} else {
Expand Down Expand Up @@ -1415,7 +1422,7 @@ func Sync(src, dst object.ObjectStorage, config *Config) error {
}
tasks := make(chan object.Object, bufferSize)
wg := sync.WaitGroup{}
Concurrent = make(chan int, config.Threads)
concurrent = make(chan int, config.Threads)
if config.BWLimit > 0 {
bps := float64(config.BWLimit*1e6/8) * 0.85 // 15% overhead
limiter = ratelimit.NewBucketWithRate(bps, int64(bps)*3)
Expand All @@ -1426,8 +1433,8 @@ func Sync(src, dst object.ObjectStorage, config *Config) error {
skipped = progress.AddCountSpinner("Skipped objects")
skippedBytes = progress.AddByteSpinner("Skipped bytes")
pending = progress.AddCountSpinner("Pending objects")
Copied = progress.AddCountSpinner("Copied objects")
CopiedBytes = progress.AddByteSpinner("Copied bytes")
copied = progress.AddCountSpinner("Copied objects")
copiedBytes = progress.AddByteSpinner("Copied bytes")
if config.CheckAll || config.CheckNew {
checked = progress.AddCountSpinner("Checked objects")
checkedBytes = progress.AddByteSpinner("Checked bytes")
Expand All @@ -1443,7 +1450,7 @@ func Sync(src, dst object.ObjectStorage, config *Config) error {

if config.Manager == "" {
msg := fmt.Sprintf("Found: %d, skipped: %d (%s), copied: %d (%s)",
total, skipped.Current(), formatSize(skippedBytes.Current()), Copied.Current(), formatSize(CopiedBytes.Current()))
total, skipped.Current(), formatSize(skippedBytes.Current()), copied.Current(), formatSize(copiedBytes.Current()))
if checked != nil {
msg += fmt.Sprintf(", checked: %d (%s)", checked.Current(), formatSize(checkedBytes.Current()))
}
Expand Down Expand Up @@ -1564,13 +1571,13 @@ func initSyncMetrics(config *Config) {
Name: "copied",
Help: "Copied objects",
}, func() float64 {
return float64(Copied.Current())
return float64(copied.Current())
}),
prometheus.NewCounterFunc(prometheus.CounterOpts{
Name: "copied_bytes",
Help: "Copied bytes",
}, func() float64 {
return float64(CopiedBytes.Current())
return float64(copiedBytes.Current())
}),
prometheus.NewCounterFunc(prometheus.CounterOpts{
Name: "skipped",
Expand Down
11 changes: 2 additions & 9 deletions pkg/vfs/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,7 @@ func backup(m meta.Meta, blob object.ObjectStorage, now time.Time, fast, skipTra
if err != nil {
return "", err
}
defer func() {
if err == nil {
_ = os.Remove(fp.Name())
}
}()
defer os.Remove(fp.Name())
defer fp.Close()
zw, _ := gzip.NewWriterLevel(fp, gzip.BestSpeed)
var threads = 2
Expand All @@ -139,10 +135,7 @@ func backup(m meta.Meta, blob object.ObjectStorage, now time.Time, fast, skipTra
if err != nil {
return "", err
}
osync.Concurrent = make(chan int, 10)
progress := utils.NewProgress(true)
osync.Copied = progress.AddCountSpinner("Copied objects")
osync.CopiedBytes = progress.AddByteSpinner("Copied bytes")
osync.InitForCopyData()
_, err = osync.CopyData(disk, blob, fpath, size, true)
return blob.String() + fpath, err
}
Expand Down

0 comments on commit 9d166fb

Please sign in to comment.