From 501c20d3da11e486d3e2bab32ff2dd2a6f13363b Mon Sep 17 00:00:00 2001 From: Prateek Rungta Date: Mon, 8 Apr 2019 13:28:10 -0400 Subject: [PATCH] [dbnode] Aggregate() using only FSTs where possible --- src/dbnode/generated-source-files.mk | 17 +- src/dbnode/storage/index.go | 147 +++++++++--- src/dbnode/storage/index/aggregate_results.go | 44 ++++ .../aggregate_results_entry_arraypool_gen.go | 127 ++++++++++ src/dbnode/storage/index/block.go | 223 +++++++++++++++++- src/dbnode/storage/index/block_prop_test.go | 7 +- .../index/field_terms_iterator_prop_test.go | 127 ++++++++++ .../index/field_terms_iterator_test.go | 172 ++++++++++++++ .../storage/index/fields_terms_iterator.go | 201 ++++++++++++++++ src/dbnode/storage/index/index_mock.go | 44 ++++ src/dbnode/storage/index/options.go | 28 +++ src/dbnode/storage/index/types.go | 35 ++- src/dbnode/storage/index_block_test.go | 107 ++++++++- src/m3ninx/generated-source-files.mk | 9 +- .../index/segment/fst/fst_terms_iterator.go | 3 +- .../fst/fst_terms_postings_iterator.go | 5 +- src/m3ninx/search/proptest/query_gen.go | 2 - 17 files changed, 1239 insertions(+), 59 deletions(-) create mode 100644 src/dbnode/storage/index/aggregate_results_entry_arraypool_gen.go create mode 100644 src/dbnode/storage/index/field_terms_iterator_prop_test.go create mode 100644 src/dbnode/storage/index/field_terms_iterator_test.go create mode 100644 src/dbnode/storage/index/fields_terms_iterator.go diff --git a/src/dbnode/generated-source-files.mk b/src/dbnode/generated-source-files.mk index af9e4778c4..be579cbc04 100644 --- a/src/dbnode/generated-source-files.mk +++ b/src/dbnode/generated-source-files.mk @@ -172,7 +172,9 @@ genny-map-storage-index-aggregation-results: genny-map-storage-index-aggregate-v # generation rule for all generated arraypools .PHONY: genny-arraypool-all -genny-arraypool-all: genny-arraypool-node-segments +genny-arraypool-all: \ + genny-arraypool-node-segments \ + genny-arraypool-aggregate-results-entry \ # arraypool 
generation rule for ./network/server/tchannelthrift/node/segmentsArrayPool .PHONY: genny-arraypool-node-segments @@ -186,6 +188,19 @@ genny-arraypool-node-segments: rename_type_middle=Segments \ rename_constructor=newSegmentsArrayPool +# arraypool generation rule for ./storage/index/AggregateResultsEntryArrayPool +.PHONY: genny-arraypool-aggregate-results-entry +genny-arraypool-aggregate-results-entry: + cd $(m3x_package_path) && make genny-arraypool \ + pkg=index \ + elem_type=AggregateResultsEntry \ + target_package=$(m3db_package)/src/dbnode/storage/index \ + out_file=aggregate_results_entry_arraypool_gen.go \ + rename_type_prefix=AggregateResultsEntry \ + rename_type_middle=AggregateResultsEntry \ + rename_constructor=NewAggregateResultsEntryArrayPool \ + rename_gen_types=true \ + # generation rule for all generated leakcheckpools .PHONY: genny-leakcheckpool-all genny-leakcheckpool-all: \ diff --git a/src/dbnode/storage/index.go b/src/dbnode/storage/index.go index c703ca47b6..04db37d4da 100644 --- a/src/dbnode/storage/index.go +++ b/src/dbnode/storage/index.go @@ -41,16 +41,17 @@ import ( "github.com/m3db/m3/src/dbnode/storage/index/convert" "github.com/m3db/m3/src/dbnode/storage/namespace" "github.com/m3db/m3/src/m3ninx/doc" + "github.com/m3db/m3/src/m3ninx/idx" m3ninxindex "github.com/m3db/m3/src/m3ninx/index" "github.com/m3db/m3/src/m3ninx/index/segment" "github.com/m3db/m3/src/m3ninx/index/segment/builder" - "github.com/m3db/m3/src/x/resource" xclose "github.com/m3db/m3/src/x/close" "github.com/m3db/m3/src/x/context" xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" xlog "github.com/m3db/m3/src/x/log" + "github.com/m3db/m3/src/x/resource" xsync "github.com/m3db/m3/src/x/sync" xtime "github.com/m3db/m3/src/x/time" @@ -167,6 +168,23 @@ type newNamespaceIndexOpts struct { newBlockFn newBlockFn } +// execBlockQueryFn executes a query against the given block whilst tracking state. 
+type execBlockQueryFn func( + cancellable *resource.CancellableLifetime, + block index.Block, + query index.Query, + opts index.QueryOptions, + state *asyncQueryExecState, + results index.BaseResults, +) + +// asyncQueryExecState tracks the async execution errors and results for a query. +type asyncQueryExecState struct { + sync.Mutex + multiErr xerrors.MultiError + exhaustive bool +} + // newNamespaceIndex returns a new namespaceIndex for the provided namespace. func newNamespaceIndex( nsMD namespace.Metadata, @@ -265,6 +283,7 @@ func newNamespaceIndexWithOptions( queryWorkersPool: newIndexOpts.opts.QueryIDsWorkerPool(), metrics: newNamespaceIndexMetrics(indexOpts, instrumentOpts), } + if runtimeOptsMgr != nil { idx.runtimeOptsListener = runtimeOptsMgr.RegisterListener(idx) } @@ -868,7 +887,7 @@ func (i *nsIndex) Query( results.Reset(i.nsMetadata.ID(), index.QueryResultsOptions{ SizeLimit: opts.Limit, }) - exhaustive, err := i.query(ctx, query, results, opts) + exhaustive, err := i.query(ctx, query, results, opts, i.execBlockQueryFn) if err != nil { return index.QueryResult{}, err } @@ -890,7 +909,12 @@ func (i *nsIndex) AggregateQuery( TermFilter: opts.TermFilter, Type: opts.Type, }) - exhaustive, err := i.query(ctx, query, results, opts.QueryOptions) + // use appropriate fn to query underlying blocks. + fn := i.execBlockQueryFn + if query.Equal(idx.NewAllQuery()) { + fn = i.execBlockAggregateQueryFn + } + exhaustive, err := i.query(ctx, query, results, opts.QueryOptions, fn) if err != nil { return index.AggregateQueryResult{}, err } @@ -905,6 +929,7 @@ func (i *nsIndex) query( query index.Query, results index.BaseResults, opts index.QueryOptions, + execBlockFn execBlockQueryFn, ) (bool, error) { // Capture start before needing to acquire lock. start := i.nowFn() @@ -940,17 +965,12 @@ func (i *nsIndex) query( } var ( - deadline = start.Add(timeout) - wg sync.WaitGroup - // State contains concurrent mutable state for async execution below. 
- state = struct { - sync.Mutex - multiErr xerrors.MultiError - exhaustive bool - }{ + state = asyncQueryExecState{ exhaustive: true, } + deadline = start.Add(timeout) + wg sync.WaitGroup ) // Create a cancellable lifetime and cancel it at end of this method so that @@ -958,32 +978,6 @@ func (i *nsIndex) query( cancellable := resource.NewCancellableLifetime() defer cancellable.Cancel() - execBlockQuery := func(block index.Block) { - blockExhaustive, err := block.Query(cancellable, query, opts, results) - if err == index.ErrUnableToQueryBlockClosed { - // NB(r): Because we query this block outside of the results lock, it's - // possible this block may get closed if it slides out of retention, in - // that case those results are no longer considered valid and outside of - // retention regardless, so this is a non-issue. - err = nil - } - - state.Lock() - defer state.Unlock() - - if err != nil { - state.multiErr = state.multiErr.Add(err) - return - } - - if blockExhaustive { - return - } - - // If block had more data but we stopped early, need to notify caller. - state.exhaustive = false - } - for _, block := range blocks { // Capture block for async query execution below. block := block @@ -1009,7 +1003,7 @@ func (i *nsIndex) query( // No timeout, just wait blockingly for a worker. 
wg.Add(1) i.queryWorkersPool.Go(func() { - execBlockQuery(block) + execBlockFn(cancellable, block, query, opts, &state, results) wg.Done() }) continue @@ -1020,7 +1014,7 @@ func (i *nsIndex) query( if timeLeft := deadline.Sub(i.nowFn()); timeLeft > 0 { wg.Add(1) timedOut := !i.queryWorkersPool.GoWithTimeout(func() { - execBlockQuery(block) + execBlockFn(cancellable, block, query, opts, &state, results) wg.Done() }, timeLeft) @@ -1085,6 +1079,81 @@ func (i *nsIndex) query( return exhaustive, nil } +func (i *nsIndex) execBlockQueryFn( + cancellable *resource.CancellableLifetime, + block index.Block, + query index.Query, + opts index.QueryOptions, + state *asyncQueryExecState, + results index.BaseResults, +) { + blockExhaustive, err := block.Query(cancellable, query, opts, results) + if err == index.ErrUnableToQueryBlockClosed { + // NB(r): Because we query this block outside of the results lock, it's + // possible this block may get closed if it slides out of retention, in + // that case those results are no longer considered valid and outside of + // retention regardless, so this is a non-issue. + err = nil + } + + state.Lock() + defer state.Unlock() + + if err != nil { + state.multiErr = state.multiErr.Add(err) + return + } + + if blockExhaustive { + return + } + + // If block had more data but we stopped early, need to notify caller. 
+ state.exhaustive = false +} + +func (i *nsIndex) execBlockAggregateQueryFn( + cancellable *resource.CancellableLifetime, + block index.Block, + query index.Query, + opts index.QueryOptions, + state *asyncQueryExecState, + results index.BaseResults, +) { + aggResults, ok := results.(index.AggregateResults) + if !ok { // should never happen + state.Lock() + state.multiErr = state.multiErr.Add( + fmt.Errorf("unknown results type [%T] received during aggregation", results)) + state.Unlock() + return + } + + blockExhaustive, err := block.Aggregate(cancellable, opts, aggResults) + if err == index.ErrUnableToQueryBlockClosed { + // NB(r): Because we query this block outside of the results lock, it's + // possible this block may get closed if it slides out of retention, in + // that case those results are no longer considered valid and outside of + // retention regardless, so this is a non-issue. + err = nil + } + + state.Lock() + defer state.Unlock() + + if err != nil { + state.multiErr = state.multiErr.Add(err) + return + } + + if blockExhaustive { + return + } + + // If block had more data but we stopped early, need to notify caller. 
+ state.exhaustive = false +} + func (i *nsIndex) timeoutForQueryWithRLock( ctx context.Context, ) time.Duration { diff --git a/src/dbnode/storage/index/aggregate_results.go b/src/dbnode/storage/index/aggregate_results.go index a35661f470..7c6b4e6fcc 100644 --- a/src/dbnode/storage/index/aggregate_results.go +++ b/src/dbnode/storage/index/aggregate_results.go @@ -104,6 +104,50 @@ func (r *aggregatedResults) AddDocuments(batch []doc.Document) (int, error) { return size, err } +func (r *aggregatedResults) AggregateResultsOptions() AggregateResultsOptions { + return r.aggregateOpts +} + +func (r *aggregatedResults) AddFields(batch []AggregateResultsEntry) (int, error) { + r.Lock() + for _, entry := range batch { + f := entry.Field + aggValues, ok := r.resultsMap.Get(f) + if !ok { + aggValues = r.valuesPool.Get() + // we can avoid the copy because we assume ownership of the passed ident.ID, + // but still need to finalize it. + r.resultsMap.set(f, aggValues, _AggregateResultsMapKeyOptions{ + copyKey: false, + finalizeKey: true, + }) + } else { + // because we already have a entry for this field, we release the ident back to + // the underlying pool. + f.Finalize() + } + valuesMap := aggValues.Map() + for _, t := range entry.Terms { + _, ok := valuesMap.Get(t) + if !ok { + // we can avoid the copy because we assume ownership of the passed ident.ID, + // but still need to finalize it. + valuesMap.set(t, struct{}{}, _AggregateValuesMapKeyOptions{ + copyKey: false, + finalizeKey: true, + }) + } else { + // because we already have a entry for this field, we release the ident back to + // the underlying pool. 
+ t.Finalize() + } + } + } + size := r.resultsMap.Len() + r.Unlock() + return size, nil +} + func (r *aggregatedResults) addDocumentsBatchWithLock( batch []doc.Document, ) error { diff --git a/src/dbnode/storage/index/aggregate_results_entry_arraypool_gen.go b/src/dbnode/storage/index/aggregate_results_entry_arraypool_gen.go new file mode 100644 index 0000000000..66ac84180c --- /dev/null +++ b/src/dbnode/storage/index/aggregate_results_entry_arraypool_gen.go @@ -0,0 +1,127 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// This file was automatically generated by genny. +// Any changes will be lost if this file is regenerated. +// see https://github.com/mauricelam/genny + +package index + +import ( + "github.com/m3db/m3/src/x/pool" +) + +// Copyright (c) 2018 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// AggregateResultsEntryArrayPool provides a pool for aggregateResultsEntry slices. +type AggregateResultsEntryArrayPool interface { + // Init initializes the array pool, it needs to be called + // before Get/Put use. + Init() + + // Get returns the a slice from the pool. + Get() []AggregateResultsEntry + + // Put returns the provided slice to the pool. 
+ Put(elems []AggregateResultsEntry) +} + +type AggregateResultsEntryFinalizeFn func([]AggregateResultsEntry) []AggregateResultsEntry + +type AggregateResultsEntryArrayPoolOpts struct { + Options pool.ObjectPoolOptions + Capacity int + MaxCapacity int + FinalizeFn AggregateResultsEntryFinalizeFn +} + +type AggregateResultsEntryArrPool struct { + opts AggregateResultsEntryArrayPoolOpts + pool pool.ObjectPool +} + +func NewAggregateResultsEntryArrayPool(opts AggregateResultsEntryArrayPoolOpts) AggregateResultsEntryArrayPool { + if opts.FinalizeFn == nil { + opts.FinalizeFn = defaultAggregateResultsEntryFinalizerFn + } + p := pool.NewObjectPool(opts.Options) + return &AggregateResultsEntryArrPool{opts, p} +} + +func (p *AggregateResultsEntryArrPool) Init() { + p.pool.Init(func() interface{} { + return make([]AggregateResultsEntry, 0, p.opts.Capacity) + }) +} + +func (p *AggregateResultsEntryArrPool) Get() []AggregateResultsEntry { + return p.pool.Get().([]AggregateResultsEntry) +} + +func (p *AggregateResultsEntryArrPool) Put(arr []AggregateResultsEntry) { + arr = p.opts.FinalizeFn(arr) + if max := p.opts.MaxCapacity; max > 0 && cap(arr) > max { + return + } + p.pool.Put(arr) +} + +func defaultAggregateResultsEntryFinalizerFn(elems []AggregateResultsEntry) []AggregateResultsEntry { + var empty AggregateResultsEntry + for i := range elems { + elems[i] = empty + } + elems = elems[:0] + return elems +} + +type AggregateResultsEntryArr []AggregateResultsEntry + +func (elems AggregateResultsEntryArr) grow(n int) []AggregateResultsEntry { + if cap(elems) < n { + elems = make([]AggregateResultsEntry, n) + } + elems = elems[:n] + // following compiler optimized memcpy impl + // https://github.com/golang/go/wiki/CompilerOptimizations#optimized-memclr + var empty AggregateResultsEntry + for i := range elems { + elems[i] = empty + } + return elems +} diff --git a/src/dbnode/storage/index/block.go b/src/dbnode/storage/index/block.go index 4d8c2e4c76..a43c905bc3 100644 --- 
a/src/dbnode/storage/index/block.go +++ b/src/dbnode/storage/index/block.go @@ -21,6 +21,7 @@ package index import ( + "bytes" "errors" "fmt" "sync" @@ -37,11 +38,13 @@ import ( "github.com/m3db/m3/src/m3ninx/index/segment/fst" "github.com/m3db/m3/src/m3ninx/search" "github.com/m3db/m3/src/m3ninx/search/executor" - "github.com/m3db/m3/src/x/resource" "github.com/m3db/m3/src/x/context" xerrors "github.com/m3db/m3/src/x/errors" + "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" xlog "github.com/m3db/m3/src/x/log" + "github.com/m3db/m3/src/x/pool" + "github.com/m3db/m3/src/x/resource" xtime "github.com/m3db/m3/src/x/time" "github.com/uber-go/tally" @@ -75,7 +78,8 @@ const ( blockStateSealed blockStateClosed - defaultQueryDocsBatchSize = 256 + defaultQueryDocsBatchSize = 256 + defaultAggregateResultsEntryBatchSize = 256 compactDebugLogEvery = 1 // Emit debug log for every compaction ) @@ -114,6 +118,9 @@ type block struct { iopts instrument.Options nsMD namespace.Metadata docsPool doc.DocumentArrayPool + aggEntryPool AggregateResultsEntryArrayPool + idPool ident.Pool + bytesPool pool.CheckedBytesPool compactingForeground bool compactingBackground bool @@ -221,6 +228,9 @@ func NewBlock( iopts: iopts, nsMD: md, docsPool: docsPool, + aggEntryPool: indexOpts.AggregateResultsEntryArrayPool(), + idPool: indexOpts.IdentifierPool(), + bytesPool: indexOpts.CheckedBytesPool(), foregroundCompactor: foregroundCompactor, backgroundCompactor: backgroundCompactor, metrics: newBlockMetrics(iopts.MetricsScope()), @@ -768,6 +778,31 @@ func (b *block) executorWithRLock() (search.Executor, error) { return executor.NewExecutor(readers), nil } +func (b *block) segmentsWithRLock() ([]segment.Segment, error) { + expectedSegments := len(b.foregroundSegments) + len(b.backgroundSegments) + for _, group := range b.shardRangesSegments { + expectedSegments += len(group.segments) + } + + segments := make([]segment.Segment, 0, expectedSegments) + // Add foreground & background 
segments. + for _, seg := range b.foregroundSegments { + segments = append(segments, seg.Segment()) + } + for _, seg := range b.backgroundSegments { + segments = append(segments, seg.Segment()) + } + + // Loop over the segments associated to shard time ranges. + for _, group := range b.shardRangesSegments { + for _, seg := range group.segments { + segments = append(segments, seg) + } + } + + return segments, nil +} + // Query acquires a read lock on the block so that the segments // are guaranteed to not be freed/released while accumulating results. // This allows references to the mmap'd segment data to be accumulated @@ -884,6 +919,190 @@ func (b *block) addQueryResults( return batch, size, err } +// Aggregate acquires a read lock on the block so that the segments +// are guaranteed to not be freed/released while accumulating results. +// This allows references to the mmap'd segment data to be accumulated +// and then copied into the results before this method returns (it is not +// safe to return docs directly from the segments from this method, the +// results datastructure is used to copy it every time documents are added +// to the results datastructure). This is similar to how Query() operates. +// NB: Aggregate is required in addition to Query, to optimise for the case +// when we can skip going to raw documents, and instead rely on pre-aggregated +// results via the FST underlying the index. 
+func (b *block) Aggregate( + cancellable *resource.CancellableLifetime, + opts QueryOptions, + results AggregateResults, +) (bool, error) { + b.RLock() + defer b.RUnlock() + + if b.state == blockStateClosed { + return false, ErrUnableToQueryBlockClosed + } + + segs, err := b.segmentsWithRLock() + if err != nil { + return false, err + } + + size := results.Size() + batch := b.aggEntryPool.Get() + batchSize := cap(batch) + if batchSize == 0 { + batchSize = defaultAggregateResultsEntryBatchSize + } + + aggOpts := results.AggregateResultsOptions() + iterateTerms := aggOpts.Type == AggregateTagNamesAndValues + defer func() { + b.aggEntryPool.Put(batch) + }() + + iter, err := newFieldsAndTermsIterator(nil, fieldsAndTermsIteratorOpts{}) + if err != nil { + return false, err + } + + for _, s := range segs { + if opts.LimitExceeded(size) { + break + } + + err = iter.Reset(s, fieldsAndTermsIteratorOpts{ + iterateTerms: iterateTerms, + allowFn: func(field []byte) bool { + // skip any field names that we shouldn't allow. + if bytes.Equal(field, doc.IDReservedFieldName) { + return false + } + return aggOpts.TermFilter.Allow(field) + }, + }) + if err != nil { + return false, err + } + + for iter.Next() { + if opts.LimitExceeded(size) { + break + } + + field, term := iter.Current() + batch = b.appendAggregateResults(batch, field, term, iterateTerms) + if len(batch) < batchSize { + continue + } + + batch, size, err = b.addAggregateResults(cancellable, results, batch) + if err != nil { + iter.Close() + return false, err + } + } + + if err := iter.Err(); err != nil { + iter.Close() + return false, err + } + + if err := iter.Close(); err != nil { + return false, err + } + } + + if err := iter.Close(); err != nil { + return false, err + } + + // Add last batch to results if remaining. 
+ if len(batch) > 0 { + batch, size, err = b.addAggregateResults(cancellable, results, batch) + if err != nil { + return false, err + } + } + + exhaustive := !opts.LimitExceeded(size) + return exhaustive, nil +} + +func (b *block) appendAggregateResults( + batch []AggregateResultsEntry, + field, term []byte, + includeTerms bool, +) []AggregateResultsEntry { + // NB(prateek): we make a copy of the (field, term) entries returned + // by the iterator during traversal, because the []byte are only valid per entry during + // the traversal (i.e. calling Next() invalidates the []byte). We choose to do this + // instead of checking if the entry is required (duplicates may exist in the results map + // already), as it reduces contention on the map itself. Further, the ownership of these + // idents is transferred to the results map, which either hangs on to them (if they are new), + // or finalizes them if they are duplicates. + var ( + entry AggregateResultsEntry + lastField []byte + lastFieldIsValid bool + ) + // relying on the fact that iterator traversal is in order, we can avoid creating duplicate + // entries for the same fields, by checking the last batch entry to see if the bytes are + // the same. 
+ if len(batch) > 0 { + lastFieldIsValid = true + lastField = batch[len(batch)-1].Field.Bytes() + } + if lastFieldIsValid && bytes.Equal(lastField, field) { + entry = batch[len(batch)-1] // avoid alloc cause we already have the field + // pop the last entry so the updated copy appended below replaces it instead of + // duplicating it; a duplicate hands the same Field/Terms idents to AddFields twice, + // which then finalizes idents that the results map still holds. + batch = batch[:len(batch)-1] + } else { + entry.Field = b.pooledID(field) // allocate id because this is the first time we've seen it + } + + if includeTerms { + // terms are always new (as far we know without checking the map for duplicates), so we allocate + entry.Terms = append(entry.Terms, b.pooledID(term)) + } + + batch = append(batch, entry) + return batch +} + +func (b *block) pooledID(id []byte) ident.ID { + data := b.bytesPool.Get(len(id)) + data.IncRef() + data.AppendAll(id) + data.DecRef() + return b.idPool.BinaryID(data) +} + +func (b *block) addAggregateResults( + cancellable *resource.CancellableLifetime, + results AggregateResults, + batch []AggregateResultsEntry, +) ([]AggregateResultsEntry, int, error) { + // Checkout the lifetime of the query before adding results + queryValid := cancellable.TryCheckout() + if !queryValid { + // Query not valid any longer, do not add results and return early + return batch, 0, errCancelledQuery + } + + // Try to add the docs to the resource + size, err := results.AddFields(batch) + + // Immediately release the checkout on the lifetime of query + cancellable.ReleaseCheckout() + + // Reset batch + var emptyField AggregateResultsEntry + for i := range batch { + batch[i] = emptyField + } + batch = batch[:0] + + // Return results + return batch, size, err +} + func (b *block) AddResults( results result.IndexBlock, ) error { diff --git a/src/dbnode/storage/index/block_prop_test.go b/src/dbnode/storage/index/block_prop_test.go index 2887ebd39d..12e2b248aa 100644 --- a/src/dbnode/storage/index/block_prop_test.go +++ b/src/dbnode/storage/index/block_prop_test.go @@ -37,9 +37,8 @@ import ( "github.com/m3db/m3/src/m3ninx/index/segment/fst" "github.com/m3db/m3/src/m3ninx/search" 
"github.com/m3db/m3/src/m3ninx/search/proptest" - "github.com/m3db/m3/src/m3ninx/util" - "github.com/m3db/m3/src/x/resource" "github.com/m3db/m3/src/x/instrument" + "github.com/m3db/m3/src/x/resource" "github.com/leanovate/gopter" "github.com/leanovate/gopter/prop" @@ -47,9 +46,7 @@ import ( ) var ( - testFstOptions = fst.NewOptions() - testBlockSize = time.Hour - lotsTestDocuments = util.MustReadDocs("../../../m3ninx/util/testdata/node_exporter.json", 2000) + testBlockSize = time.Hour ) // TestPostingsListCacheDoesNotAffectBlockQueryResults verifies that the postings list diff --git a/src/dbnode/storage/index/field_terms_iterator_prop_test.go b/src/dbnode/storage/index/field_terms_iterator_prop_test.go new file mode 100644 index 0000000000..39b9b2109a --- /dev/null +++ b/src/dbnode/storage/index/field_terms_iterator_prop_test.go @@ -0,0 +1,127 @@ +// +build big + +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package index + +import ( + "math/rand" + "os" + "testing" + "time" + + "github.com/leanovate/gopter" + "github.com/leanovate/gopter/gen" + "github.com/leanovate/gopter/prop" +) + +func TestFieldsTermsIteratorPropertyTest(t *testing.T) { + parameters := gopter.DefaultTestParameters() + seed := time.Now().UnixNano() + parameters.MinSuccessfulTests = 100 + parameters.MaxSize = 40 + parameters.Rng = rand.New(rand.NewSource(seed)) + properties := gopter.NewProperties(parameters) + + properties.Property("Fields Terms Iteration doesn't blow up", prop.ForAll( + func(i fieldsTermsIteratorPropInput) (bool, error) { + expected := i.expected() + seg := i.setup.asSegment(t) + iter, err := newFieldsAndTermsIterator(seg, fieldsAndTermsIteratorOpts{ + iterateTerms: i.iterateTerms, + allowFn: i.allowFn, + }) + if err != nil { + return false, err + } + observed := toSlice(t, iter) + requireSlicesEqual(t, expected, observed) + return true, nil + }, + genFieldsTermsIteratorPropInput(), + )) + + reporter := gopter.NewFormatedReporter(true, 160, os.Stdout) + if !properties.Run(reporter) { + t.Errorf("failed with initial seed: %d", seed) + } +} + +type fieldsTermsIteratorPropInput struct { + setup fieldsTermsIterSetup + iterateTerms bool + allowFn allowFn +} + +func (i fieldsTermsIteratorPropInput) expected() []pair { + fields := i.setup.fields + expected := make([]pair, 0, len(fields)) + seen := make(map[string]bool, len(fields)) + for _, f := range fields { + if !i.allowFn([]byte(f.Name)) { + continue + } + if seen[f.Name] { + continue + } + seen[f.Name] = true + if !i.iterateTerms { + f.Value = "" + } + expected = append(expected, f) + } + return expected +} + +func genFieldsTermsIteratorPropInput() gopter.Gen 
{ + return genFieldsTermsIteratorSetup(). + Map(func(s fieldsTermsIterSetup, params *gopter.GenParameters) fieldsTermsIteratorPropInput { + allowedFields := make(map[string]bool, len(s.fields)) + for _, f := range s.fields { + if params.NextBool() { + allowedFields[f.Name] = true + } + } + return fieldsTermsIteratorPropInput{ + setup: s, + iterateTerms: params.NextBool(), + allowFn: func(f []byte) bool { + return allowedFields[string(f)] + }, + } + }) +} + +func genFieldsTermsIteratorSetup() gopter.Gen { + return gen.SliceOf( + gen.Identifier()). + SuchThat(func(items []string) bool { + return len(items)%2 == 0 && len(items) > 0 + }). + Map(func(items []string) fieldsTermsIterSetup { + pairs := make([]pair, 0, len(items)/2) + for i := 0; i < len(items); i += 2 { + name, value := items[i], items[i+1] + pairs = append(pairs, pair{name, value}) + } + return newFieldsTermsIterSetup(pairs...) + }) +} diff --git a/src/dbnode/storage/index/field_terms_iterator_test.go b/src/dbnode/storage/index/field_terms_iterator_test.go new file mode 100644 index 0000000000..c5dc6abe1e --- /dev/null +++ b/src/dbnode/storage/index/field_terms_iterator_test.go @@ -0,0 +1,172 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. 
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package index
+
+import (
+	"bytes"
+	"fmt"
+	"sort"
+	"strings"
+	"testing"
+
+	"github.com/m3db/m3/src/m3ninx/doc"
+	"github.com/m3db/m3/src/m3ninx/index/segment"
+	"github.com/m3db/m3/src/m3ninx/index/segment/fst"
+	"github.com/m3db/m3/src/m3ninx/util"
+
+	"github.com/stretchr/testify/require"
+)
+
+var (
+	// testFstOptions are the FST options used when converting test segments.
+	testFstOptions = fst.NewOptions()
+	// lotsTestDocuments is a document fixture loaded from the m3ninx testdata
+	// directory when the package is initialised.
+	lotsTestDocuments = util.MustReadDocs("../../../m3ninx/util/testdata/node_exporter.json", 2000)
+)
+
+// TestFieldsTermsIteratorSimple iterates every (field, term) pair of a small
+// segment and requires them to come back in sorted order.
+func TestFieldsTermsIteratorSimple(t *testing.T) {
+	s := newFieldsTermsIterSetup(
+		pair{"a", "b"}, pair{"a", "c"},
+		pair{"d", "e"}, pair{"d", "f"},
+		pair{"g", "h"}, pair{"i", "j"},
+		pair{"k", "l"},
+	)
+	seg := s.asSegment(t)
+
+	iter, err := newFieldsAndTermsIterator(seg, fieldsAndTermsIteratorOpts{iterateTerms: true})
+	require.NoError(t, err)
+	s.requireEquals(t, iter)
+}
+
+// TestFieldsTermsIteratorSimpleSkip verifies that fields rejected by allowFn
+// (here "a" and "k") are skipped together with all of their terms.
+func TestFieldsTermsIteratorSimpleSkip(t *testing.T) {
+	input := []pair{
+		{"a", "b"}, {"a", "c"},
+		{"d", "e"}, {"d", "f"},
+		{"g", "h"}, {"i", "j"},
+		{"k", "l"},
+	}
+	s := newFieldsTermsIterSetup(input...)
+	seg := s.asSegment(t)
+
+	iter, err := newFieldsAndTermsIterator(seg, fieldsAndTermsIteratorOpts{
+		iterateTerms: true,
+		allowFn: func(f []byte) bool {
+			return !bytes.Equal([]byte("a"), f) && !bytes.Equal([]byte("k"), f)
+		},
+	})
+	require.NoError(t, err)
+	slice := toSlice(t, iter)
+	requireSlicesEqual(t, []pair{
+		{"d", "e"}, {"d", "f"},
+		{"g", "h"}, {"i", "j"},
+	}, slice)
+}
+
+// TestFieldsTermsIteratorTermsOnly uses the zero-value options, i.e.
+// iterateTerms is false: despite its name it exercises fields-only iteration,
+// where every field is returned once with an empty term.
+func TestFieldsTermsIteratorTermsOnly(t *testing.T) {
+	s := newFieldsTermsIterSetup(
+		pair{"a", "b"}, pair{"a", "c"},
+		pair{"d", "e"}, pair{"d", "f"},
+		pair{"g", "h"}, pair{"i", "j"},
+		pair{"k", "l"},
+	)
+	seg := s.asSegment(t)
+
+	iter, err := newFieldsAndTermsIterator(seg, fieldsAndTermsIteratorOpts{})
+	require.NoError(t, err)
+	slice := toSlice(t, iter)
+	requireSlicesEqual(t, []pair{
+		{"a", ""}, {"d", ""}, {"g", ""}, {"i", ""}, {"k", ""},
+	}, slice)
+}
+
+// pair is a (field name, term value) tuple used as test input and as the
+// expected iterator output.
+type pair struct {
+	Name, Value string
+}
+
+// newFieldsTermsIterSetup sorts fields in place by name, then by value, and
+// wraps them in a fieldsTermsIterSetup. The sorted order is the order
+// requireEquals expects the iterator to produce.
+func newFieldsTermsIterSetup(fields ...pair) fieldsTermsIterSetup {
+	sort.Slice(fields, func(i, j int) bool {
+		c := strings.Compare(fields[i].Name, fields[j].Name)
+		if c == 0 {
+			return strings.Compare(fields[i].Value, fields[j].Value) < 0
+		}
+		return c < 0
+	})
+	return fieldsTermsIterSetup{fields}
+}
+
+// fieldsTermsIterSetup holds the sorted (name, value) pairs a test segment is
+// built from.
+type fieldsTermsIterSetup struct {
+	fields []pair
+}
+
+// asSegment builds one single-field document per pair (ID "id_<name>_<value>"),
+// indexes them in a mutable test segment and converts that into an FST segment.
+func (s *fieldsTermsIterSetup) asSegment(t *testing.T) segment.Segment {
+	docs := make([]doc.Document, 0, len(s.fields))
+	for _, f := range s.fields {
+		docs = append(docs, doc.Document{
+			ID: []byte(fmt.Sprintf("id_%v_%v", f.Name, f.Value)),
+			Fields: []doc.Field{
+				{
+					Name:  []byte(f.Name),
+					Value: []byte(f.Value),
+				},
+			},
+		})
+	}
+	memSeg := testSegment(t, docs...).(segment.MutableSegment)
+	return fst.ToTestSegment(t, memSeg, testFstOptions)
+}
+
+// requireEquals drains iter and requires that, ignoring entries for the
+// reserved ID field, it yields exactly s.fields in order. It then requires the
+// iterator to be exhausted and error free, and closes it.
+func (s *fieldsTermsIterSetup) requireEquals(t *testing.T, iter fieldsAndTermsIterator) {
+	pending := s.fields
+	for len(pending) > 0 {
+		require.True(t, iter.Next())
+		name, value := iter.Current()
+		if bytes.Equal(name, doc.IDReservedFieldName) {
+			continue
+		}
+		top := pending[0]
+		pending = pending[1:]
+		require.Equal(t, top.Name, string(name))
+		require.Equal(t, top.Value, string(value))
+	}
+	require.False(t, iter.Next())
+	require.NoError(t, iter.Err())
+	require.NoError(t, iter.Close())
+}
+
+// toSlice drains iter into a slice of pairs, copying the bytes returned by
+// Current() and skipping entries for the reserved ID field. It fails the test
+// if the iterator reports an error, and closes the iterator before returning.
+func toSlice(t *testing.T, iter fieldsAndTermsIterator) []pair {
+	var pairs []pair
+	for iter.Next() {
+		n, v := iter.Current()
+		if bytes.Equal(n, doc.IDReservedFieldName) {
+			continue
+		}
+		pairs = append(pairs, pair{
+			Name:  string(n),
+			Value: string(v),
+		})
+	}
+	// Err must be inspected before Close: a false Next() can also mean the
+	// iterator failed, and closing may discard the recorded error.
+	require.NoError(t, iter.Err())
+	require.NoError(t, iter.Close())
+	return pairs
+}
+
+// requireSlicesEqual requires a (expected) and b (actual) to have the same
+// length and pairwise-equal elements.
+func requireSlicesEqual(t *testing.T, a, b []pair) {
+	require.Equal(t, len(a), len(b))
+	for i := 0; i < len(a); i++ {
+		require.Equal(t, a[i], b[i])
+	}
+}
diff --git a/src/dbnode/storage/index/fields_terms_iterator.go b/src/dbnode/storage/index/fields_terms_iterator.go
new file mode 100644
index 0000000000..152b1ea90f
--- /dev/null
+++ b/src/dbnode/storage/index/fields_terms_iterator.go
@@ -0,0 +1,201 @@
+// Copyright (c) 2019 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package index
+
+import (
+	"github.com/m3db/m3/src/m3ninx/index/segment"
+	xerrors "github.com/m3db/m3/src/x/errors"
+)
+
+// fieldsAndTermsIterator iterates over all known fields and terms for a segment.
+type fieldsAndTermsIterator interface {
+	// Next returns a bool indicating if there are any more elements.
+	Next() bool
+
+	// Current returns the current element.
+	// NB: the element returned is only valid until the subsequent call to Next().
+	Current() (field, term []byte)
+
+	// Err returns any errors encountered during iteration.
+	Err() error
+
+	// Close releases any resources held by the iterator.
+	Close() error
+
+	// Reset resets the iterator to start iterating the given segment.
+	Reset(seg segment.Segment, opts fieldsAndTermsIteratorOpts) error
+}
+
+// fieldsAndTermsIteratorOpts configures a fieldsAndTermsIterator.
+type fieldsAndTermsIteratorOpts struct {
+	// iterateTerms selects (field, term) iteration. When false only fields are
+	// iterated and the term returned by Current() is never set.
+	iterateTerms bool
+	// allowFn optionally filters fields; a nil allowFn admits every field.
+	allowFn allowFn
+}
+
+// allow reports whether field f should be iterated: always true when no
+// allowFn is configured, otherwise whatever allowFn returns.
+func (o fieldsAndTermsIteratorOpts) allow(f []byte) bool {
+	if o.allowFn == nil {
+		return true
+	}
+	return o.allowFn(f)
+}
+
+// allowFn decides whether the given field takes part in iteration.
+type allowFn func(field []byte) bool
+
+// fieldsAndTermsIter is the fieldsAndTermsIterator implementation backed by a
+// segment's fields iterator and, when terms are requested, one terms iterator
+// per allowed field.
+type fieldsAndTermsIter struct {
+	seg  segment.Segment
+	opts fieldsAndTermsIteratorOpts
+
+	// done is set once iteration is exhausted or has failed, so that further
+	// Next() calls return false without touching the underlying iterators.
+	done bool
+	err  error
+	// fieldIter is nil when the iterator was reset with a nil segment.
+	fieldIter segment.FieldsIterator
+	// termIter is the open terms iterator of current.field, or nil when none
+	// is open.
+	termIter segment.TermsIterator
+
+	current struct {
+		valid bool
+		field []byte
+		term  []byte
+	}
+}
+
+var (
+	// fieldsAndTermsIterZeroed is the zero value Reset assigns to wipe all state.
+	fieldsAndTermsIterZeroed fieldsAndTermsIter
+)
+
+var _ fieldsAndTermsIterator = &fieldsAndTermsIter{}
+
+// newFieldsAndTermsIterator returns an iterator positioned before the first
+// element of s, or the error from opening the segment's fields iterator.
+func newFieldsAndTermsIterator(s segment.Segment, opts fieldsAndTermsIteratorOpts) (fieldsAndTermsIterator, error) {
+	iter := &fieldsAndTermsIter{}
+	err := iter.Reset(s, opts)
+	if err != nil {
+		return nil, err
+	}
+	return iter, nil
+}
+
+// Reset wipes all iterator state and, for a non-nil segment, opens a fresh
+// fields iterator on it. A nil segment yields an empty iterator.
+// NB: iterators opened by an earlier Reset are dropped without being closed;
+// call Close first to release them.
+func (fti *fieldsAndTermsIter) Reset(s segment.Segment, opts fieldsAndTermsIteratorOpts) error {
+	*fti = fieldsAndTermsIterZeroed
+	fti.seg = s
+	fti.opts = opts
+	if s == nil {
+		return nil
+	}
+	fiter, err := s.FieldsIterable().Fields()
+	if err != nil {
+		return err
+	}
+	fti.fieldIter = fiter
+	return nil
+}
+
+// setNextField advances to the next field accepted by opts.allow and stores it
+// in current.field. It returns false once the fields iterator is exhausted (or
+// absent), recording the fields iterator's error, if any, in fti.err.
+func (fti *fieldsAndTermsIter) setNextField() bool {
+	if fti.fieldIter == nil {
+		return false
+	}
+
+	for fti.fieldIter.Next() {
+		field := fti.fieldIter.Current()
+		if !fti.opts.allow(field) {
+			continue
+		}
+		fti.current.valid = true
+		fti.current.field = field
+		return true
+	}
+
+	fti.current.valid = false
+	fti.err = fti.fieldIter.Err()
+	return false
+}
+
+// finishTerms retires the terms iterator of the current field once it has
+// stopped yielding terms: it collects the iterator's error, closes it and
+// drops the reference so that Close() does not close it a second time.
+// It returns false, with fti.err set, if either step failed.
+func (fti *fieldsAndTermsIter) finishTerms() bool {
+	iter := fti.termIter
+	fti.termIter = nil
+
+	var multiErr xerrors.MultiError
+	multiErr = multiErr.Add(iter.Err())
+	multiErr = multiErr.Add(iter.Close())
+	if err := multiErr.FinalError(); err != nil {
+		fti.current.valid = false
+		fti.err = err
+		return false
+	}
+	return true
+}
+
+// setNext moves to the next element: the next allowed field when only fields
+// are iterated, otherwise the next term of the current field or, once that is
+// exhausted, the first term of the next allowed field that has any terms.
+func (fti *fieldsAndTermsIter) setNext() bool {
+	// if only need to iterate fields
+	if !fti.opts.iterateTerms {
+		return fti.setNextField()
+	}
+
+	// if iterating both terms and fields, check if current field has another term
+	if fti.termIter != nil {
+		if fti.termIter.Next() {
+			fti.current.valid = true
+			fti.current.term, _ = fti.termIter.Current()
+			return true
+		}
+		if !fti.finishTerms() {
+			return false
+		}
+	}
+
+	// i.e. need to switch to next field
+	for fti.setNextField() {
+		// and get next term for the field
+		termsIter, err := fti.seg.TermsIterable().Terms(fti.current.field)
+		if err != nil {
+			fti.current.valid = false
+			fti.err = err
+			return false
+		}
+		fti.termIter = termsIter
+
+		if fti.termIter.Next() {
+			fti.current.valid = true
+			fti.current.term, _ = fti.termIter.Current()
+			return true
+		}
+
+		// The field produced no term: surface the terms iterator's own error
+		// if it has one, otherwise move on to the next field rather than
+		// reporting an element that does not exist.
+		if !fti.finishTerms() {
+			return false
+		}
+	}
+
+	// setNextField has already recorded the fields iterator's error, if any.
+	return false
+}
+
+// Next advances the iterator and reports whether Current() holds a new
+// element. Once it has returned false it keeps returning false until Reset.
+func (fti *fieldsAndTermsIter) Next() bool {
+	if fti.err != nil || fti.done {
+		return false
+	}
+	if fti.setNext() {
+		return true
+	}
+	fti.done = true
+	return false
+}
+
+// Current returns the field and term set by the last successful Next(); the
+// term is never set when terms are not being iterated.
+func (fti *fieldsAndTermsIter) Current() (field, term []byte) {
+	return fti.current.field, fti.current.term
+}
+
+// Err returns the first error recorded during iteration, if any.
+func (fti *fieldsAndTermsIter) Err() error {
+	return fti.err
+}
+
+// Close closes whichever underlying iterators are still open, resets the
+// iterator to its empty state (which also clears any recorded iteration
+// error) and returns the combined close errors.
+func (fti *fieldsAndTermsIter) Close() error {
+	var multiErr xerrors.MultiError
+	if fti.fieldIter != nil {
+		multiErr = multiErr.Add(fti.fieldIter.Close())
+	}
+	if fti.termIter != nil {
+		multiErr = multiErr.Add(fti.termIter.Close())
+	}
+	multiErr = multiErr.Add(fti.Reset(nil, fieldsAndTermsIteratorOpts{}))
+	return multiErr.FinalError()
+}
diff --git a/src/dbnode/storage/index/index_mock.go b/src/dbnode/storage/index/index_mock.go
index 407cee94e0..2b036e1c35 100644
--- a/src/dbnode/storage/index/index_mock.go
+++ b/src/dbnode/storage/index/index_mock.go
@@ -192,6 +192,35 @@ func (mr *MockAggregateResultsMockRecorder) AddDocuments(arg0 interface{}) *gomo
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddDocuments", reflect.TypeOf((*MockAggregateResults)(nil).AddDocuments), arg0)
 }
 
+// AddFields mocks base method
+func (m *MockAggregateResults) AddFields(arg0 []AggregateResultsEntry) (int, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "AddFields", arg0)
+	ret0, _ := ret[0].(int)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// AddFields indicates an expected call of AddFields
+func (mr *MockAggregateResultsMockRecorder)
AddFields(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddFields", reflect.TypeOf((*MockAggregateResults)(nil).AddFields), arg0) +} + +// AggregateResultsOptions mocks base method +func (m *MockAggregateResults) AggregateResultsOptions() AggregateResultsOptions { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AggregateResultsOptions") + ret0, _ := ret[0].(AggregateResultsOptions) + return ret0 +} + +// AggregateResultsOptions indicates an expected call of AggregateResultsOptions +func (mr *MockAggregateResultsMockRecorder) AggregateResultsOptions() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AggregateResultsOptions", reflect.TypeOf((*MockAggregateResults)(nil).AggregateResultsOptions)) +} + // Finalize mocks base method func (m *MockAggregateResults) Finalize() { m.ctrl.T.Helper() @@ -295,6 +324,21 @@ func (mr *MockBlockMockRecorder) AddResults(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddResults", reflect.TypeOf((*MockBlock)(nil).AddResults), arg0) } +// Aggregate mocks base method +func (m *MockBlock) Aggregate(arg0 *resource.CancellableLifetime, arg1 QueryOptions, arg2 AggregateResults) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Aggregate", arg0, arg1, arg2) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Aggregate indicates an expected call of Aggregate +func (mr *MockBlockMockRecorder) Aggregate(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Aggregate", reflect.TypeOf((*MockBlock)(nil).Aggregate), arg0, arg1, arg2) +} + // Close mocks base method func (m *MockBlock) Close() error { m.ctrl.T.Helper() diff --git a/src/dbnode/storage/index/options.go b/src/dbnode/storage/index/options.go index 0a0054d70f..049923c655 100644 --- a/src/dbnode/storage/index/options.go 
+++ b/src/dbnode/storage/index/options.go @@ -45,6 +45,14 @@ const ( documentArrayPoolSize = 256 documentArrayPoolCapacity = 256 documentArrayPoolMaxCapacity = 256 // Do not allow grows, since we know the size + + // aggregateResultsEntryArrayPool size in general: 256*256*sizeof(doc.Field) + // = 256 * 256 * 16 + // = 1mb (but with Go's heap probably 2mb) + // TODO(prateek): Make this configurable in a followup change. + aggregateResultsEntryArrayPoolSize = 256 + aggregateResultsEntryArrayPoolCapacity = 256 + aggregateResultsEntryArrayPoolMaxCapacity = 256 // Do not allow grows, since we know the size ) var ( @@ -102,6 +110,7 @@ type opts struct { aggResultsPool AggregateResultsPool aggValuesPool AggregateValuesPool docArrayPool doc.DocumentArrayPool + aggResultsEntryArrayPool AggregateResultsEntryArrayPool foregroundCompactionPlannerOpts compaction.PlannerOptions backgroundCompactionPlannerOpts compaction.PlannerOptions postingsListCache *PostingsListCache @@ -131,6 +140,14 @@ func NewOptions() Options { }) docArrayPool.Init() + aggResultsEntryArrayPool := NewAggregateResultsEntryArrayPool(AggregateResultsEntryArrayPoolOpts{ + Options: pool.NewObjectPoolOptions(). 
+ SetSize(aggregateResultsEntryArrayPoolSize), + Capacity: aggregateResultsEntryArrayPoolCapacity, + MaxCapacity: aggregateResultsEntryArrayPoolMaxCapacity, + }) + aggResultsEntryArrayPool.Init() + instrumentOpts := instrument.NewOptions() opts := &opts{ insertMode: defaultIndexInsertMode, @@ -145,6 +162,7 @@ func NewOptions() Options { aggResultsPool: aggResultsPool, aggValuesPool: aggValuesPool, docArrayPool: docArrayPool, + aggResultsEntryArrayPool: aggResultsEntryArrayPool, foregroundCompactionPlannerOpts: defaultForegroundCompactionOpts, backgroundCompactionPlannerOpts: defaultBackgroundCompactionOpts, } @@ -304,6 +322,16 @@ func (o *opts) DocumentArrayPool() doc.DocumentArrayPool { return o.docArrayPool } +func (o *opts) SetAggregateResultsEntryArrayPool(value AggregateResultsEntryArrayPool) Options { + opts := *o + opts.aggResultsEntryArrayPool = value + return &opts +} + +func (o *opts) AggregateResultsEntryArrayPool() AggregateResultsEntryArrayPool { + return o.aggResultsEntryArrayPool +} + func (o *opts) SetForegroundCompactionPlannerOptions(value compaction.PlannerOptions) Options { opts := *o opts.foregroundCompactionPlannerOpts = value diff --git a/src/dbnode/storage/index/types.go b/src/dbnode/storage/index/types.go index 94ed466e03..c8bf6aa847 100644 --- a/src/dbnode/storage/index/types.go +++ b/src/dbnode/storage/index/types.go @@ -33,11 +33,11 @@ import ( "github.com/m3db/m3/src/m3ninx/index/segment/builder" "github.com/m3db/m3/src/m3ninx/index/segment/fst" "github.com/m3db/m3/src/m3ninx/index/segment/mem" - "github.com/m3db/m3/src/x/resource" "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" "github.com/m3db/m3/src/x/pool" + "github.com/m3db/m3/src/x/resource" xtime "github.com/m3db/m3/src/x/time" ) @@ -181,6 +181,16 @@ type AggregateResults interface { aggregateQueryOpts AggregateResultsOptions, ) + // AggregateResultsOptions returns the set AggregateResultsOptions. 
+ AggregateResultsOptions() AggregateResultsOptions + + // AddFields adds the batch of fields to the results set, it will + // take a copy of the bytes backing the documents so the original can be + // modified after this function returns without affecting the results map. + AddFields( + batch []AggregateResultsEntry, + ) (size int, err error) + // Map returns a map from tag name -> possible tag values, // comprising aggregate results. // Since a lock is not held when accessing the map after a call to this @@ -238,6 +248,13 @@ type AggregateValuesPool interface { Put(value AggregateValues) } +// AggregateResultsEntry is used during block.Aggregate() execution +// to collect entries. +type AggregateResultsEntry struct { + Field ident.ID + Terms []ident.ID +} + // OnIndexSeries provides a set of callback hooks to allow the reverse index // to do lifecycle management of any resources retained during indexing. type OnIndexSeries interface { @@ -273,6 +290,16 @@ type Block interface { results BaseResults, ) (exhaustive bool, err error) + // Aggregate aggregates known tag names/values. + // NB(prateek): this is different from Query, as we can + // avoid going to documents, relying purely on the indexed + // FSTs. + Aggregate( + cancellable *resource.CancellableLifetime, + opts QueryOptions, + results AggregateResults, + ) (exhaustive bool, err error) + // AddResults adds bootstrap results to the block, if c. AddResults(results result.IndexBlock) error @@ -775,6 +802,12 @@ type Options interface { // DocumentArrayPool returns the document array pool. DocumentArrayPool() doc.DocumentArrayPool + // SetAggregateResultsEntryArrayPool sets the aggregate results entry array pool. + SetAggregateResultsEntryArrayPool(value AggregateResultsEntryArrayPool) Options + + // AggregateResultsEntryArrayPool returns the aggregate results entry array pool. 
+ AggregateResultsEntryArrayPool() AggregateResultsEntryArrayPool + // SetForegroundCompactionPlannerOptions sets the compaction planner options. SetForegroundCompactionPlannerOptions(v compaction.PlannerOptions) Options diff --git a/src/dbnode/storage/index_block_test.go b/src/dbnode/storage/index_block_test.go index 3c1346751b..e3a8f4634c 100644 --- a/src/dbnode/storage/index_block_test.go +++ b/src/dbnode/storage/index_block_test.go @@ -30,6 +30,7 @@ import ( "github.com/m3db/m3/src/dbnode/storage/index" "github.com/m3db/m3/src/dbnode/storage/namespace" "github.com/m3db/m3/src/m3ninx/doc" + "github.com/m3db/m3/src/m3ninx/idx" "github.com/m3db/m3/src/m3ninx/index/segment" "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" @@ -594,6 +595,7 @@ func TestNamespaceIndexBlockAggregateQuery(t *testing.T) { ctrl := gomock.NewController(xtest.Reporter{T: t}) defer ctrl.Finish() + query := idx.NewFieldQuery([]byte("a")) retention := 2 * time.Hour blockSize := time.Hour now := time.Now().Truncate(blockSize).Add(10 * time.Minute) @@ -657,7 +659,8 @@ func TestNamespaceIndexBlockAggregateQuery(t *testing.T) { // only queries as much as is needed (wrt to time) ctx := context.NewContext() - q := index.Query{} + + q := index.Query{query} qOpts := index.QueryOptions{ StartInclusive: t0, EndExclusive: now.Add(time.Minute), @@ -689,3 +692,105 @@ func TestNamespaceIndexBlockAggregateQuery(t *testing.T) { _, err = idx.AggregateQuery(ctx, q, aggOpts) require.NoError(t, err) } + +func TestNamespaceIndexBlockAggregateQueryWithAllQuery(t *testing.T) { + ctrl := gomock.NewController(xtest.Reporter{T: t}) + defer ctrl.Finish() + + query := idx.NewAllQuery() + retention := 2 * time.Hour + blockSize := time.Hour + now := time.Now().Truncate(blockSize).Add(10 * time.Minute) + t0 := now.Truncate(blockSize) + t0Nanos := xtime.ToUnixNano(t0) + t1 := t0.Add(1 * blockSize) + t1Nanos := xtime.ToUnixNano(t1) + t2 := t1.Add(1 * blockSize) + var nowLock sync.Mutex + nowFn := func() 
time.Time { + nowLock.Lock() + defer nowLock.Unlock() + return now + } + opts := testDatabaseOptions() + opts = opts.SetClockOptions(opts.ClockOptions().SetNowFn(nowFn)) + + b0 := index.NewMockBlock(ctrl) + b0.EXPECT().Stats(gomock.Any()).Return(nil).AnyTimes() + b0.EXPECT().Close().Return(nil) + b0.EXPECT().StartTime().Return(t0).AnyTimes() + b0.EXPECT().EndTime().Return(t0.Add(blockSize)).AnyTimes() + b1 := index.NewMockBlock(ctrl) + b1.EXPECT().Stats(gomock.Any()).Return(nil).AnyTimes() + b1.EXPECT().Close().Return(nil) + b1.EXPECT().StartTime().Return(t1).AnyTimes() + b1.EXPECT().EndTime().Return(t1.Add(blockSize)).AnyTimes() + newBlockFn := func( + ts time.Time, + md namespace.Metadata, + _ index.BlockOptions, + io index.Options, + ) (index.Block, error) { + if ts.Equal(t0) { + return b0, nil + } + if ts.Equal(t1) { + return b1, nil + } + panic("should never get here") + } + md := testNamespaceMetadata(blockSize, retention) + idx, err := newNamespaceIndexWithNewBlockFn(md, newBlockFn, opts) + require.NoError(t, err) + + defer func() { + require.NoError(t, idx.Close()) + }() + + seg1 := segment.NewMockSegment(ctrl) + seg2 := segment.NewMockSegment(ctrl) + seg3 := segment.NewMockSegment(ctrl) + bootstrapResults := result.IndexResults{ + t0Nanos: result.NewIndexBlock(t0, []segment.Segment{seg1}, result.NewShardTimeRanges(t0, t1, 1, 2, 3)), + t1Nanos: result.NewIndexBlock(t1, []segment.Segment{seg2, seg3}, result.NewShardTimeRanges(t1, t2, 1, 2, 3)), + } + + b0.EXPECT().AddResults(bootstrapResults[t0Nanos]).Return(nil) + b1.EXPECT().AddResults(bootstrapResults[t1Nanos]).Return(nil) + require.NoError(t, idx.Bootstrap(bootstrapResults)) + + // only queries as much as is needed (wrt to time) + ctx := context.NewContext() + + q := index.Query{query} + qOpts := index.QueryOptions{ + StartInclusive: t0, + EndExclusive: now.Add(time.Minute), + } + aggOpts := index.AggregationOptions{QueryOptions: qOpts} + + b0.EXPECT().Aggregate(gomock.Any(), qOpts, 
gomock.Any()).Return(true, nil) + _, err = idx.AggregateQuery(ctx, q, aggOpts) + require.NoError(t, err) + + // queries multiple blocks if needed + qOpts = index.QueryOptions{ + StartInclusive: t0, + EndExclusive: t2.Add(time.Minute), + } + aggOpts = index.AggregationOptions{QueryOptions: qOpts} + b0.EXPECT().Aggregate(gomock.Any(), qOpts, gomock.Any()).Return(true, nil) + b1.EXPECT().Aggregate(gomock.Any(), qOpts, gomock.Any()).Return(true, nil) + _, err = idx.AggregateQuery(ctx, q, aggOpts) + require.NoError(t, err) + + // stops querying once a block returns non-exhaustive + qOpts = index.QueryOptions{ + StartInclusive: t0, + EndExclusive: t0.Add(time.Minute), + } + b0.EXPECT().Aggregate(gomock.Any(), qOpts, gomock.Any()).Return(false, nil) + aggOpts = index.AggregationOptions{QueryOptions: qOpts} + _, err = idx.AggregateQuery(ctx, q, aggOpts) + require.NoError(t, err) +} diff --git a/src/m3ninx/generated-source-files.mk b/src/m3ninx/generated-source-files.mk index 5b9bc0dd51..980fa83c3b 100644 --- a/src/m3ninx/generated-source-files.mk +++ b/src/m3ninx/generated-source-files.mk @@ -98,9 +98,9 @@ genny-map-segment-mem-fieldsmap: # generation rule for all generated arraypools .PHONY: genny-arraypool-all -genny-arraypool-all: \ - genny-arraypool-bytes-slice-array-pool \ - genny-arraypool-document-array-pool \ +genny-arraypool-all: \ + genny-arraypool-bytes-slice-array-pool \ + genny-arraypool-document-array-pool \ # arraypool generation rule for ./x/bytes.SliceArrayPool .PHONY: genny-arraypool-bytes-slice-array-pool @@ -114,7 +114,7 @@ genny-arraypool-bytes-slice-array-pool: rename_type_middle=Slice \ rename_constructor=NewSliceArrayPool \ - # arraypool generation rule for ./doc.DocumentArrayPool +# arraypool generation rule for ./doc.DocumentArrayPool .PHONY: genny-arraypool-document-array-pool genny-arraypool-document-array-pool: cd $(m3x_package_path) && make genny-arraypool \ @@ -127,4 +127,3 @@ genny-arraypool-document-array-pool: 
rename_constructor=NewDocumentArrayPool \ rename_gen_types=true \ - diff --git a/src/m3ninx/index/segment/fst/fst_terms_iterator.go b/src/m3ninx/index/segment/fst/fst_terms_iterator.go index bcfb4e39ee..3576d056cd 100644 --- a/src/m3ninx/index/segment/fst/fst_terms_iterator.go +++ b/src/m3ninx/index/segment/fst/fst_terms_iterator.go @@ -33,7 +33,7 @@ type fstTermsIterOpts struct { } func (o fstTermsIterOpts) Close() error { - if o.finalizeFST { + if o.finalizeFST && o.fst != nil { return o.fst.Close() } return nil @@ -121,7 +121,6 @@ func (f *fstTermsIter) Close() error { var multiErr xerrors.MultiError multiErr = multiErr.Add(f.iter.Close()) multiErr = multiErr.Add(f.opts.Close()) - f.clear() return multiErr.FinalError() } diff --git a/src/m3ninx/index/segment/fst/fst_terms_postings_iterator.go b/src/m3ninx/index/segment/fst/fst_terms_postings_iterator.go index 7989f5f244..09cf5bb58b 100644 --- a/src/m3ninx/index/segment/fst/fst_terms_postings_iterator.go +++ b/src/m3ninx/index/segment/fst/fst_terms_postings_iterator.go @@ -108,7 +108,10 @@ func (f *fstTermsPostingsIter) Err() error { } func (f *fstTermsPostingsIter) Close() error { - err := f.termsIter.Close() + var err error + if f.termsIter != nil { + err = f.termsIter.Close() + } f.clear() return err } diff --git a/src/m3ninx/search/proptest/query_gen.go b/src/m3ninx/search/proptest/query_gen.go index b3b18a1e8c..f7e6820345 100644 --- a/src/m3ninx/search/proptest/query_gen.go +++ b/src/m3ninx/search/proptest/query_gen.go @@ -187,5 +187,3 @@ func GenQuery(docs []doc.Document) gopter.Gen { GenConjunctionQuery(docs), GenDisjunctionQuery(docs)) } - -// Ge