diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index 1285c399529..544939cfc7d 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -302,7 +302,7 @@ func (s *SpanReader) GetOperations( defer span.End() currentTime := time.Now() jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix, s.serviceIndexDateLayout, currentTime.Add(-s.maxSpanAge), currentTime, s.serviceIndexRolloverFrequency) - operations, err := s.serviceOperationStorage.getOperations(ctx, jaegerIndices, query.ServiceName, s.maxDocCount) + operations, err := s.serviceOperationStorage.getOperations(ctx, jaegerIndices, query, s.maxDocCount) if err != nil { return nil, err } @@ -312,7 +312,8 @@ func (s *SpanReader) GetOperations( var result []spanstore.Operation for _, operation := range operations { result = append(result, spanstore.Operation{ - Name: operation, + Name: operation, + SpanKind: query.SpanKind, }) } return result, err diff --git a/plugin/storage/es/spanstore/service_operation.go b/plugin/storage/es/spanstore/service_operation.go index a6c320b3f1c..562f2a2615b 100644 --- a/plugin/storage/es/spanstore/service_operation.go +++ b/plugin/storage/es/spanstore/service_operation.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "hash/fnv" + "strings" "time" "github.com/olivere/elastic" @@ -28,6 +29,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/cache" "github.com/jaegertracing/jaeger/pkg/es" "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/dbmodel" + "github.com/jaegertracing/jaeger/storage/spanstore" ) const ( @@ -106,8 +108,8 @@ func getServicesAggregation(maxDocCount int) elastic.Query { Size(maxDocCount) // ES deprecated size omission for aggregating all. https://github.com/elastic/elasticsearch/issues/18838 } -func (s *ServiceOperationStorage) getOperations(context context.Context, indices []string, service string, maxDocCount int) ([]string, error) { - serviceQuery := elastic.NewTermQuery(serviceName, service) +func (s *ServiceOperationStorage) getOperations(context context.Context, indices []string, service spanstore.OperationQueryParameters, maxDocCount int) ([]string, error) { + serviceQuery := elastic.NewTermQuery(serviceName, service.ServiceName) serviceFilter := getOperationsAggregation(maxDocCount) searchService := s.client().Search(indices...). @@ -127,7 +129,17 @@ func (s *ServiceOperationStorage) getOperations(context context.Context, indices if !found { return nil, errors.New("could not find aggregation of " + operationsAggregation) } - operationNamesBucket := bucket.Buckets + + var operationNamesBucket []*elastic.AggregationBucketKeyItem + if service.SpanKind != "" { + for _, bucket := range bucket.Buckets { + if strings.Contains(bucket.Key.(string), service.SpanKind) { + operationNamesBucket = append(operationNamesBucket, bucket) + } + } + } else { + operationNamesBucket = bucket.Buckets + } return bucketToStringArray(operationNamesBucket) }