Skip to content

Commit

Permalink
Adding the serialization features. (#1666)
Browse files Browse the repository at this point in the history
* Adding the serialization features.

* Dont test this with race condition since we access vars directly.

* Fix test.

* Fix typo in file name and return early in DeserializeToSeriesGroup.

* Update internal/component/prometheus/remote/queue/serialization/appender.go

Co-authored-by: Piotr <[email protected]>

* Update internal/component/prometheus/remote/queue/serialization/serializer.go

Co-authored-by: Piotr <[email protected]>

* Rename to indicate that TimeSeries are Put/Get from a pool.

* Remove func that was about the same number of lines as inlining.

* Update internal/component/prometheus/remote/queue/types/serialization.go

Co-authored-by: Piotr <[email protected]>

* Update internal/component/prometheus/remote/queue/serialization/serializer.go

Co-authored-by: Piotr <[email protected]>

* Change benchmark to be more specific.

---------

Co-authored-by: Piotr <[email protected]>
  • Loading branch information
mattdurham and thampiotr authored Sep 16, 2024
1 parent 4670f64 commit 626113f
Show file tree
Hide file tree
Showing 13 changed files with 5,212 additions and 11 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ lint: alloylint
# final command runs tests for all other submodules.
test:
$(GO_ENV) go test $(GO_FLAGS) -race $(shell go list ./... | grep -v /integration-tests/)
$(GO_ENV) go test $(GO_FLAGS) ./internal/static/integrations/node_exporter ./internal/static/logs ./internal/component/otelcol/processor/tail_sampling ./internal/component/loki/source/file ./internal/component/loki/source/docker
$(GO_ENV) go test $(GO_FLAGS) ./internal/static/integrations/node_exporter ./internal/static/logs ./internal/component/otelcol/processor/tail_sampling ./internal/component/loki/source/file ./internal/component/loki/source/docker ./internal/component/prometheus/remote/queue/serialization
$(GO_ENV) find . -name go.mod -not -path "./go.mod" -execdir go test -race ./... \;

test-packages:
Expand Down
20 changes: 10 additions & 10 deletions internal/component/prometheus/remote/queue/filequeue/filequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type queue struct {
// block until ready for another record.
out func(ctx context.Context, dh types.DataHandle)
// existingFiles is the list of files found initially.
existingsFiles []string
existingFiles []string
}

// NewQueue returns a implementation of FileStorage.
Expand Down Expand Up @@ -61,18 +61,18 @@ func NewQueue(directory string, out func(ctx context.Context, dh types.DataHandl
currentMaxID = ids[len(ids)-1]
}
q := &queue{
directory: directory,
maxID: currentMaxID,
logger: logger,
out: out,
dataQueue: actor.NewMailbox[types.Data](),
existingsFiles: make([]string, 0),
directory: directory,
maxID: currentMaxID,
logger: logger,
out: out,
dataQueue: actor.NewMailbox[types.Data](),
existingFiles: make([]string, 0),
}

// Save the existing files in `q.existingFiles`, which will have their data pushed to `out` when actor starts.
for _, id := range ids {
name := filepath.Join(directory, fmt.Sprintf("%d.committed", id))
q.existingsFiles = append(q.existingsFiles, name)
q.existingFiles = append(q.existingFiles, name)
}
return q, nil
}
Expand Down Expand Up @@ -115,7 +115,7 @@ func get(logger log.Logger, name string) (map[string]string, []byte, error) {
// DoWork allows most of the queue to be single threaded with work only coming in and going out via mailboxes(channels).
func (q *queue) DoWork(ctx actor.Context) actor.WorkerStatus {
// Queue up our existing items.
for _, name := range q.existingsFiles {
for _, name := range q.existingFiles {
q.out(ctx, types.DataHandle{
Name: name,
Pop: func() (map[string]string, []byte, error) {
Expand All @@ -124,7 +124,7 @@ func (q *queue) DoWork(ctx actor.Context) actor.WorkerStatus {
})
}
// We only want to process existing files once.
q.existingsFiles = nil
q.existingFiles = nil
select {
case <-ctx.Done():
return actor.WorkerEnd
Expand Down
121 changes: 121 additions & 0 deletions internal/component/prometheus/remote/queue/serialization/appender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package serialization

import (
"context"
"time"

"github.com/go-kit/log"
"github.com/grafana/alloy/internal/component/prometheus/remote/queue/types"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/storage"
)

type appender struct {
ctx context.Context
ttl time.Duration
s types.Serializer
logger log.Logger
}

func (a *appender) AppendCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64) (storage.SeriesRef, error) {
// TODO @mattdurham figure out what to do here later. This mirrors what we do elsewhere.
return ref, nil
}

// NewAppender returns an Appender that writes to a given serializer. NOTE the returned Appender writes
// data immediately, discards data older than `ttl` and does not honor commit or rollback.
func NewAppender(ctx context.Context, ttl time.Duration, s types.Serializer, logger log.Logger) storage.Appender {
app := &appender{
ttl: ttl,
s: s,
logger: logger,
ctx: ctx,
}
return app
}

// Append metric
func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
// Check to see if the TTL has expired for this record.
endTime := time.Now().Unix() - int64(a.ttl.Seconds())
if t < endTime {
return ref, nil
}
ts := types.GetTimeSeriesFromPool()
ts.Labels = l
ts.TS = t
ts.Value = v
ts.Hash = l.Hash()
err := a.s.SendSeries(a.ctx, ts)
return ref, err
}

// Commit is a no op since we always write.
func (a *appender) Commit() (_ error) {
return nil
}

// Rollback is a no op since we write all the data.
func (a *appender) Rollback() error {
return nil
}

// AppendExemplar appends exemplar to cache. The passed in labels is unused, instead use the labels on the exemplar.
func (a *appender) AppendExemplar(ref storage.SeriesRef, _ labels.Labels, e exemplar.Exemplar) (_ storage.SeriesRef, _ error) {
endTime := time.Now().Unix() - int64(a.ttl.Seconds())
if e.HasTs && e.Ts < endTime {
return ref, nil
}
ts := types.GetTimeSeriesFromPool()
ts.Hash = e.Labels.Hash()
ts.TS = e.Ts
ts.Labels = e.Labels
ts.Hash = e.Labels.Hash()
err := a.s.SendSeries(a.ctx, ts)
return ref, err
}

// AppendHistogram appends histogram
func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (_ storage.SeriesRef, _ error) {
endTime := time.Now().Unix() - int64(a.ttl.Seconds())
if t < endTime {
return ref, nil
}
ts := types.GetTimeSeriesFromPool()
ts.Labels = l
ts.TS = t
if h != nil {
ts.FromHistogram(t, h)
} else {
ts.FromFloatHistogram(t, fh)
}
ts.Hash = l.Hash()
err := a.s.SendSeries(a.ctx, ts)
return ref, err
}

// UpdateMetadata updates metadata.
func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (_ storage.SeriesRef, _ error) {
ts := types.GetTimeSeriesFromPool()
// We are going to handle converting some strings to hopefully not reused label names. TimeSeriesBinary has a lot of work
// to ensure its efficient it makes sense to encode metadata into it.
combinedLabels := l.Copy()
combinedLabels = append(combinedLabels, labels.Label{
Name: "__alloy_metadata_type__",
Value: string(m.Type),
})
combinedLabels = append(combinedLabels, labels.Label{
Name: "__alloy_metadata_help__",
Value: m.Help,
})
combinedLabels = append(combinedLabels, labels.Label{
Name: "__alloy_metadata_unit__",
Value: m.Unit,
})
ts.Labels = combinedLabels
err := a.s.SendMetadata(a.ctx, ts)
return ref, err
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package serialization

import (
"context"
log2 "github.com/go-kit/log"
"github.com/grafana/alloy/internal/component/prometheus/remote/queue/types"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"testing"
"time"
)

func TestAppenderTTL(t *testing.T) {
fake := &counterSerializer{}
l := log2.NewNopLogger()

app := NewAppender(context.Background(), 1*time.Minute, fake, l)
_, err := app.Append(0, labels.FromStrings("one", "two"), time.Now().Unix(), 0)
require.NoError(t, err)

for i := 0; i < 10; i++ {
_, err = app.Append(0, labels.FromStrings("one", "two"), time.Now().Add(-5*time.Minute).Unix(), 0)
require.NoError(t, err)
}
// Only one record should make it through.
require.True(t, fake.received == 1)
}

var _ types.Serializer = (*fakeSerializer)(nil)

type counterSerializer struct {
received int
}

func (f *counterSerializer) Start() {

}

func (f *counterSerializer) Stop() {

}

func (f *counterSerializer) SendSeries(ctx context.Context, data *types.TimeSeriesBinary) error {
f.received++
return nil

}

func (f *counterSerializer) SendMetadata(ctx context.Context, data *types.TimeSeriesBinary) error {
return nil
}

func (f *counterSerializer) UpdateConfig(ctx context.Context, data types.SerializerConfig) error {
return nil
}
Loading

0 comments on commit 626113f

Please sign in to comment.