diff --git a/pkg/meta/base.go b/pkg/meta/base.go index e8231e30754a..c67b2fdc7b78 100644 --- a/pkg/meta/base.go +++ b/pkg/meta/base.go @@ -104,7 +104,7 @@ type engine interface { doWrite(ctx Context, inode Ino, indx uint32, off uint32, slice Slice, mtime time.Time, numSlices *int, delta *dirStat, attr *Attr) syscall.Errno doTruncate(ctx Context, inode Ino, flags uint8, length uint64, delta *dirStat, attr *Attr, skipPermCheck bool) syscall.Errno doFallocate(ctx Context, inode Ino, mode uint8, off uint64, size uint64, delta *dirStat, attr *Attr) syscall.Errno - doCompactChunk(inode Ino, indx uint32, origin []byte, ss []*slice, skipped int, pos uint32, id uint64, size uint32, delayed []byte) syscall.Errno + doCompactChunk(inode Ino, indx uint32, origin []byte, dslices, wslices []*slice, skipped int, delayed []byte) syscall.Errno doGetParents(ctx Context, inode Ino) map[Ino]int doUpdateDirStat(ctx Context, batch map[Ino]dirStat) error @@ -2049,28 +2049,28 @@ func (m *baseMeta) compactChunk(inode Ino, indx uint32, once, force bool) { if len(ss) > maxCompactSlices { ss = ss[:maxCompactSlices] } - skipped := skipSome(ss) + skipped, tail := skipSome(ss) + ss = ss[:tail] compacted := ss[skipped:] - pos, size, slices := compactChunk(compacted) - if len(compacted) < 2 || size == 0 { + if len(compacted) < 2 { return } - for _, s := range ss[:skipped] { - if pos+size > s.pos && s.pos+s.len > pos { - var sstring string - for _, s := range ss { - sstring += fmt.Sprintf("\n%+v", *s) - } - panic(fmt.Sprintf("invalid compaction skipped %d, pos %d, size %d; slices: %s", skipped, pos, size, sstring)) - } - } var id uint64 if st = m.NewSlice(Background, &id); st != 0 { return } - logger.Debugf("compact %d:%d: skipped %d slices (%d bytes) %d slices (%d bytes)", inode, indx, skipped, pos, len(compacted), size) - err := m.newMsg(CompactChunk, slices, id) + pos, headSize, size, tailSize, rslices := compactChunk(compacted) + logger.Infof("compact %d slices into %d, %d, %d %+v", len(ss), headSize, size, tailSize, rslices) + if len(rslices) < 1 { + return + } + var deleted uint32 + for _, s := range compacted { + deleted += s.size + } + logger.Debugf("compact %d:%d: skipped %d slices, will compact %d slices (%d bytes), write %d bytes", inode, indx, skipped, len(compacted), deleted, size) + err := m.newMsg(CompactChunk, rslices, id) if err != nil { if !strings.Contains(err.Error(), "not exist") && !strings.Contains(err.Error(), "not found") { logger.Warnf("compact %d %d with %d slices: %s", inode, indx, len(compacted), err) @@ -2092,7 +2092,15 @@ func (m *baseMeta) compactChunk(inode Ino, indx uint32, once, force bool) { for _, s := range ss { origin = append(origin, marshalSlice(s.pos, s.id, s.size, s.off, s.len)...) } - st = m.en.doCompactChunk(inode, indx, origin, compacted, skipped, pos, id, size, dsbuf) + var wslices []*slice + if headSize > 0 { + wslices = append(wslices, &slice{pos: pos, len: headSize}) + } + wslices = append(wslices, &slice{id: id, size: size, pos: pos + headSize, len: size}) + if tailSize > 0 { + wslices = append(wslices, &slice{pos: pos + headSize + size, len: tailSize}) + } + st = m.en.doCompactChunk(inode, indx, origin, compacted, wslices, skipped, dsbuf) if st == syscall.EINVAL { logger.Infof("compaction for %d:%d is wasted, delete slice %d (%d bytes)", inode, indx, id, size) m.deleteSlice(id, size) @@ -2102,7 +2110,7 @@ func (m *baseMeta) compactChunk(inode Ino, indx uint32, once, force bool) { logger.Warnf("compact %d %d: %s", inode, indx, err) } - if force { + if force && (skipped > 0 || len(ss) > maxCompactSlices/2) { m.Lock() delete(m.compacting, k) m.Unlock() diff --git a/pkg/meta/base_test.go b/pkg/meta/base_test.go index 9c58c1d9aa89..49390f56488e 100644 --- a/pkg/meta/base_test.go +++ b/pkg/meta/base_test.go @@ -1541,15 +1541,15 @@ func testCompaction(t *testing.T, m Meta, trash bool) { _ = m.Write(ctx, inode, 0, uint32(0), Slice{Id: sliceId, Size: 1 << 20, Len: 64 << 10}, time.Now()) m.NewSlice(ctx, &sliceId) _ = m.Write(ctx, inode, 0, uint32(128<<10), Slice{Id: sliceId, Size: 2 << 20, Len: 128 << 10}, time.Now()) - _ = m.Write(ctx, inode, 0, uint32(0), Slice{Id: 0, Size: 1 << 20, Len: 1 << 20}, time.Now()) + _ = m.Fallocate(ctx, inode, fallocZeroRange, 0, 1<<20, nil) if c, ok := m.(compactor); ok { c.compactChunk(inode, 0, false, true) } if st := m.Read(ctx, inode, 0, &slices); st != 0 { t.Fatalf("read 0: %s", st) } - if len(slices) != 1 || slices[0].Len != 3<<20 { - t.Fatalf("inode %d should be compacted, but have %d slices, size %d", inode, len(slices), slices[0].Len) + if len(slices) != 2 || slices[0].Len != 1048576 || slices[1].Len != 2097152 { + t.Fatalf("inode %d should be compacted, but have %d slices: %+v", inode, len(slices), slices) } m.NewSlice(ctx, &sliceId) @@ -1566,10 +1566,24 @@ func testCompaction(t *testing.T, m Meta, trash bool) { if st := m.Read(ctx, inode, 2, &slices); st != 0 { t.Fatalf("read 1: %s", st) } - // compact twice: 4515328+2607724-2338508 = 4784544; 8829056+1074933-2338508-4784544=2780937 - if len(slices) != 3 || slices[0].Len != 2338508 || slices[1].Len != 4784544 || slices[2].Len != 2780937 { - t.Fatalf("inode %d should be compacted, but have %d slices, size %d,%d,%d", - inode, len(slices), slices[0].Len, slices[1].Len, slices[2].Len) + // 8829056 - 2338508 = 6490548 + if len(slices) != 3 || slices[0].Len != 2338508 || slices[1].Len != 6490548 || slices[2].Len != 1074933 { + t.Fatalf("inode %d should be compacted, but have %d slices: %+v", inode, len(slices), slices) + } + + m.NewSlice(ctx, &sliceId) + _ = m.Write(ctx, inode, 3, 0, Slice{Id: sliceId, Size: 2338508, Len: 2338508}, time.Now()) + _ = m.CopyFileRange(ctx, inode, 3*ChunkSize, inode, 4*ChunkSize, 2338508, 0, nil, nil) + _ = m.Fallocate(ctx, inode, fallocZeroRange, 4*ChunkSize, ChunkSize, nil) + _ = m.CopyFileRange(ctx, inode, 3*ChunkSize, inode, 4*ChunkSize, 2338508, 0, nil, nil) + if c, ok := m.(compactor); ok { + c.compactChunk(inode, 4, false, true) + } + if st := m.Read(ctx, inode, 4, &slices); st != 0 { + t.Fatalf("read inode %d chunk 4: %s", inode, st) + } + if len(slices) != 1 || slices[0].Len != 2338508 { + t.Fatalf("inode %d should be compacted, but have %d slices, size %d", inode, len(slices), slices[0].Len) } } diff --git a/pkg/meta/redis.go b/pkg/meta/redis.go index c875eb97ef69..21c3448caba2 100644 --- a/pkg/meta/redis.go +++ b/pkg/meta/redis.go @@ -2837,10 +2837,18 @@ func (r *redisMeta) doCleanupDelayedSlices(edge int64) (int, error) { return count, err } -func (m *redisMeta) doCompactChunk(inode Ino, indx uint32, origin []byte, ss []*slice, skipped int, pos uint32, id uint64, size uint32, delayed []byte) syscall.Errno { +func (m *redisMeta) doCompactChunk(inode Ino, indx uint32, origin []byte, dslices, wslices []*slice, skipped int, delayed []byte) syscall.Errno { + var id uint64 + var size uint32 + for _, s := range wslices { + if s.id > 0 { + id, size = s.id, s.size + break + } + } var rs []*redis.IntCmd // trash disabled: check reference of slices if delayed == nil { - rs = make([]*redis.IntCmd, len(ss)) + rs = make([]*redis.IntCmd, len(dslices)) } key := m.chunkKey(inode, indx) ctx := Background @@ -2861,7 +2869,10 @@ func (m *redisMeta) doCompactChunk(inode Ino, indx uint32, origin []byte, ss []* _, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error { pipe.LTrim(ctx, key, int64(n), -1) - pipe.LPush(ctx, key, marshalSlice(pos, id, size, 0, size)) + for i := len(wslices); i > 0; i-- { + s := wslices[i-1] + pipe.LPush(ctx, key, marshalSlice(s.pos, s.id, s.size, s.off, s.len)) + } for i := skipped; i > 0; i-- { pipe.LPush(ctx, key, origin[(i-1)*sliceBytes:i*sliceBytes]) } @@ -2871,7 +2882,7 @@ func (m *redisMeta) doCompactChunk(inode Ino, indx uint32, origin []byte, ss []* pipe.HSet(ctx, m.delSlices(), fmt.Sprintf("%d_%d", id, time.Now().Unix()), delayed) } } else { - for i, s := range ss { + for i, s := range dslices { if s.id > 0 { rs[i] = pipe.HIncrBy(ctx, m.sliceRefs(), m.sliceKey(s.id, s.size), -1) } @@ -2896,7 +2907,7 @@ func (m *redisMeta) doCompactChunk(inode Ino, indx uint32, origin []byte, ss []* } else if st == 0 { m.cleanupZeroRef(m.sliceKey(id, size)) if delayed == nil { - for i, s := range ss { + for i, s := range dslices { if s.id > 0 && rs[i].Err() == nil && rs[i].Val() < 0 { m.deleteSlice(s.id, s.size) } diff --git a/pkg/meta/slice.go b/pkg/meta/slice.go index 081276cf6e8c..9c8ddf594dd3 100644 --- a/pkg/meta/slice.go +++ b/pkg/meta/slice.go @@ -154,51 +154,72 @@ func buildSlice(ss []*slice) []Slice { return chunk } -func compactChunk(ss []*slice) (uint32, uint32, []Slice) { - var chunk = buildSlice(ss) - var pos uint32 - n := len(chunk) - for n > 1 { - if chunk[0].Id == 0 { - pos += chunk[0].Len - chunk = chunk[1:] - n-- - } else if chunk[n-1].Id == 0 { - chunk = chunk[:n-1] - n-- - } else { - break +func compactChunk(slices []*slice) (uint32, uint32, uint32, uint32, []Slice) { + var pos uint32 = ChunkSize + for _, s := range slices { + if s.pos < pos { + pos = s.pos + if pos == 0 { + break + } } } - if n == 1 && chunk[0].Id == 0 { - chunk[0].Len = 1 + ss := buildSlice(slices) + if pos > 0 && len(ss) > 0 { + // remove left padding + ss = ss[1:] + } + + var head, tail uint32 + trimmed := ss + for len(trimmed) > 0 && trimmed[0].Id == 0 { + head += trimmed[0].Len + trimmed = trimmed[1:] + } + for n := len(trimmed); n > 0 && trimmed[n-1].Id == 0; n-- { + tail += trimmed[n-1].Len + trimmed = trimmed[:n-1] + } + if len(trimmed) == 0 { + tail = head - 1 + head = 0 + trimmed = []Slice{{Len: 1}} } var size uint32 - for _, c := range chunk { + for _, c := range trimmed { size += c.Len } - return pos, size, chunk + return pos, head, size, tail, trimmed } -func skipSome(chunk []*slice) int { - var skipped int - var total = len(chunk) - for skipped < total { - ss := chunk[skipped:] - pos, size, c := compactChunk(ss) - first := ss[0] - if first.len < (1<<20) || first.len*5 < size || size == 0 { - // it's too small +func shouldSkip(s *slice, rest []*slice, lastWrite uint32) (bool, uint32) { + pos, head, write, tail, _ := compactChunk(rest) + if pos < s.pos+s.len && s.pos < pos+head+write+tail { + return false, 0 // overlap + } + reduced := lastWrite - write + if write == 0 || reduced*5 < lastWrite || reduced < 2<<20 || reduced < s.size { + return false, 0 + } + return true, write +} + +func skipSome(slices []*slice) (int, int) { + var head, tail int + _, _, lastWrite, _, _ := compactChunk(slices) + for ; head < len(slices)-1; head++ { + skip, write := shouldSkip(slices[head], slices[head+1:], lastWrite) + if !skip { break } - isFirst := func(pos uint32, s Slice) bool { - return pos == first.pos && s.Id == first.id && s.Off == first.off && s.Len == first.len - } - if !isFirst(pos, c[0]) { - // it's not the first slice, compact it + lastWrite = write + } + for tail = len(slices); tail > head+1; tail-- { + skip, write := shouldSkip(slices[tail-1], slices[:tail-1], lastWrite) + if !skip { break } - skipped++ + lastWrite = write } - return skipped + return head, tail } diff --git a/pkg/meta/sql.go b/pkg/meta/sql.go index 9803051fe54d..80a68df75f89 100644 --- a/pkg/meta/sql.go +++ b/pkg/meta/sql.go @@ -2753,7 +2753,17 @@ func (m *dbMeta) doCleanupDelayedSlices(edge int64) (int, error) { return count, nil } -func (m *dbMeta) doCompactChunk(inode Ino, indx uint32, origin []byte, ss []*slice, skipped int, pos uint32, id uint64, size uint32, delayed []byte) syscall.Errno { +func (m *dbMeta) doCompactChunk(inode Ino, indx uint32, origin []byte, dslices, wslices []*slice, skipped int, delayed []byte) syscall.Errno { + var id uint64 + var size uint32 + var wbuf []byte + for _, s := range wslices { + wbuf = append(wbuf, marshalSlice(s.pos, s.id, s.size, s.off, s.len)...) + if s.id > 0 { + id, size = s.id, s.size + break + } + } st := errno(m.txn(func(s *xorm.Session) error { var c2 = chunk{Inode: inode, Indx: indx} _, err := s.ForUpdate().MustCols("indx").Get(&c2) @@ -2765,7 +2775,7 @@ func (m *dbMeta) doCompactChunk(inode Ino, indx uint32, origin []byte, ss []*sli return syscall.EINVAL } - c2.Slices = append(append(c2.Slices[:skipped*sliceBytes], marshalSlice(pos, id, size, 0, size)...), c2.Slices[len(origin):]...) + c2.Slices = append(append(c2.Slices[:skipped*sliceBytes], wbuf...), c2.Slices[len(origin):]...) if _, err := s.Where("Inode = ? AND indx = ?", inode, indx).Update(c2); err != nil { return err } @@ -2780,7 +2790,7 @@ func (m *dbMeta) doCompactChunk(inode Ino, indx uint32, origin []byte, ss []*sli } } } else { - for _, s_ := range ss { + for _, s_ := range dslices { if s_.id == 0 { continue } @@ -2813,7 +2823,7 @@ func (m *dbMeta) doCompactChunk(inode Ino, indx uint32, origin []byte, ss []*sli return mustInsert(s, &sliceRef{id, size, 0}) }) } else if st == 0 && delayed == nil { - for _, s := range ss { + for _, s := range dslices { if s.id == 0 { continue } diff --git a/pkg/meta/tkv.go b/pkg/meta/tkv.go index 357bcfd1e97f..15c43043f3e9 100644 --- a/pkg/meta/tkv.go +++ b/pkg/meta/tkv.go @@ -2317,7 +2317,17 @@ func (m *kvMeta) doCleanupDelayedSlices(edge int64) (int, error) { return count, nil } -func (m *kvMeta) doCompactChunk(inode Ino, indx uint32, buf []byte, ss []*slice, skipped int, pos uint32, id uint64, size uint32, delayed []byte) syscall.Errno { +func (m *kvMeta) doCompactChunk(inode Ino, indx uint32, buf []byte, dslices, wslices []*slice, skipped int, delayed []byte) syscall.Errno { + var id uint64 + var size uint32 + var wbuf []byte + for _, s := range wslices { + wbuf = append(wbuf, marshalSlice(s.pos, s.id, s.size, s.off, s.len)...) + if s.id > 0 { + id, size = s.id, s.size + break + } + } st := errno(m.txn(func(tx *kvTxn) error { buf2 := tx.get(m.chunkKey(inode, indx)) if len(buf2) < len(buf) || !bytes.Equal(buf, buf2[:len(buf)]) { @@ -2325,7 +2335,7 @@ func (m *kvMeta) doCompactChunk(inode Ino, indx uint32, buf []byte, ss []*slice, return syscall.EINVAL } - buf2 = append(append(buf2[:skipped*sliceBytes], marshalSlice(pos, id, size, 0, size)...), buf2[len(buf):]...) + buf2 = append(append(buf2[:skipped*sliceBytes], wbuf...), buf2[len(buf):]...) tx.set(m.chunkKey(inode, indx), buf2) // create the key to tracking it tx.set(m.sliceKey(id, size), make([]byte, 8)) @@ -2334,7 +2344,7 @@ func (m *kvMeta) doCompactChunk(inode Ino, indx uint32, buf []byte, ss []*slice, tx.set(m.delSliceKey(time.Now().Unix(), id), delayed) } } else { - for _, s := range ss { + for _, s := range dslices { if s.id > 0 { tx.incrBy(m.sliceKey(s.id, s.size), -1) } @@ -2364,7 +2374,7 @@ func (m *kvMeta) doCompactChunk(inode Ino, indx uint32, buf []byte, ss []*slice, m.cleanupZeroRef(id, size) if delayed == nil { var refs int64 - for _, s := range ss { + for _, s := range dslices { if s.id > 0 && m.client.txn(func(tx *kvTxn) error { refs = tx.incrBy(m.sliceKey(s.id, s.size), 0) return nil