Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce poller allocs #4393

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 48 additions & 45 deletions tempodb/blocklist/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,13 @@ func (p *Poller) Do(previous *List) (PerTenant, PerTenantCompacted, error) {

var (
consecutiveErrorsRemaining = p.cfg.TolerateConsecutiveErrors
newBlockList = make([]*backend.BlockMeta, 0)
newCompactedBlockList = make([]*backend.CompactedBlockMeta, 0)
newBlockList = make([]*backend.BlockMeta, 0, 1000)
newCompactedBlockList = make([]*backend.CompactedBlockMeta, 0, 1000)
err error
)

for consecutiveErrorsRemaining >= 0 {
newBlockList, newCompactedBlockList, err = p.pollTenantAndCreateIndex(ctx, tenantID, previous)
err = p.pollTenantAndCreateIndex(ctx, tenantID, previous, &newBlockList, &newCompactedBlockList)
if err == nil {
break
}
Expand Down Expand Up @@ -241,7 +241,9 @@ func (p *Poller) pollTenantAndCreateIndex(
ctx context.Context,
tenantID string,
previous *List,
) ([]*backend.BlockMeta, []*backend.CompactedBlockMeta, error) {
newBlockList *[]*backend.BlockMeta,
newCompactedBlockList *[]*backend.CompactedBlockMeta,
) error {
derivedCtx, span := tracer.Start(ctx, "Poller.pollTenantAndCreateIndex", trace.WithAttributes(attribute.String("tenant", tenantID)))
defer span.End()

Expand All @@ -260,15 +262,17 @@ func (p *Poller) pollTenantAndCreateIndex(

span.SetAttributes(attribute.Int("metas", len(i.Meta)))
span.SetAttributes(attribute.Int("compactedMetas", len(i.CompactedMeta)))
return i.Meta, i.CompactedMeta, nil
*newBlockList = append(*newBlockList, i.Meta...)
*newCompactedBlockList = append(*newCompactedBlockList, i.CompactedMeta...)
return nil
}

metricTenantIndexErrors.WithLabelValues(tenantID).Inc()
span.RecordError(err)

// there was an error; return it if we're not configured to fall back to polling
if !p.cfg.PollFallback {
return nil, nil, fmt.Errorf("failed to pull tenant index and no fallback configured: %w", err)
return fmt.Errorf("failed to pull tenant index and no fallback configured: %w", err)
}

// polling fallback is true, log the error and continue in this method to completely poll the backend
Expand All @@ -279,52 +283,54 @@ func (p *Poller) pollTenantAndCreateIndex(
// there was a failure to pull the tenant index and we are configured to fall
// back to polling.
metricTenantIndexBuilder.WithLabelValues(tenantID).Set(1)
blocklist, compactedBlocklist, err := p.pollTenantBlocks(derivedCtx, tenantID, previous)
err := p.pollTenantBlocks(derivedCtx, tenantID, previous, newBlockList, newCompactedBlockList)
if err != nil {
return nil, nil, fmt.Errorf("failed to poll tenant blocks: %w", err)
return fmt.Errorf("failed to poll tenant blocks: %w", err)
}

// everything is happy, write this tenant index
level.Info(p.logger).Log("msg", "writing tenant index", "tenant", tenantID, "metas", len(blocklist), "compactedMetas", len(compactedBlocklist))
err = p.writer.WriteTenantIndex(derivedCtx, tenantID, blocklist, compactedBlocklist)
level.Info(p.logger).Log("msg", "writing tenant index", "tenant", tenantID, "metas", len(*newBlockList), "compactedMetas", len(*newCompactedBlockList))
err = p.writer.WriteTenantIndex(derivedCtx, tenantID, *newBlockList, *newCompactedBlockList)
if err != nil {
metricTenantIndexErrors.WithLabelValues(tenantID).Inc()
level.Error(p.logger).Log("msg", "failed to write tenant index", "tenant", tenantID, "err", err)
}

if len(blocklist) == 0 && len(compactedBlocklist) == 0 {
if len(*newBlockList) == 0 && len(*newCompactedBlockList) == 0 {
err := p.deleteTenant(ctx, tenantID)
if err != nil {
return nil, nil, fmt.Errorf("failed to delete tenant: %w", err)
return fmt.Errorf("failed to delete tenant: %w", err)
}
}

metricTenantIndexAgeSeconds.WithLabelValues(tenantID).Set(0)

return blocklist, compactedBlocklist, nil
return nil
}

func (p *Poller) pollTenantBlocks(
ctx context.Context,
tenantID string,
previous *List,
) ([]*backend.BlockMeta, []*backend.CompactedBlockMeta, error) {
newBlockList *[]*backend.BlockMeta,
newCompactedBlockList *[]*backend.CompactedBlockMeta,
) error {
derivedCtx, span := tracer.Start(ctx, "Poller.pollTenantBlocks")
defer span.End()

currentBlockIDs, currentCompactedBlockIDs, err := p.reader.Blocks(derivedCtx, tenantID)
if err != nil {
return nil, nil, fmt.Errorf("failed listing tenant blocks: %w", err)
return fmt.Errorf("failed listing tenant blocks: %w", err)
}

var (
metas = previous.Metas(tenantID)
compactedMetas = previous.CompactedMetas(tenantID)
mm = make(map[backend.UUID]*backend.BlockMeta, len(metas))
cm = make(map[backend.UUID]*backend.CompactedBlockMeta, len(compactedMetas))
newBlockList = make([]*backend.BlockMeta, 0, len(currentBlockIDs))
newCompactedBlocklist = make([]*backend.CompactedBlockMeta, 0, len(currentCompactedBlockIDs))
unknownBlockIDs = make(map[uuid.UUID]bool, 1000)
metas = previous.Metas(tenantID)
compactedMetas = previous.CompactedMetas(tenantID)
mm = make(map[backend.UUID]*backend.BlockMeta, len(metas))
cm = make(map[backend.UUID]*backend.CompactedBlockMeta, len(compactedMetas))
// newBlockList = make([]*backend.BlockMeta, 0, len(currentBlockIDs))
// newCompactedBlocklist = make([]*backend.CompactedBlockMeta, 0, len(currentCompactedBlockIDs))
unknownBlockIDs = make(map[uuid.UUID]bool, 1000)
)

span.SetAttributes(attribute.Int("metas", len(metas)))
Expand All @@ -342,7 +348,7 @@ func (p *Poller) pollTenantBlocks(
for _, blockID := range currentBlockIDs {
// if we already have this block id in our previous list, use the existing data.
if v, ok := mm[backend.UUID(blockID)]; ok {
newBlockList = append(newBlockList, v)
*newBlockList = append(*newBlockList, v)
continue
}
unknownBlockIDs[blockID] = false
Expand All @@ -352,7 +358,7 @@ func (p *Poller) pollTenantBlocks(
for _, blockID := range currentCompactedBlockIDs {
// if we already have this block id in our previous list, use the existing data.
if v, ok := cm[backend.UUID(blockID)]; ok {
newCompactedBlocklist = append(newCompactedBlocklist, v)
*newCompactedBlockList = append(*newCompactedBlockList, v)
continue
}

Expand All @@ -364,42 +370,39 @@ func (p *Poller) pollTenantBlocks(

}

newM, newCm, err := p.pollUnknown(derivedCtx, unknownBlockIDs, tenantID)
err = p.pollUnknown(derivedCtx, unknownBlockIDs, tenantID, newBlockList, newCompactedBlockList)
if err != nil {
return nil, nil, fmt.Errorf("failed reading unknown blocks: %w", err)
return fmt.Errorf("failed reading unknown blocks: %w", err)
}

newBlockList = append(newBlockList, newM...)
newCompactedBlocklist = append(newCompactedBlocklist, newCm...)

sort.Slice(newBlockList, func(i, j int) bool {
return newBlockList[i].StartTime.Before(newBlockList[j].StartTime)
sort.Slice(*newBlockList, func(i, j int) bool {
return (*newBlockList)[i].StartTime.Before((*newBlockList)[j].StartTime)
})

sort.Slice(newCompactedBlocklist, func(i, j int) bool {
return newCompactedBlocklist[i].StartTime.Before(newCompactedBlocklist[j].StartTime)
sort.Slice(*newCompactedBlockList, func(i, j int) bool {
return (*newCompactedBlockList)[i].StartTime.Before((*newCompactedBlockList)[j].StartTime)
})

return newBlockList, newCompactedBlocklist, nil
return nil
}

func (p *Poller) pollUnknown(
ctx context.Context,
unknownBlocks map[uuid.UUID]bool,
tenantID string,
) ([]*backend.BlockMeta, []*backend.CompactedBlockMeta, error) {
newBlockList *[]*backend.BlockMeta,
newCompactedBlockList *[]*backend.CompactedBlockMeta,
) error {
derivedCtx, span := tracer.Start(ctx, "pollUnknown", trace.WithAttributes(
attribute.Int("unknownBlockIDs", len(unknownBlocks)),
))
defer span.End()

var (
err error
errs []error
mtx sync.Mutex
bg = boundedwaitgroup.New(p.cfg.PollConcurrency)
newBlockList = make([]*backend.BlockMeta, 0, len(unknownBlocks))
newCompactedBlocklist = make([]*backend.CompactedBlockMeta, 0, len(unknownBlocks))
err error
errs []error
mtx sync.Mutex
bg = boundedwaitgroup.New(p.cfg.PollConcurrency)
)

for blockID, compacted := range unknownBlocks {
Expand All @@ -423,12 +426,12 @@ func (p *Poller) pollUnknown(
mtx.Lock()
defer mtx.Unlock()
if m != nil {
newBlockList = append(newBlockList, m)
*newBlockList = append(*newBlockList, m)
return
}

if cm != nil {
newCompactedBlocklist = append(newCompactedBlocklist, cm)
*newCompactedBlockList = append(*newCompactedBlockList, cm)
return
}

Expand All @@ -446,10 +449,10 @@ func (p *Poller) pollUnknown(
span.SetStatus(codes.Error, "")
span.RecordError(err)

return nil, nil, err
return err
}

return newBlockList, newCompactedBlocklist, nil
return nil
}

func (p *Poller) pollBlock(
Expand Down
18 changes: 10 additions & 8 deletions tempodb/blocklist/poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1009,11 +1009,13 @@ func BenchmarkPoller10k(b *testing.B) {
currentPerTenantCompacted := maps.Clone(previousPerTenantCompacted)

var (
c = newMockCompactor(currentPerTenantCompacted, false)
w = &backend.MockWriter{}
s = &mockJobSharder{owns: true}
r = newMockReader(currentPerTenant, currentPerTenantCompacted, false)
previous = newBlocklist(previousPerTenant, previousPerTenantCompacted)
c = newMockCompactor(currentPerTenantCompacted, false)
w = &backend.MockWriter{}
s = &mockJobSharder{owns: true}
r = newMockReader(currentPerTenant, currentPerTenantCompacted, false)
previous = newBlocklist(previousPerTenant, previousPerTenantCompacted)
newBlockList = make([]*backend.BlockMeta, 0, tc.tenantCount*tc.blocksPerTenant)
newCompactedBlockList = make([]*backend.CompactedBlockMeta, 0, tc.tenantCount*tc.blocksPerTenant)
)

// This mock reader returns error or nil based on the tenant ID
Expand All @@ -1027,7 +1029,7 @@ func BenchmarkPoller10k(b *testing.B) {
runName := fmt.Sprintf("%d-%d", tc.tenantCount, tc.blocksPerTenant)
b.Run(runName, func(b *testing.B) {
for tenant := range previousPerTenant {
benchmarkPollTenant(b, poller, tenant, previous)
benchmarkPollTenant(b, poller, tenant, previous, &newBlockList, &newCompactedBlockList)
}
})
}
Expand Down Expand Up @@ -1162,10 +1164,10 @@ func BenchmarkFullPoller(b *testing.B) {
}
}

func benchmarkPollTenant(b *testing.B, poller *Poller, tenant string, previous *List) {
func benchmarkPollTenant(b *testing.B, poller *Poller, tenant string, previous *List, newBlockList *[]*backend.BlockMeta, newCompactedBlockList *[]*backend.CompactedBlockMeta) {
b.ResetTimer()
for n := 0; n < b.N; n++ {
_, _, err := poller.pollTenantBlocks(context.Background(), tenant, previous)
err := poller.pollTenantBlocks(context.Background(), tenant, previous, newBlockList, newCompactedBlockList)
require.NoError(b, err)
}
}
Expand Down