diff --git a/cmd/jaeger/internal/integration/trace_reader.go b/cmd/jaeger/internal/integration/trace_reader.go index cfa102de13a..a61edf63f43 100644 --- a/cmd/jaeger/internal/integration/trace_reader.go +++ b/cmd/jaeger/internal/integration/trace_reader.go @@ -90,7 +90,7 @@ func (r *traceReader) GetServices(ctx context.Context) ([]string, error) { return res.Services, nil } -func (r *traceReader) GetOperations(ctx context.Context, query tracestore.OperationQueryParameters) ([]tracestore.Operation, error) { +func (r *traceReader) GetOperations(ctx context.Context, query tracestore.OperationQueryParams) ([]tracestore.Operation, error) { var operations []tracestore.Operation res, err := r.client.GetOperations(ctx, &api_v3.GetOperationsRequest{ Service: query.ServiceName, diff --git a/cmd/query/app/querysvc/v2/querysvc/package_test.go b/cmd/query/app/querysvc/v2/querysvc/package_test.go new file mode 100644 index 00000000000..755423f86da --- /dev/null +++ b/cmd/query/app/querysvc/v2/querysvc/package_test.go @@ -0,0 +1,14 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package querysvc + +import ( + "testing" + + "github.com/jaegertracing/jaeger/pkg/testutils" +) + +func TestMain(m *testing.M) { + testutils.VerifyGoLeaks(m) +} diff --git a/cmd/query/app/querysvc/v2/querysvc/service.go b/cmd/query/app/querysvc/v2/querysvc/service.go new file mode 100644 index 00000000000..bfcc98053c3 --- /dev/null +++ b/cmd/query/app/querysvc/v2/querysvc/service.go @@ -0,0 +1,203 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package querysvc + +import ( + "context" + "errors" + "time" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/jaegertracing/jaeger/cmd/query/app/querysvc/v2/adjuster" + "github.com/jaegertracing/jaeger/internal/jptrace" + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/pkg/iter" + "github.com/jaegertracing/jaeger/storage_v2/depstore" + "github.com/jaegertracing/jaeger/storage_v2/tracestore" +) + +var errNoArchiveSpanStorage = errors.New("archive span storage was not configured") + +const ( + defaultMaxClockSkewAdjust = time.Second +) + +// QueryServiceOptions holds the configuration options for the query service. +type QueryServiceOptions struct { + // ArchiveTraceReader is used to read archived traces from the storage. + ArchiveTraceReader tracestore.Reader + // ArchiveTraceWriter is used to write traces to the archive storage. + ArchiveTraceWriter tracestore.Writer + // Adjuster is used to adjust traces before they are returned to the client. + // If not set, the default adjuster will be used. + Adjuster adjuster.Adjuster +} + +// StorageCapabilities is a feature flag for query service +type StorageCapabilities struct { + ArchiveStorage bool `json:"archiveStorage"` + // TODO: Maybe add metrics Storage here + // SupportRegex bool + // SupportTagFilter bool +} + +// QueryService provides methods to query data from the storage. +type QueryService struct { + traceReader tracestore.Reader + dependencyReader depstore.Reader + options QueryServiceOptions +} + +// GetTraceParams defines the parameters for retrieving traces using the GetTraces function. +type GetTraceParams struct { + // TraceIDs is a slice of trace identifiers to fetch. + TraceIDs []tracestore.GetTraceParams + // RawTraces indicates whether to retrieve raw traces. + // If set to false, the traces will be adjusted using QueryServiceOptions.Adjuster. + RawTraces bool +} + +// TraceQueryParams represents the parameters for querying a batch of traces. +type TraceQueryParams struct { + tracestore.TraceQueryParams + // RawTraces indicates whether to retrieve raw traces. + // If set to false, the traces will be adjusted using QueryServiceOptions.Adjuster. + RawTraces bool +} + +func NewQueryService( + traceReader tracestore.Reader, + dependencyReader depstore.Reader, + options QueryServiceOptions, +) *QueryService { + qsvc := &QueryService{ + traceReader: traceReader, + dependencyReader: dependencyReader, + options: options, + } + + if qsvc.options.Adjuster == nil { + qsvc.options.Adjuster = adjuster.Sequence( + adjuster.StandardAdjusters(defaultMaxClockSkewAdjust)...) + } + return qsvc +} + +// GetTraces retrieves traces with given trace IDs from the primary reader, +// and if any of them are not found it then queries the archive reader. +// The iterator is single-use: once consumed, it cannot be used again. +func (qs QueryService) GetTraces( + ctx context.Context, + params GetTraceParams, +) iter.Seq2[[]ptrace.Traces, error] { + getTracesIter := qs.traceReader.GetTraces(ctx, params.TraceIDs...) + return func(yield func([]ptrace.Traces, error) bool) { + foundTraceIDs, proceed := qs.receiveTraces(getTracesIter, yield, params.RawTraces) + if proceed && qs.options.ArchiveTraceReader != nil { + var missingTraceIDs []tracestore.GetTraceParams + for _, id := range params.TraceIDs { + if _, found := foundTraceIDs[id.TraceID]; !found { + missingTraceIDs = append(missingTraceIDs, id) + } + } + if len(missingTraceIDs) > 0 { + getArchiveTracesIter := qs.options.ArchiveTraceReader.GetTraces(ctx, missingTraceIDs...) + qs.receiveTraces(getArchiveTracesIter, yield, params.RawTraces) + } + } + } +} + +func (qs QueryService) GetServices(ctx context.Context) ([]string, error) { + return qs.traceReader.GetServices(ctx) +} + +func (qs QueryService) GetOperations( + ctx context.Context, + query tracestore.OperationQueryParams, +) ([]tracestore.Operation, error) { + return qs.traceReader.GetOperations(ctx, query) +} + +func (qs QueryService) FindTraces( + ctx context.Context, + query TraceQueryParams, +) iter.Seq2[[]ptrace.Traces, error] { + return func(yield func([]ptrace.Traces, error) bool) { + tracesIter := qs.traceReader.FindTraces(ctx, query.TraceQueryParams) + qs.receiveTraces(tracesIter, yield, query.RawTraces) + } +} + +// ArchiveTrace archives a trace specified by the given query parameters. +// If the ArchiveTraceWriter is not configured, it returns +// an error indicating that there is no archive span storage available. +func (qs QueryService) ArchiveTrace(ctx context.Context, query tracestore.GetTraceParams) error { + if qs.options.ArchiveTraceWriter == nil { + return errNoArchiveSpanStorage + } + getTracesIter := qs.GetTraces( + ctx, GetTraceParams{TraceIDs: []tracestore.GetTraceParams{query}}, + ) + var archiveErr error + getTracesIter(func(traces []ptrace.Traces, err error) bool { + if err != nil { + archiveErr = err + return false + } + for _, trace := range traces { + err = qs.options.ArchiveTraceWriter.WriteTraces(ctx, trace) + if err != nil { + archiveErr = errors.Join(archiveErr, err) + } + } + return true + }) + return archiveErr +} + +func (qs QueryService) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { + return qs.dependencyReader.GetDependencies(ctx, depstore.QueryParameters{ + StartTime: endTs.Add(-lookback), + EndTime: endTs, + }) +} + +func (qs QueryService) GetCapabilities() StorageCapabilities { + return StorageCapabilities{ + ArchiveStorage: qs.options.hasArchiveStorage(), + } +} + +func (opts *QueryServiceOptions) hasArchiveStorage() bool { + return opts.ArchiveTraceReader != nil && opts.ArchiveTraceWriter != nil +} + +func (qs QueryService) receiveTraces( + seq iter.Seq2[[]ptrace.Traces, error], + yield func([]ptrace.Traces, error) bool, + rawTraces bool, +) (map[pcommon.TraceID]struct{}, bool) { + aggregatedTraces := jptrace.AggregateTraces(seq) + foundTraceIDs := make(map[pcommon.TraceID]struct{}) + proceed := true + aggregatedTraces(func(trace ptrace.Traces, err error) bool { + if err != nil { + proceed = yield(nil, err) + return proceed + } + if !rawTraces { + qs.options.Adjuster.Adjust(trace) + } + jptrace.SpanIter(trace)(func(_ jptrace.SpanIterPos, span ptrace.Span) bool { + foundTraceIDs[span.TraceID()] = struct{}{} + return true + }) + proceed = yield([]ptrace.Traces{trace}, nil) + return proceed + }) + return foundTraceIDs, proceed +} diff --git a/cmd/query/app/querysvc/v2/querysvc/service_test.go b/cmd/query/app/querysvc/v2/querysvc/service_test.go new file mode 100644 index 00000000000..4794c2d4c99 --- /dev/null +++ b/cmd/query/app/querysvc/v2/querysvc/service_test.go @@ -0,0 +1,496 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package querysvc + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/pkg/iter" + "github.com/jaegertracing/jaeger/storage_v2/depstore" + depstoremocks "github.com/jaegertracing/jaeger/storage_v2/depstore/mocks" + "github.com/jaegertracing/jaeger/storage_v2/tracestore" + tracestoremocks "github.com/jaegertracing/jaeger/storage_v2/tracestore/mocks" +) + +const millisToNanosMultiplier = int64(time.Millisecond / time.Nanosecond) + +var ( + defaultDependencyLookbackDuration = time.Hour * 24 + + testTraceID = pcommon.TraceID([16]byte{1}) +) + +type testQueryService struct { + queryService *QueryService + traceReader *tracestoremocks.Reader + depsReader *depstoremocks.Reader + + archiveTraceReader *tracestoremocks.Reader + archiveTraceWriter *tracestoremocks.Writer +} + +type testOption func(*testQueryService, *QueryServiceOptions) + +func withArchiveTraceReader() testOption { + return func(tqs *testQueryService, options *QueryServiceOptions) { + r := &tracestoremocks.Reader{} + tqs.archiveTraceReader = r + options.ArchiveTraceReader = r + } +} + +func withArchiveTraceWriter() testOption { + return func(tqs *testQueryService, options *QueryServiceOptions) { + r := &tracestoremocks.Writer{} + tqs.archiveTraceWriter = r + options.ArchiveTraceWriter = r + } +} + +func initializeTestService(opts ...testOption) *testQueryService { + traceReader := &tracestoremocks.Reader{} + dependencyStorage := &depstoremocks.Reader{} + + options := QueryServiceOptions{} + + tqs := testQueryService{ + traceReader: traceReader, + depsReader: dependencyStorage, + } + + for _, opt := range opts { + opt(&tqs, &options) + } + + tqs.queryService = NewQueryService(traceReader, dependencyStorage, options) + return &tqs +} + +func makeTestTrace() ptrace.Traces { + trace := ptrace.NewTraces() + resources := trace.ResourceSpans().AppendEmpty() + scopes := resources.ScopeSpans().AppendEmpty() + + spanA := scopes.Spans().AppendEmpty() + spanA.SetTraceID(testTraceID) + spanA.SetSpanID(pcommon.SpanID([8]byte{1})) + + spanB := scopes.Spans().AppendEmpty() + spanB.SetTraceID(testTraceID) + spanB.SetSpanID(pcommon.SpanID([8]byte{2})) + spanB.Attributes() + + return trace +} + +func TestGetTraces_ErrorInReader(t *testing.T) { + tqs := initializeTestService() + tqs.traceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). + Return(iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { + yield(nil, assert.AnError) + })).Once() + + params := GetTraceParams{ + TraceIDs: []tracestore.GetTraceParams{ + { + TraceID: testTraceID, + }, + }, + } + getTracesIter := tqs.queryService.GetTraces(context.Background(), params) + _, err := iter.FlattenWithErrors(getTracesIter) + require.ErrorIs(t, err, assert.AnError) +} + +func TestGetTraces_Success(t *testing.T) { + tqs := initializeTestService() + tqs.traceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). + Return(iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { + yield([]ptrace.Traces{makeTestTrace()}, nil) + })).Once() + + params := GetTraceParams{ + TraceIDs: []tracestore.GetTraceParams{ + {TraceID: testTraceID}, + }, + } + getTracesIter := tqs.queryService.GetTraces(context.Background(), params) + gotTraces, err := iter.FlattenWithErrors(getTracesIter) + require.NoError(t, err) + require.Len(t, gotTraces, 1) + + gotSpans := gotTraces[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans() + require.Equal(t, 2, gotSpans.Len()) + require.Equal(t, testTraceID, gotSpans.At(0).TraceID()) + require.EqualValues(t, [8]byte{1}, gotSpans.At(0).SpanID()) + require.Equal(t, testTraceID, gotSpans.At(1).TraceID()) + require.EqualValues(t, [8]byte{2}, gotSpans.At(1).SpanID()) +} + +func TestGetTraces_WithRawTraces(t *testing.T) { + tests := []struct { + rawTraces bool + attributes pcommon.Map + expected pcommon.Map + }{ + { + // tags should not get sorted by SortCollections adjuster + rawTraces: true, + attributes: func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("z", "key") + m.PutStr("a", "key") + return m + }(), + expected: func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("z", "key") + m.PutStr("a", "key") + return m + }(), + }, + { + // tags should get sorted by SortCollections adjuster + rawTraces: false, + attributes: func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("z", "key") + m.PutStr("a", "key") + return m + }(), + expected: func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("a", "key") + m.PutStr("z", "key") + return m + }(), + }, + } + for _, test := range tests { + t.Run(fmt.Sprintf("rawTraces=%v", test.rawTraces), func(t *testing.T) { + trace := makeTestTrace() + test.attributes.CopyTo(trace.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes()) + responseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { + yield([]ptrace.Traces{trace}, nil) + }) + + tqs := initializeTestService() + tqs.traceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). + Return(responseIter).Once() + + params := GetTraceParams{ + TraceIDs: []tracestore.GetTraceParams{ + { + TraceID: testTraceID, + }, + }, + RawTraces: test.rawTraces, + } + getTracesIter := tqs.queryService.GetTraces(context.Background(), params) + gotTraces, err := iter.FlattenWithErrors(getTracesIter) + require.NoError(t, err) + + require.Len(t, gotTraces, 1) + gotAttributes := gotTraces[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes() + require.Equal(t, test.expected, gotAttributes) + }) + } +} + +func TestGetTraces_TraceInArchiveStorage(t *testing.T) { + tqs := initializeTestService(withArchiveTraceReader()) + + tqs.traceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). + Return(iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { + yield([]ptrace.Traces{}, nil) + })).Once() + + tqs.archiveTraceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). + Return(iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { + yield([]ptrace.Traces{makeTestTrace()}, nil) + })).Once() + + params := GetTraceParams{ + TraceIDs: []tracestore.GetTraceParams{ + {TraceID: testTraceID}, + }, + } + getTracesIter := tqs.queryService.GetTraces(context.Background(), params) + gotTraces, err := iter.FlattenWithErrors(getTracesIter) + require.NoError(t, err) + require.Len(t, gotTraces, 1) + + gotSpans := gotTraces[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans() + require.Equal(t, 2, gotSpans.Len()) + require.Equal(t, testTraceID, gotSpans.At(0).TraceID()) + require.EqualValues(t, [8]byte{1}, gotSpans.At(0).SpanID()) + require.Equal(t, testTraceID, gotSpans.At(1).TraceID()) + require.EqualValues(t, [8]byte{2}, gotSpans.At(1).SpanID()) +} + +func TestGetServices(t *testing.T) { + tqs := initializeTestService() + expected := []string{"trifle", "bling"} + tqs.traceReader.On("GetServices", mock.Anything).Return(expected, nil).Once() + + actualServices, err := tqs.queryService.GetServices(context.Background()) + require.NoError(t, err) + assert.Equal(t, expected, actualServices) +} + +func TestGetOperations(t *testing.T) { + tqs := initializeTestService() + expected := []tracestore.Operation{{Name: "", SpanKind: ""}, {Name: "get", SpanKind: ""}} + operationQuery := tracestore.OperationQueryParams{ServiceName: "abc/trifle"} + tqs.traceReader.On( + "GetOperations", + mock.Anything, + operationQuery, + ).Return(expected, nil).Once() + + actualOperations, err := tqs.queryService.GetOperations(context.Background(), operationQuery) + require.NoError(t, err) + assert.Equal(t, expected, actualOperations) +} + +func TestFindTraces_Success(t *testing.T) { + tqs := initializeTestService() + responseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { + yield([]ptrace.Traces{makeTestTrace()}, nil) + }) + + duration := 20 * time.Millisecond + now := time.Now() + queryParams := tracestore.TraceQueryParams{ + ServiceName: "service", + OperationName: "operation", + StartTimeMax: now, + DurationMin: duration, + NumTraces: 200, + } + tqs.traceReader.On("FindTraces", mock.Anything, queryParams).Return(responseIter).Once() + + query := TraceQueryParams{TraceQueryParams: queryParams} + getTracesIter := tqs.queryService.FindTraces(context.Background(), query) + gotTraces, err := iter.FlattenWithErrors(getTracesIter) + require.NoError(t, err) + require.Len(t, gotTraces, 1) + + gotSpans := gotTraces[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans() + require.Equal(t, 2, gotSpans.Len()) + require.Equal(t, testTraceID, gotSpans.At(0).TraceID()) + require.EqualValues(t, [8]byte{1}, gotSpans.At(0).SpanID()) + require.Equal(t, testTraceID, gotSpans.At(1).TraceID()) + require.EqualValues(t, [8]byte{2}, gotSpans.At(1).SpanID()) +} + +func TestFindTraces_WithRawTraces(t *testing.T) { + tests := []struct { + rawTraces bool + attributes pcommon.Map + expected pcommon.Map + }{ + { + // tags should not get sorted by SortTagsAndLogFields adjuster + rawTraces: true, + attributes: func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("z", "key") + m.PutStr("a", "key") + return m + }(), + expected: func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("z", "key") + m.PutStr("a", "key") + return m + }(), + }, + { + // tags should get sorted by SortTagsAndLogFields adjuster + rawTraces: false, + attributes: func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("z", "key") + m.PutStr("a", "key") + return m + }(), + expected: func() pcommon.Map { + m := pcommon.NewMap() + m.PutStr("a", "key") + m.PutStr("z", "key") + return m + }(), + }, + } + for _, test := range tests { + t.Run(fmt.Sprintf("rawTraces=%v", test.rawTraces), func(t *testing.T) { + trace := makeTestTrace() + test.attributes.CopyTo(trace.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes()) + responseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { + yield([]ptrace.Traces{trace}, nil) + }) + + tqs := initializeTestService() + duration, err := time.ParseDuration("20ms") + require.NoError(t, err) + now := time.Now() + tqs.traceReader.On("FindTraces", mock.Anything, tracestore.TraceQueryParams{ + ServiceName: "service", + OperationName: "operation", + StartTimeMax: now, + DurationMin: duration, + NumTraces: 200, + }). + Return(responseIter).Once() + + query := TraceQueryParams{ + TraceQueryParams: tracestore.TraceQueryParams{ + ServiceName: "service", + OperationName: "operation", + StartTimeMax: now, + DurationMin: duration, + NumTraces: 200, + }, + RawTraces: test.rawTraces, + } + getTracesIter := tqs.queryService.FindTraces(context.Background(), query) + gotTraces, err := iter.FlattenWithErrors(getTracesIter) + require.NoError(t, err) + + require.Len(t, gotTraces, 1) + gotAttributes := gotTraces[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes() + require.Equal(t, test.expected, gotAttributes) + }) + } +} + +func TestArchiveTrace(t *testing.T) { + tests := []struct { + name string + options []testOption + setupMocks func(tqs *testQueryService) + expectedError error + }{ + { + name: "no options", + options: nil, + setupMocks: func(*testQueryService) {}, + expectedError: errNoArchiveSpanStorage, + }, + { + name: "get trace error", + options: []testOption{withArchiveTraceWriter()}, + setupMocks: func(tqs *testQueryService) { + responseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { + yield([]ptrace.Traces{}, assert.AnError) + }) + tqs.traceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). + Return(responseIter).Once() + }, + expectedError: assert.AnError, + }, + { + name: "archive writer error", + options: []testOption{withArchiveTraceWriter()}, + setupMocks: func(tqs *testQueryService) { + responseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { + yield([]ptrace.Traces{makeTestTrace()}, nil) + }) + tqs.traceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). + Return(responseIter).Once() + tqs.archiveTraceWriter.On("WriteTraces", mock.Anything, mock.AnythingOfType("ptrace.Traces")). + Return(assert.AnError).Once() + }, + expectedError: assert.AnError, + }, + { + name: "success", + options: []testOption{withArchiveTraceWriter()}, + setupMocks: func(tqs *testQueryService) { + responseIter := iter.Seq2[[]ptrace.Traces, error](func(yield func([]ptrace.Traces, error) bool) { + yield([]ptrace.Traces{makeTestTrace()}, nil) + }) + tqs.traceReader.On("GetTraces", mock.Anything, tracestore.GetTraceParams{TraceID: testTraceID}). + Return(responseIter).Once() + tqs.archiveTraceWriter.On("WriteTraces", mock.Anything, mock.AnythingOfType("ptrace.Traces")). + Return(nil).Once() + }, + expectedError: nil, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + tqs := initializeTestService(test.options...) + test.setupMocks(tqs) + + query := tracestore.GetTraceParams{ + TraceID: testTraceID, + } + + err := tqs.queryService.ArchiveTrace(context.Background(), query) + if test.expectedError != nil { + require.ErrorIs(t, err, test.expectedError) + } else { + require.NoError(t, err) + } + }) + } +} + +func TestGetDependencies(t *testing.T) { + tqs := initializeTestService() + expected := []model.DependencyLink{ + {Parent: "killer", Child: "queen", CallCount: 12}, + } + endTs := time.Unix(0, 1476374248550*millisToNanosMultiplier) + tqs.depsReader.On("GetDependencies", mock.Anything, depstore.QueryParameters{ + StartTime: endTs.Add(-defaultDependencyLookbackDuration), + EndTime: endTs, + }).Return(expected, nil).Once() + + actualDependencies, err := tqs.queryService.GetDependencies(context.Background(), endTs, defaultDependencyLookbackDuration) + require.NoError(t, err) + assert.Equal(t, expected, actualDependencies) +} + +func TestGetCapabilities(t *testing.T) { + tests := []struct { + name string + options []testOption + expected StorageCapabilities + }{ + { + name: "without archive storage", + expected: StorageCapabilities{ + ArchiveStorage: false, + }, + }, + { + name: "with archive storage", + options: []testOption{withArchiveTraceReader(), withArchiveTraceWriter()}, + expected: StorageCapabilities{ + ArchiveStorage: true, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + tqs := initializeTestService(test.options...) + assert.Equal(t, test.expected, tqs.queryService.GetCapabilities()) + }) + } +} diff --git a/plugin/storage/integration/integration.go b/plugin/storage/integration/integration.go index 159e958c5bf..982eba3e9fc 100644 --- a/plugin/storage/integration/integration.go +++ b/plugin/storage/integration/integration.go @@ -270,7 +270,7 @@ func (s *StorageIntegration) testGetOperations(t *testing.T) { found := s.waitForCondition(t, func(t *testing.T) bool { var err error actual, err = s.TraceReader.GetOperations(context.Background(), - tracestore.OperationQueryParameters{ServiceName: "example-service-1"}) + tracestore.OperationQueryParams{ServiceName: "example-service-1"}) if err != nil { t.Log(err) return false diff --git a/storage_v2/tracestore/mocks/Reader.go b/storage_v2/tracestore/mocks/Reader.go index 5f87e43581b..fd0d0a7e708 100644 --- a/storage_v2/tracestore/mocks/Reader.go +++ b/storage_v2/tracestore/mocks/Reader.go @@ -66,7 +66,7 @@ func (_m *Reader) FindTraces(ctx context.Context, query tracestore.TraceQueryPar } // GetOperations provides a mock function with given fields: ctx, query -func (_m *Reader) GetOperations(ctx context.Context, query tracestore.OperationQueryParameters) ([]tracestore.Operation, error) { +func (_m *Reader) GetOperations(ctx context.Context, query tracestore.OperationQueryParams) ([]tracestore.Operation, error) { ret := _m.Called(ctx, query) if len(ret) == 0 { @@ -75,10 +75,10 @@ func (_m *Reader) GetOperations(ctx context.Context, query tracestore.OperationQ var r0 []tracestore.Operation var r1 error - if rf, ok := ret.Get(0).(func(context.Context, tracestore.OperationQueryParameters) ([]tracestore.Operation, error)); ok { + if rf, ok := ret.Get(0).(func(context.Context, tracestore.OperationQueryParams) ([]tracestore.Operation, error)); ok { return rf(ctx, query) } - if rf, ok := ret.Get(0).(func(context.Context, tracestore.OperationQueryParameters) []tracestore.Operation); ok { + if rf, ok := ret.Get(0).(func(context.Context, tracestore.OperationQueryParams) []tracestore.Operation); ok { r0 = rf(ctx, query) } else { if ret.Get(0) != nil { @@ -86,7 +86,7 @@ func (_m *Reader) GetOperations(ctx context.Context, query tracestore.OperationQ } } - if rf, ok := ret.Get(1).(func(context.Context, tracestore.OperationQueryParameters) error); ok { + if rf, ok := ret.Get(1).(func(context.Context, tracestore.OperationQueryParams) error); ok { r1 = rf(ctx, query) } else { r1 = ret.Error(1) diff --git a/storage_v2/tracestore/reader.go b/storage_v2/tracestore/reader.go index 5eef0ed8aca..86160412c1f 100644 --- a/storage_v2/tracestore/reader.go +++ b/storage_v2/tracestore/reader.go @@ -36,7 +36,7 @@ type Reader interface { // GetOperations returns all operation names for a given service // known to the backend from spans within its retention period. - GetOperations(ctx context.Context, query OperationQueryParameters) ([]Operation, error) + GetOperations(ctx context.Context, query OperationQueryParams) ([]Operation, error) // FindTraces returns an iterator that retrieves traces matching query parameters. // The iterator is single-use: once consumed, it cannot be used again. @@ -101,8 +101,8 @@ func (t *TraceQueryParams) ToSpanStoreQueryParameters() *spanstore.TraceQueryPar } } -// OperationQueryParameters contains parameters of query operations, empty spanKind means get operations for all kinds of span. -type OperationQueryParameters struct { +// OperationQueryParams contains parameters of query operations, empty spanKind means get operations for all kinds of span. +type OperationQueryParams struct { ServiceName string SpanKind string } diff --git a/storage_v2/v1adapter/reader.go b/storage_v2/v1adapter/reader.go index 225e9267d89..7ef6921d5e9 100644 --- a/storage_v2/v1adapter/reader.go +++ b/storage_v2/v1adapter/reader.go @@ -73,7 +73,7 @@ func (tr *TraceReader) GetServices(ctx context.Context) ([]string, error) { func (tr *TraceReader) GetOperations( ctx context.Context, - query tracestore.OperationQueryParameters, + query tracestore.OperationQueryParams, ) ([]tracestore.Operation, error) { o, err := tr.spanReader.GetOperations(ctx, spanstore.OperationQueryParameters{ ServiceName: query.ServiceName, diff --git a/storage_v2/v1adapter/reader_test.go b/storage_v2/v1adapter/reader_test.go index c142ee99c13..e1952c0a825 100644 --- a/storage_v2/v1adapter/reader_test.go +++ b/storage_v2/v1adapter/reader_test.go @@ -240,7 +240,7 @@ func TestTraceReader_GetOperationsDelegatesResponse(t *testing.T) { } operations, err := traceReader.GetOperations( context.Background(), - tracestore.OperationQueryParameters{ + tracestore.OperationQueryParams{ ServiceName: "service-a", SpanKind: "server", })