Add read/write of tags to fileset to restore tags, add fs index bootstrapping #590
Codecov Report
@@            Coverage Diff            @@
##           master    #590     +/-   ##
=========================================
+ Coverage    81.7%   82.1%   +0.39%
=========================================
  Files         230     231       +1
  Lines       22089   22331     +242
=========================================
+ Hits        18048   18334     +286
+ Misses       3012    2954      -58
- Partials     1029    1043      +14
storage/index/allocator.go
Outdated
// NewDefaultMutableSegmentAllocator returns a default mutable segment
// allocator.
func NewDefaultMutableSegmentAllocator(
	iopts instrument.Options,
nit: change this to Options (instead of instrument.Options)
(we override some other defaults in the segment options in the default ctor too)
Sure thing.
persist/fs/clone/cloner.go
Outdated
if err != nil {
	if err == io.EOF {
		break
	}
	return fmt.Errorf("unexpected error while reading data: %v", err)
}

var tags ident.Tags
This snippet seems to be repeated multiple times in the diff. Mind pulling it into a common place? m3x/ident/testutil maybe?
Sure thing.
persist/fs/msgpack/roundtrip_test.go
Outdated
// Set the default values on the fields that did not exist in V1
// and then restore them at the end of the test - This is required
// because the old decoder won't read the new fields
oldEncodedTags := testIndexEntry.EncodedTags
I think I did the same thing in my test, but I was dealing with integer types that would get encoded anyways. Do you want to encode data with tag data included, and then strip them from the test object for the sake of comparison? That way we know the decoder can actually skip over tags data if it is present
Hm, do you mean in the test above or this one? Because in this one the encoded tags should actually be there if we specify encodeLegacyV1IndexEntry: false and encode with tags with values.
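The property under discussion (an old decoder must be able to skip fields it does not know about) can be sketched as follows. This is a minimal illustration that uses a hand-rolled, length-prefixed format rather than msgpack; the `entry` type and the `encode` and `decodeV1` functions are hypothetical and are not the code in this PR.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
)

// entry is a hypothetical stand-in for an index entry, not the real
// schema.IndexEntry.
type entry struct {
	ID          []byte
	EncodedTags []byte // field that only the newer version knows about
}

// encode writes a one-byte field count followed by length-prefixed fields.
func encode(e entry) []byte {
	var buf bytes.Buffer
	fields := [][]byte{e.ID, e.EncodedTags}
	buf.WriteByte(byte(len(fields)))
	for _, f := range fields {
		var n [4]byte
		binary.BigEndian.PutUint32(n[:], uint32(len(f)))
		buf.Write(n[:])
		buf.Write(f)
	}
	return buf.Bytes()
}

// decodeV1 simulates an old decoder: it keeps the first field and skips
// over every field that follows it.
func decodeV1(b []byte) (entry, error) {
	if len(b) < 1 {
		return entry{}, errors.New("empty input")
	}
	numFields := int(b[0])
	b = b[1:]
	var e entry
	for i := 0; i < numFields; i++ {
		if len(b) < 4 {
			return entry{}, errors.New("truncated length prefix")
		}
		n := int(binary.BigEndian.Uint32(b))
		b = b[4:]
		if len(b) < n {
			return entry{}, errors.New("truncated field")
		}
		if i == 0 {
			e.ID = append([]byte(nil), b[:n]...)
		}
		b = b[n:] // unknown fields are skipped, not rejected
	}
	return e, nil
}

func main() {
	full := entry{ID: []byte("foo"), EncodedTags: []byte("k=v")}
	got, err := decodeV1(encode(full))
	if err != nil {
		panic(err)
	}
	fmt.Printf("id=%s tags=%d bytes\n", got.ID, len(got.EncodedTags))
}
```

Encoding with the tags present and then comparing against an expectation with the tags stripped is what exercises the skip path.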
@@ -112,6 +128,12 @@ func (o *options) Validate() error {
		"invalid index bloom filter false positive percent, must be >= 0 and <= 1: instead %f",
		o.indexBloomFilterFalsePositivePercent)
	}
	if o.tagEncoderPool == nil {
Is this guarding against someone doing SetTagEncoderPool(nil)? I don't think we do this check everywhere but seems reasonable
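A nil check of this kind can be sketched as below. The `tagEncoderPool` interface, the `simplePool` type, and the error value are hypothetical stand-ins chosen for illustration; only the `Validate` and `SetTagEncoderPool` names come from the diff.

```go
package main

import (
	"errors"
	"fmt"
)

// tagEncoderPool is a hypothetical stand-in for the real pool type.
type tagEncoderPool interface {
	Get() []byte
}

// simplePool is a trivial pool used only for this sketch.
type simplePool struct{}

func (simplePool) Get() []byte { return nil }

var errTagEncoderPoolNotSet = errors.New("tag encoder pool is not set")

type options struct {
	tagEncoderPool tagEncoderPool
}

// SetTagEncoderPool returns a copy of the options with the pool replaced.
func (o options) SetTagEncoderPool(p tagEncoderPool) options {
	o.tagEncoderPool = p
	return o
}

// Validate rejects options whose pool was explicitly set to nil.
func (o options) Validate() error {
	if o.tagEncoderPool == nil {
		return errTagEncoderPoolNotSet
	}
	return nil
}

func main() {
	fmt.Println(options{}.SetTagEncoderPool(nil).Validate())
	fmt.Println(options{}.SetTagEncoderPool(simplePool{}).Validate())
}
```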
persist/fs/clone/cloner_test.go
Outdated
b1.IncRef()
b2.IncRef()
require.Equal(t, t1.String(), t2.String())
require.Equal(t, a1.Remaining(), a2.Remaining())
numTags, numTagsMatched := a1.Remaining(), 0
There's a matcher for tag iters, so you can make this a little shorter if you'd like:
ident.NewTagIterMatcher(ident.NewTagSliceIterator(a1)).Matches(ident.NewTagSliceIterator(a2))
Ah nice, will update - gracias.
@@ -380,21 +388,31 @@ func (r *reader) ReadBloomFilter() (*ManagedConcurrentBloomFilter, error) {
	)
}
This method name is a little confusing, it feels like it implies it returns the bytes for an entry when really what it does is just convert arbitrary []byte to (unowned) checked.Bytes. maybe just call it toCheckedBytes or cloneBytes or something
I'll call it entryClonedBytes(...), ta.
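The cloning step the method performs can be sketched as below, assuming a plain `[]byte` result. Wrapping the copy in checked.Bytes, as the real method does, is left out, and `cloneBytes` is just one of the names floated above.

```go
package main

import "fmt"

// cloneBytes returns a copy of src that does not share its backing array,
// so the caller can keep it after the source buffer is reused.
func cloneBytes(src []byte) []byte {
	dst := make([]byte, len(src))
	copy(dst, src)
	return dst
}

func main() {
	buf := []byte("abc")
	clone := cloneBytes(buf)
	buf[0] = 'x' // mutating the source must not affect the clone
	fmt.Println(string(buf), string(clone))
}
```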
@@ -1,3 +1,23 @@
// Copyright (c) 2018 Uber Technologies, Inc.
note to future self: m3db/ci-scripts#12 ++
Ha, aye, all good.
@@ -65,7 +65,9 @@ func TestIndexLookupWriteRead(t *testing.T) {
	filePathPrefix := filepath.Join(dir, "")
	defer os.RemoveAll(dir)

	options := NewOptions().
		// NB(r): Use testDefaultOpts to avoid allocing pools each
+1 this test takes forever usually, hopefully this will help
}
data, ok := tagsEncoder.Data()
if !ok {
	return errWriterEncodeTagsDataNotAccessible
When does this happen?
Only if you haven't actually encoded anything yet, it should never occur in practice.
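The "data is only accessible after an encode" contract described here can be sketched with a toy encoder. The `tagEncoder` type and its wire format below are hypothetical and only mimic the shape of the `Data()` call in the diff.

```go
package main

import "fmt"

// tagEncoder is a toy encoder used only to illustrate the (data, ok) contract.
type tagEncoder struct {
	buf     []byte
	encoded bool
}

// Encode serializes name/value pairs as "name=value;" and marks data as ready.
func (e *tagEncoder) Encode(pairs [][2]string) {
	e.buf = e.buf[:0]
	for _, p := range pairs {
		e.buf = append(e.buf, p[0]...)
		e.buf = append(e.buf, '=')
		e.buf = append(e.buf, p[1]...)
		e.buf = append(e.buf, ';')
	}
	e.encoded = true
}

// Data returns ok == false until Encode has been called at least once.
func (e *tagEncoder) Data() ([]byte, bool) {
	if !e.encoded {
		return nil, false
	}
	return e.buf, true
}

func main() {
	enc := &tagEncoder{}
	_, ok := enc.Data()
	fmt.Println("before encode:", ok)

	enc.Encode([][2]string{{"city", "nyc"}})
	data, ok := enc.Data()
	fmt.Println("after encode:", ok, string(data))
}
```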
SetInstrumentOptions(opts.InstrumentOptions()).
SetDatabaseBlockOptions(opts.DatabaseBlockOptions()).
SetSeriesCachePolicy(opts.SeriesCachePolicy()).
SetIndexMutableSegmentAllocator(index.NewDefaultMutableSegmentAllocator(iopts))
This wasn't here before, right? Why is it being added now?
need the ability to create segments for index bootstrap results (they include segments)
As prateek mentions.
@@ -45,10 +47,30 @@ type newDataFileSetReaderFn func(
	opts fs.Options,
) (fs.DataFileSetReader, error)

type runType int

const (
Any reason to move these out into a more shareable location? I'm thinking no because it's really only the FS bootstrapper that uses almost the exact same code path for bootstrapping data vs. index, right?
Although true, this is internal to this bootstrapper - for instance for now the peer bootstrapper will have a fundamentally different path when bootstrapping the index as it needs to call FetchMetadata on the client session, hence it won't need any concept of run type.
I don't see the need at this time to introduce an abstraction that's not necessarily required and may raise the question "what is this used for", etc. when others read the code.
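For readers unfamiliar with the pattern, an unexported run type kept private to one bootstrapper might look like the sketch below. `runType` and `bootstrapDataRunType` appear in the diff; `bootstrapIndexRunType` and the `describe` helper are assumed names used only for illustration.

```go
package main

import "fmt"

// runType distinguishes the two passes of a single bootstrapper. It is
// unexported on purpose so that no other package depends on it.
type runType int

const (
	bootstrapDataRunType runType = iota
	bootstrapIndexRunType // assumed name, not shown in the diff excerpt
)

// describe is a hypothetical helper showing how one code path can branch
// on the run type while rejecting unknown values.
func describe(run runType) (string, error) {
	switch run {
	case bootstrapDataRunType:
		return "read data blocks", nil
	case bootstrapIndexRunType:
		return "read IDs and tags to build index segments", nil
	default:
		return "", fmt.Errorf("unknown run type: %d", run)
	}
}

func main() {
	for _, run := range []runType{bootstrapDataRunType, bootstrapIndexRunType, runType(42)} {
		desc, err := describe(run)
		fmt.Println(desc, err)
	}
}
```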
func (s *fileSystemSource) tagsFromTagsIter(
	iter ident.TagIterator,
) (ident.Tags, error) {
	tags := make(ident.Tags, 0, iter.Remaining())
This code repeats all the time; do you think it's worth adding a new method to the iter to just do it?
Also we're not pooling these arrays, yeah? I think I need to do something similar for the commitlog bootstrapping stuff and wasn't sure. I thought @prateek mentioned he created a pool for it.
Yeah, so in this instance we actually use the ident.Pool to clone the tags, however we aren't pooling the arrays. If we're at the point where we need to actually create tags from the tag iterator, however, we are putting this into a segment which we won't rotate out until it expires, so it's probably fine not to pool for the meantime (similar to when creating the series ID and tags, at that point we just allocate because it'll be around for a while and callers need to take refs to these things without fear of it being deallocated while they use it).
Yeah I don't think pooling is necessary because these ids/tags would be retained in memory by the series (once we transfer ownership correctly). Longer term I think it'd be worthwhile to see if bulk allocating the slices helps perf but we can come back to that.
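The repeated "drain an iterator into a slice" code discussed in this thread has roughly the following shape. The `tag`, `tagIterator`, and `sliceIter` types are simplified, hypothetical stand-ins for the ident package types; the real code also clones each tag through ident.Pool, which is omitted here because Go strings are immutable values.

```go
package main

import "fmt"

// tag and tagIterator are simplified stand-ins for ident.Tag and
// ident.TagIterator.
type tag struct{ Name, Value string }

type tagIterator interface {
	Next() bool
	Current() tag
	Err() error
	Remaining() int
}

// sliceIter iterates over an in-memory slice of tags.
type sliceIter struct {
	tags []tag
	idx  int
}

func newSliceIter(tags []tag) *sliceIter { return &sliceIter{tags: tags, idx: -1} }

func (it *sliceIter) Next() bool {
	if it.idx+1 >= len(it.tags) {
		return false
	}
	it.idx++
	return true
}

func (it *sliceIter) Current() tag   { return it.tags[it.idx] }
func (it *sliceIter) Err() error     { return nil }
func (it *sliceIter) Remaining() int { return len(it.tags) - (it.idx + 1) }

// tagsFromIter allocates exactly once, sized by Remaining(), and does not
// pool the slice because the result is expected to be long-lived.
func tagsFromIter(iter tagIterator) ([]tag, error) {
	tags := make([]tag, 0, iter.Remaining())
	for iter.Next() {
		tags = append(tags, iter.Current())
	}
	if err := iter.Err(); err != nil {
		return nil, err
	}
	return tags, nil
}

func main() {
	tags, err := tagsFromIter(newSliceIter([]tag{{"city", "nyc"}, {"env", "prod"}}))
	fmt.Println(tags, err)
}
```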
	s.log.Errorf("unable to create index segment: %v", err)
	hasError = true
}
default:
+1
	xlog.NewField("namespace", nsID.String()),
).Infof("filesystem bootstrapper resolving block retriever")
if run == bootstrapDataRunType {
	// NB(r): We can only need to cache shard indices and possibly shortcut
This looks like one of the few big code blocks that would be easy to factor out into a method since it doesn't modify any other function state except the blockRetriever (which it could return).
Also, it's OK that the blockRetriever is always nil in the index case, yeah?
This comment is a little confusing as well.
Maybe something like: "We only need to cache shard indices and mark blocks as fulfilled when bootstrapping data, because the data can be retrieved lazily from disk during reads. On the other hand, if we're bootstrapping the index then we need to rebuild it from scratch by reading all the IDs/tags."
Fair, I can refactor into a method and update the comment.
s.RLock()
defer s.RUnlock()
Delete cache all metadata :D
It must die, indeed.
Size        uint32
Checksum    uint32
Offset      int64
EncodedTags []byte
Mind updating the docs/diagrams with this change too?
Sure thing.
persist/fs/retriever.go
Outdated
@@ -523,6 +537,10 @@ func (req *retrieveRequest) onCallerOrRetrieverDone() {
	}
	req.id.Finalize()
	req.id = nil
	if req.tags != nil {
		req.tags.Close()
		req.tags = nil
Should this be req.tags = ident.EmptyTagIterator?
Sure thing (we initialize this with ident.EmptyTagIterator when we reset for reuse but we can do that here too).
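The suggestion amounts to resetting the field to an empty sentinel instead of nil, so later code can iterate without a nil check. A minimal sketch, with hypothetical `tagIterator`, `emptyIter`, and `retrieveRequest` types standing in for the real ones:

```go
package main

import "fmt"

// tagIterator is a reduced, hypothetical version of ident.TagIterator.
type tagIterator interface {
	Next() bool
	Close()
}

// emptyIter never yields a tag and is safe to close any number of times.
type emptyIter struct{}

func (emptyIter) Next() bool { return false }
func (emptyIter) Close()     {}

// emptyTagIterator plays the role of ident.EmptyTagIterator in this sketch.
var emptyTagIterator tagIterator = emptyIter{}

type retrieveRequest struct {
	tags tagIterator
}

// done closes the current iterator and leaves a usable sentinel behind
// rather than a nil interface value.
func (r *retrieveRequest) done() {
	if r.tags != nil {
		r.tags.Close()
	}
	r.tags = emptyTagIterator
}

// countTags can be called at any time after done without a nil check.
func (r *retrieveRequest) countTags() int {
	n := 0
	for r.tags.Next() {
		n++
	}
	return n
}

func main() {
	req := &retrieveRequest{}
	req.done()
	fmt.Println(req.countTags())
}
```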
s.RLock()
defer s.RUnlock()
Is the disk fetch (earlier comment) no longer a concern?
I think he removed it because it only applied to cache all metadata and we're planning on deleting that
Yeah we're going to delete the cache all metadata strategy, also it's probably not all that bad to just wait if that's the case (it never should though, as this is a flush; the data should never have come from disk if we're flushing this).
fair enough
Also do you think it's worth adding a simple integration test to make sure this all works end to end? Seems like you could just copy-paste one of our existing FS integration tests and add an additional assertion on the tags.
Sure thing, will add an integration test.
│- Major Version │ │- Index Entry Offset ├──┘ │- Checksum │
┌─────────────────────┐
┌─────────────────────┐ ┌─────────────────────┐ │ Index File │
│ Info File │ │ Summaries File │ │ (sorted by ID) │
Since you're updating this, can you add the new fields I added recently:
type IndexInfo struct {
	MajorVersion int64
	BlockStart   int64
	BlockSize    int64
	Entries      int64
	Summaries    IndexSummariesInfo
	BloomFilter  IndexBloomFilterInfo
	SnapshotTime int64
	FileType     persist.FileSetType
}
Sure thing.
Done.
@@ -0,0 +1,249 @@
// +build integration

// Copyright (c) 2016 Uber Technologies, Inc.
2018
Ta
@@ -0,0 +1,126 @@
// +build integration

// Copyright (c) 2016 Uber Technologies, Inc.
2018
Haha, ta.
persist/fs/msgpack/encoder.go
Outdated
// backwards-compatibility
func (enc *Encoder) encodeIndexEntryV1(entry schema.IndexEntry) {
	// Manually encode num fields for testing purposes
	enc.encodeArrayLenFn(minNumIndexEntryFields)
I wonder if you should just hard-code the number or use a v1-specific constant here instead of relying on the minimum (which I think is designed for decoding purposes). Seems like a safe bet since this function is designed to simulate the behavior of old binaries.
Sure thing.
persist/fs/msgpack/encoder.go
Outdated
}

func (enc *Encoder) encodeIndexEntryV2(entry schema.IndexEntry) {
	// Manually encode num fields for testing purposes
This comment seems less relevant for the "current" version.
persist/fs/read.go
Outdated

return ident.BinaryID(idClone)
func (r *reader) entryClonedEncodedTagsTagIter(encodedTags []byte) ident.TagIterator {
super nit: you can probably make this entryClonedEncodedTagsIter so it doesn't stutter as much
Sure thing.
"testing" | ||
"time" | ||
|
||
"github.com/m3db/m3ninx/idx" |
nit: import order
require.NoError(t, err)
defer iter.Finalize()

verifyQueryMetadataResults(t, iter, exhausitive, verifyQueryMetadataResultsOptions{
+1 nice and clean
@@ -265,28 +317,51 @@ func (s *fileSystemSource) handleErrorsAndUnfulfilled(
			}
		}
	}
	resultLock.Unlock()
	// NB(r): We explicitly do not remove entries from the index results
+1
// as they are additive and get merged together with results from other
// bootstrappers by just appending the result (unlike data bootstrap
// results that when merged replace the block with the current block).
// It would also be difficult to remove only series that was added to the
were*
Ta
	idxopts namespace.IndexOptions,
	opts Options,
) (segment.MutableSegment, error) {
	blockStart := t.Truncate(idxopts.BlockSize())
Maybe add a quick note about why this truncation is required and the % == 0 guarantee the code relies upon.
Sure thing.
LGTM
runResult.Lock()
exists, err = segment.ContainsID(idBytes)
// ID and tags no longer required below
release()
It looks like FromMetricIter clones the ID, so you could probably do:
	d, err := convert.FromMetricIter(id, tagsIter)
	release()
	if err != nil {
		return err
	}
	runResult.Lock()
	exists, err = segment.ContainsID(d.ID)
Sure thing.
LGTM with nits
No description provided.