[dbnode] Add x/encoding.Cloner with the ability to shallow clone m3db response iterators (#4271)
prateek authored Jun 11, 2024
1 parent 75a402a commit d6f6b4d
Showing 9 changed files with 999 additions and 7 deletions.
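The commit introduces x/encoding.Cloner, which shallow-clones a fetched SeriesIterator so the same response can be consumed more than once without re-fetching. The integration test further down exercises this end to end; as a quick orientation, here is a minimal sketch of the intended call flow, assuming a connected client.Session (the readViaClone helper and its parameters are illustrative, not part of this commit):

package example

import (
	"github.com/m3db/m3/src/dbnode/client"
	xenc "github.com/m3db/m3/src/dbnode/x/encoding"
	"github.com/m3db/m3/src/x/ident"
	xtime "github.com/m3db/m3/src/x/time"
)

// readViaClone fetches a series and reads it through a shallow clone, leaving
// the original iterator available for further clones.
func readViaClone(
	session client.Session,
	namespaceID, seriesID ident.ID,
	start, end xtime.UnixNano,
) ([]float64, error) {
	iter, err := session.Fetch(namespaceID, seriesID, start, end)
	if err != nil {
		return nil, err
	}

	iterPools, err := session.IteratorPools()
	if err != nil {
		return nil, err
	}
	cloner := xenc.NewShallowCloner(iterPools)

	// The clone iterates independently but shares the original iterator's
	// underlying tsz bytes: consume and close it while the original is still
	// alive, and never iterate the original itself.
	clone, err := cloner.CloneSeriesIterator(iter)
	if err != nil {
		return nil, err
	}
	defer clone.Close()

	values := make([]float64, 0, 10)
	for clone.Next() {
		dp, _, _ := clone.Current()
		values = append(values, dp.Value)
	}
	return values, clone.Err()
}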
3 changes: 2 additions & 1 deletion src/dbnode/generated/mocks/generate.go
@@ -21,7 +21,7 @@
// mockgen rules for generating mocks for exported interfaces (reflection mode)

//go:generate sh -c "mockgen -package=fs $PACKAGE/src/dbnode/persist/fs DataFileSetWriter,DataFileSetReader,DataFileSetSeeker,IndexFileSetWriter,IndexFileSetReader,IndexSegmentFileSetWriter,IndexSegmentFileSet,IndexSegmentFile,SnapshotMetadataFileWriter,DataFileSetSeekerManager,ConcurrentDataFileSetSeeker,MergeWith,StreamingWriter | genclean -pkg $PACKAGE/src/dbnode/persist/fs -out ../../persist/fs/fs_mock.go"
//go:generate sh -c "mockgen -package=xio $PACKAGE/src/dbnode/x/xio SegmentReader,SegmentReaderPool | genclean -pkg $PACKAGE/src/dbnode/x/xio -out ../../x/xio/io_mock.go"
//go:generate sh -c "mockgen -package=xio $PACKAGE/src/dbnode/x/xio ReaderSliceOfSlicesIterator,SegmentReader,SegmentReaderPool | genclean -pkg $PACKAGE/src/dbnode/x/xio -out ../../x/xio/io_mock.go"
//go:generate sh -c "mockgen -package=digest -destination=../../digest/digest_mock.go $PACKAGE/src/dbnode/digest ReaderWithDigest"
//go:generate sh -c "mockgen -package=series $PACKAGE/src/dbnode/storage/series DatabaseSeries,QueryableBlockRetriever | genclean -pkg $PACKAGE/src/dbnode/storage/series -out ../../storage/series/series_mock.go"
//go:generate sh -c "mockgen -package=storage $PACKAGE/src/dbnode/storage IndexWriter | genclean -pkg $PACKAGE/src/dbnode/storage -out ../../storage/lookup_mock.go"
@@ -44,5 +44,6 @@
//go:generate sh -c "mockgen -package=writes -destination=../../ts/writes/write_batch_mock.go -source=../../ts/writes/types.go"
//go:generate sh -c "mockgen -package=index -destination=../../storage/index/index_mock.go -source=../../storage/index/types.go"
//go:generate sh -c "mockgen -package=permits -destination=../../storage/limits/permits/permits_mock.go -source=../../storage/limits/permits/types.go"
//go:generate sh -c "mockgen -package=xpool -destination=../../x/xpool/xpool_mock.go -source=../../x/xpool/types.go"

package mocks
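Adding ReaderSliceOfSlicesIterator to the xio rule means mockgen now emits a mock for that interface into src/dbnode/x/xio/io_mock.go alongside the existing SegmentReader mocks. A rough sketch of wiring that mock into a test, assuming mockgen's standard NewMock* constructor naming (the expectations shown are purely illustrative):

package xio_test

import (
	"testing"
	"time"

	"github.com/golang/mock/gomock"

	"github.com/m3db/m3/src/dbnode/x/xio"
	xtime "github.com/m3db/m3/src/x/time"
)

func TestReaderSliceOfSlicesIteratorMock(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	// Simulate an iterator positioned at index 0 that exposes one empty group
	// of readers and then reports no further groups.
	readers := xio.NewMockReaderSliceOfSlicesIterator(ctrl)
	readers.EXPECT().Index().Return(0).AnyTimes()
	readers.EXPECT().CurrentReaders().Return(0, xtime.UnixNano(0), time.Hour).AnyTimes()
	readers.EXPECT().Next().Return(false).AnyTimes()
	readers.EXPECT().RewindToIndex(0).AnyTimes()

	if got := readers.Index(); got != 0 {
		t.Fatalf("unexpected index: %d", got)
	}
}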
2 changes: 1 addition & 1 deletion src/dbnode/integration/integration.go
@@ -60,7 +60,7 @@ import (
)

const (
-	multiAddrPortStart = 9000
+	multiAddrPortStart = 10000
	multiAddrPortEach = 5
)

124 changes: 124 additions & 0 deletions src/dbnode/integration/shallow_copy_single_test.go
@@ -0,0 +1,124 @@
//go:build integration
// +build integration

//
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package integration

import (
	"testing"
	"time"

	"github.com/m3db/m3/src/cluster/services"
	"github.com/m3db/m3/src/cluster/shard"
	"github.com/m3db/m3/src/dbnode/client"
	"github.com/m3db/m3/src/dbnode/encoding"
	"github.com/m3db/m3/src/dbnode/topology"
	xenc "github.com/m3db/m3/src/dbnode/x/encoding"
	"github.com/m3db/m3/src/x/ident"
	xtime "github.com/m3db/m3/src/x/time"

	"github.com/stretchr/testify/require"
	"go.uber.org/zap"
)

func TestShallowCopySingleSeries(t *testing.T) {
	if testing.Short() {
		t.SkipNow() // Just skip if we're doing a short run
	}

	var (
		numShards = defaultNumShards
		minShard  = uint32(0)
		maxShard  = uint32(numShards - 1)
		instances = []services.ServiceInstance{
			node(t, 0, newClusterShardsRange(minShard, maxShard, shard.Available)),
			node(t, 1, newClusterShardsRange(minShard, maxShard, shard.Available)),
			node(t, 2, newClusterShardsRange(minShard, maxShard, shard.Available)),
		}
	)
	nodes, closeFn, clientopts := makeMultiNodeSetup(t, numShards, true, true, instances) //nolint:govet
	clientopts = clientopts.
		SetWriteConsistencyLevel(topology.ConsistencyLevelAll).
		SetReadConsistencyLevel(topology.ReadConsistencyLevelAll)

	defer closeFn()
	log := nodes[0].StorageOpts().InstrumentOptions().Logger()
	for _, n := range nodes {
		require.NoError(t, n.StartServer())
	}

	c, err := client.NewClient(clientopts)
	require.NoError(t, err)
	session, err := c.NewSession()
	require.NoError(t, err)
	defer session.Close()

	now := xtime.ToUnixNano(nodes[0].DB().Options().ClockOptions().NowFn()())
	start := time.Now()
	log.Info("starting data write")

	id := ident.StringID("foo")

	for i := 0; i < 10; i++ {
		ts := now.Truncate(time.Second).Add(time.Duration(i * int(time.Second)))
		writeErr := session.Write(testNamespaces[0], id, ts, float64(i), xtime.Second, nil)
		require.NoError(t, writeErr)
	}
	log.Info("test data written", zap.Duration("took", time.Since(start)))

	expectedValues := []float64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}

	iter, err := session.Fetch(testNamespaces[0], id, now.Add(-time.Hour), now.Add(time.Hour))
	require.NoError(t, err)

	iterPools, err := session.IteratorPools()
	require.NoError(t, err)

	cloner := xenc.NewShallowCloner(iterPools)

	// ensure we're able to make multiple clones and iterate them without
	// affecting the ability to make further clones.
	iterCopyA, err := cloner.CloneSeriesIterator(iter)
	require.NoError(t, err)
	require.Equal(t, expectedValues, iterToValueSlice(t, iterCopyA))

	iterCopyB, err := cloner.CloneSeriesIterator(iter)
	require.NoError(t, err)
	require.Equal(t, expectedValues, iterToValueSlice(t, iterCopyB))

	log.Info("data is readable", zap.Duration("took", time.Since(start)))
}

func iterToValueSlice(
	t *testing.T,
	iter encoding.Iterator,
) []float64 {
	valueSlice := make([]float64, 0, 10)
	for iter.Next() {
		dp, _, _ := iter.Current()
		valueSlice = append(valueSlice, dp.Value)
	}
	require.NoError(t, iter.Err())
	iter.Close()
	return valueSlice
}
164 changes: 164 additions & 0 deletions src/dbnode/x/encoding/shallow_cloner.go
@@ -0,0 +1,164 @@
// Copyright (c) 2024 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package encoding

import (
	"time"

	enc "github.com/m3db/m3/src/dbnode/encoding"
	"github.com/m3db/m3/src/dbnode/ts"
	"github.com/m3db/m3/src/dbnode/x/xio"
	xtime "github.com/m3db/m3/src/x/time"
)

// Cloner makes a clone of the given encoding type.
type Cloner interface {
	CloneSeriesIterator(enc.SeriesIterator) (enc.SeriesIterator, error)
}

type shallowCloner struct {
	pools enc.IteratorPools

	cloneBlockReaderFn cloneBlockReaderFn
}

// NewShallowCloner returns a cloner which makes a shallow copy of the given type.
func NewShallowCloner(pools enc.IteratorPools) Cloner {
	cloner := shallowCloner{
		pools: pools,
	}
	cloner.cloneBlockReaderFn = cloner.cloneBlockReader
	return cloner
}

// CloneSeriesIterator makes a shallow copy of the given series iterator,
// i.e. the returned iterator can be iterated independently of the original iterator.
// It does NOT copy the underlying tsz data; it only points at the original iterator's data.
// nb:
// - The returned iterator is only valid for as long as the original iterator is valid.
// - Do NOT iterate the original iterator, as doing so can release resources held on to by the
//   cloned iterators.
func (s shallowCloner) CloneSeriesIterator(iter enc.SeriesIterator) (enc.SeriesIterator, error) {
	replicas, err := iter.Replicas()
	if err != nil {
		return nil, err
	}

	replicaCopies := s.pools.MultiReaderIteratorArray().Get(len(replicas))
	replicaCopies = replicaCopies[:0]
	for _, replica := range replicas {
		replicaCopy, err := s.cloneMultiReaderIterator(replica)
		if err != nil {
			return nil, err
		}
		replicaCopies = append(replicaCopies, replicaCopy)
	}

	iterCopy := s.pools.SeriesIterator().Get()
	iterCopy.Reset(enc.SeriesIteratorOptions{
		ID:             s.pools.ID().Clone(iter.ID()),
		Tags:           iter.Tags().Duplicate(),
		StartInclusive: iter.Start(),
		EndExclusive:   iter.End(),
		Replicas:       replicaCopies,
	})

	return iterCopy, nil
}

const (
	_initReadersSize = 5
)

func (s shallowCloner) cloneMultiReaderIterator(
	iter enc.MultiReaderIterator,
) (enc.MultiReaderIterator, error) {
	// TODO: pool these slice allocations
	blockReaderCopies := make([][]xio.BlockReader, 0, _initReadersSize)

	// nb: the implementation below requires iterating the blocksIters
	// underlying the MultiReaderIterator. While we make sure to reset the
	// state of the iterator back to what it was originally, one important
	// caveat to call out: this iteration is not thread-safe. i.e. if copies
	// are going to be made by different goroutines, the synchronization
	// has to be done at a level above this.
	readers := iter.Readers()
	initIdx := readers.Index()

	// nb: we start by assuming next=true for the first pass through the readersIterator,
	// as the MultiReaderIterator already calls Next() on the readersIterator when
	// it acquires the iter.
	for next := true; next; next = readers.Next() {
		currLen, currStart, currBlockSize := readers.CurrentReaders()
		currentCopies := make([]xio.BlockReader, 0, currLen)
		for i := 0; i < currLen; i++ {
			currReader := readers.CurrentReaderAt(i)
			currCopy, err := s.cloneBlockReaderFn(currReader, currStart, currBlockSize)
			if err != nil {
				return nil, err
			}
			currentCopies = append(currentCopies, currCopy)
		}
		blockReaderCopies = append(blockReaderCopies, currentCopies)
	}

	// reset the reader to its original state
	readers.RewindToIndex(initIdx)

	// TODO: pool this type
	sliceOfSlicesIter := xio.NewReaderSliceOfSlicesFromBlockReadersIterator(blockReaderCopies)
	multiReaderIterCopy := s.pools.MultiReaderIterator().Get()
	multiReaderIterCopy.ResetSliceOfSlices(sliceOfSlicesIter, iter.Schema())

	return multiReaderIterCopy, nil
}

type cloneBlockReaderFn func(xio.BlockReader, xtime.UnixNano, time.Duration) (xio.BlockReader, error)

func (s shallowCloner) cloneBlockReader(
	reader xio.BlockReader,
	start xtime.UnixNano,
	blockSize time.Duration,
) (xio.BlockReader, error) {
	// nb: we cannot rely on the reader.Segment.Clone() method, as that performs a deep copy,
	// i.e. it copies the underlying data []byte as well. The copy we provide only
	// copies the wrapper constructs; it still points at the same []byte as the original.

	seg, err := reader.Segment()
	if err != nil {
		return xio.BlockReader{}, err
	}

	head := s.pools.CheckedBytesWrapper().Get(seg.Head.Bytes())
	tail := s.pools.CheckedBytesWrapper().Get(seg.Tail.Bytes())
	checksum := seg.CalculateChecksum()
	// nb: FinalizeNone is of particular importance here. It ensures we don't assume ownership
	// of the head/tail bytes, so when Close() is called we nil out our pointers and stop there
	// (instead of returning the []byte to a pool, and so on).
	segCopy := ts.NewSegment(head, tail, checksum, ts.FinalizeNone)
	segReader := xio.NewSegmentReader(segCopy)

	return xio.BlockReader{
		SegmentReader: segReader,
		Start:         start,
		BlockSize:     blockSize,
	}, nil
}
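The nb comments above set out two contracts for callers: clones share the original iterator's tsz bytes (so the original must outlive them and must never itself be iterated), and cloning walks the MultiReaderIterator's underlying readers, so concurrent cloning needs synchronization above this layer. A hedged sketch of fanning one fetched iterator out to several workers under those constraints (the fanOut helper is illustrative, not part of this commit):

package example

import (
	"sync"

	"github.com/m3db/m3/src/dbnode/encoding"
	xenc "github.com/m3db/m3/src/dbnode/x/encoding"
)

// fanOut hands one independently iterable clone to each worker. Clones are
// created serially on this goroutine because the cloner's walk over the
// iterator's underlying readers is not thread-safe; each worker then consumes
// its own clone concurrently. The original iterator is closed (never iterated)
// only after every clone has been consumed, since the clones point at its data.
func fanOut(
	cloner xenc.Cloner,
	iter encoding.SeriesIterator,
	workers int,
	consume func(encoding.SeriesIterator),
) error {
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		clone, err := cloner.CloneSeriesIterator(iter)
		if err != nil {
			wg.Wait()
			return err
		}
		wg.Add(1)
		go func(it encoding.SeriesIterator) {
			defer wg.Done()
			defer it.Close()
			consume(it)
		}(clone)
	}
	wg.Wait()
	iter.Close()
	return nil
}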