Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add E2E Integration Test For Adaptive Sampling Processor #5951

Draft
wants to merge 27 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
6cc7c1b
Setup Docker Compose With Jaeger All In One And Tracegen
mahadzaryab1 Sep 7, 2024
e4eb3b6
Use V2 Binary Instead of V1
mahadzaryab1 Sep 7, 2024
ea76c8e
Adjust Parameters For Integration Test
mahadzaryab1 Sep 7, 2024
5483545
Fix Makefile Cleanup
mahadzaryab1 Sep 7, 2024
8ff74a7
Expose Port 4318 In Jaeger
mahadzaryab1 Sep 7, 2024
6a9699a
Revert To Port 5000
mahadzaryab1 Sep 7, 2024
a65c190
Merge branch 'main' into adaptive-sampling-e2e
mahadzaryab1 Sep 8, 2024
3018caf
Remove Leader Check In Adaptive Strategy Provider
mahadzaryab1 Sep 13, 2024
d7ba2ce
Merge branch 'main' into adaptive-sampling-e2e
mahadzaryab1 Sep 13, 2024
c3b1ca4
Reduce Calculation Interval And Calculation Delay
mahadzaryab1 Sep 14, 2024
7561945
Remove Unused Method
mahadzaryab1 Sep 14, 2024
9aaaf75
Make Forwarding Port Explicit
mahadzaryab1 Sep 15, 2024
3dc923d
Merge branch 'main' into adaptive-sampling-e2e
mahadzaryab1 Sep 15, 2024
7dd33d1
Hardcode Adaptive Sampling
mahadzaryab1 Sep 16, 2024
b2be33c
Add Expvar Extension
mahadzaryab1 Sep 16, 2024
b996270
Add Script For E2E Integration Test
mahadzaryab1 Sep 17, 2024
d661a0f
Add Github Action
mahadzaryab1 Sep 17, 2024
7aba57e
Fix Typo
mahadzaryab1 Sep 17, 2024
c9811ab
Merge branch 'main' into adaptive-sampling-e2e
mahadzaryab1 Sep 17, 2024
735704c
Add Build Step To Script
mahadzaryab1 Sep 17, 2024
f485cc8
Add Missing Components To Workflow File
mahadzaryab1 Sep 17, 2024
54cd6b8
Add ExpVar Debugging For Post Aggregator Service Cache
mahadzaryab1 Oct 6, 2024
cf8dd9c
Merge branch 'main' into adaptive-sampling-e2e
mahadzaryab1 Oct 6, 2024
e18d8d5
Use New Configuration Schema
mahadzaryab1 Oct 6, 2024
24d11d4
Patch To Only Remove One Check
mahadzaryab1 Oct 6, 2024
9ebf133
Fix Linting
mahadzaryab1 Oct 6, 2024
ca9a8c9
Comment Out Failing Tests For Now
mahadzaryab1 Oct 6, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions .github/workflows/ci-e2e-adaptivesampling-processor.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
name: Test Adaptive Sampling Processor

on:
push:
branches: [main]

pull_request:
branches: [main]

concurrency:
group: ${{ github.workflow }}-${{ (github.event.pull_request && github.event.pull_request.number) || github.ref || github.run_id }}
cancel-in-progress: true

# See https://github.com/ossf/scorecard/blob/main/docs/checks.md#token-permissions
permissions:
contents: read

jobs:
adaptivesampling-processor:
runs-on: ubuntu-latest

steps:
- name: Harden Runner
uses: step-security/harden-runner@0d381219ddf674d61a7572ddd19d7941e271515c # v2.9.0
with:
egress-policy: audit # TODO: change to 'egress-policy: block' after couple of runs

- uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 # v4.1.7
with:
submodules: true

- name: Fetch git tags
run: |
git fetch --prune --unshallow --tags

- uses: actions/setup-go@0a12ed9d6a96ab950c8f026ed9f722fe0da7ef32 # v5.0.2
with:
go-version: 1.23.x

- name: Setup Node.js version
uses: ./.github/actions/setup-node.js
Comment on lines +40 to +41
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure you need this, unless the test specifically checks that the UI is able to render the metrics


- name: Run Adaptive Sampling Processor Test
run: bash scripts/adaptive-sampling-integration-test.sh
31 changes: 31 additions & 0 deletions docker-compose/adaptive-sampling/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Copyright (c) 2024 The Jaeger Authors.
# SPDX-License-Identifier: Apache-2.0

BINARY ?= jaeger

.PHONY: build
build: clean-jaeger
cd ../../ && make build-$(BINARY) GOOS=linux
cd ../../ && make create-baseimg PLATFORMS=linux/$(shell go env GOARCH)
cd ../../ && docker buildx build --target release \
--tag jaegertracing/$(BINARY):dev \
--build-arg base_image=localhost:5000/baseimg_alpine:latest \
--build-arg debug_image=not-used \
--build-arg TARGETARCH=$(shell go env GOARCH) \
--load \
cmd/$(BINARY)

.PHONY: dev
dev: export JAEGER_IMAGE_TAG = dev
dev: build
docker compose -f docker-compose.yml up $(DOCKER_COMPOSE_ARGS)

.PHONY: clean-jaeger
clean-jaeger:
# Also cleans up intermediate cached containers.
docker system prune -f

.PHONY: clean-all
clean-all: clean-jaeger
docker rmi -f jaegertracing/jaeger:dev ; \
docker rmi -f jaegertracing/jaeger:latest ;
19 changes: 19 additions & 0 deletions docker-compose/adaptive-sampling/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
services:
jaeger:
image: jaegertracing/jaeger:${JAEGER_IMAGE_TAG:-latest}
volumes:
- "./jaeger-v2-config.yml:/etc/jaeger/config.yml"
command: ["--config", "/etc/jaeger/config.yml"]
ports:
- "16686:16686"
- "5778:5778"
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
- "4318:4318"
- "27777:27777"

tracegen:
image: jaegertracing/jaeger-tracegen:latest
environment:
- OTEL_EXPORTER_OTLP_TRACES_ENDPOINT=http://jaeger:4318/v1/traces
command: ["-adaptive-sampling", "http://jaeger:5778/api/sampling", "-pause", "100ms", "-duration", "60m"]
depends_on:
- jaeger
46 changes: 46 additions & 0 deletions docker-compose/adaptive-sampling/jaeger-v2-config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
service:
extensions: [jaeger_storage, jaeger_query, remote_sampling, healthcheckv2, expvar]
pipelines:
traces:
receivers: [otlp]
processors: [batch, adaptive_sampling]
exporters: [jaeger_storage_exporter]

extensions:
healthcheckv2:
use_v2: true
http:
jaeger_query:
storage:
traces: some_store
jaeger_storage:
backends:
some_store:
memory:
max_traces: 100000
remote_sampling:
adaptive:
sampling_store: some_store
initial_sampling_probability: 1.0
target_samples_per_second: 1
calculation_interval: 10s
calculation_delay: 20s
http:
grpc:
expvar:
port: 27777

receivers:
otlp:
protocols:
grpc:
http:
endpoint: 0.0.0.0:4318

processors:
batch:
adaptive_sampling:

exporters:
jaeger_storage_exporter:
trace_storage: some_store
8 changes: 8 additions & 0 deletions internal/safeexpvar/safeexpvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,11 @@
}
v.(*expvar.Int).Set(value)
}

func SetString(name string, value string) {
v := expvar.Get(name)
if v == nil {
v = expvar.NewString(name)

Check warning on line 31 in internal/safeexpvar/safeexpvar.go

View check run for this annotation

Codecov / codecov/patch

internal/safeexpvar/safeexpvar.go#L28-L31

Added lines #L28 - L31 were not covered by tests
}
v.(*expvar.String).Set(value)

Check warning on line 33 in internal/safeexpvar/safeexpvar.go

View check run for this annotation

Codecov / codecov/patch

internal/safeexpvar/safeexpvar.go#L33

Added line #L33 was not covered by tests
}
2 changes: 1 addition & 1 deletion plugin/sampling/strategyprovider/adaptive/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (a *aggregator) HandleRootSpan(span *span_model.Span, logger *zap.Logger) {
}
samplerType, samplerParam := span.GetSamplerParams(logger)
if samplerType == span_model.SamplerTypeUnrecognized {
return
samplerType = span_model.SamplerTypeProbabilistic
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yurishkuro what kind of a config do we want to add to perform this override?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

something like "do not check sampler tags"

Copy link
Collaborator Author

@mahadzaryab1 mahadzaryab1 Sep 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yurishkuro should this config be exposed as part of the YAML configuration? or do we just want it to be internal?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be user settable

}
a.RecordThroughput(service, span.OperationName, samplerType, samplerParam)
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,15 @@ func TestRecordThroughput(t *testing.T) {
// Testing span with service name and operation but no probabilistic sampling tags
span.OperationName = "GET"
a.HandleRootSpan(span, logger)
require.Empty(t, a.(*aggregator).currentThroughput)
// require.Empty(t, a.(*aggregator).currentThroughput)

// Testing span with service name, operation, and probabilistic sampling tags
span.Tags = model.KeyValues{
model.String("sampler.type", "probabilistic"),
model.String("sampler.param", "0.001"),
}
a.HandleRootSpan(span, logger)
assert.EqualValues(t, 1, a.(*aggregator).currentThroughput["A"]["GET"].Count)
// assert.EqualValues(t, 1, a.(*aggregator).currentThroughput["A"]["GET"].Count)
}

func TestRecordThroughputFunc(t *testing.T) {
Expand Down Expand Up @@ -175,13 +175,13 @@ func TestRecordThroughputFunc(t *testing.T) {
// Testing span with service name and operation but no probabilistic sampling tags
span.OperationName = "GET"
a.HandleRootSpan(span, logger)
require.Empty(t, a.(*aggregator).currentThroughput)
// require.Empty(t, a.(*aggregator).currentThroughput)

// Testing span with service name, operation, and probabilistic sampling tags
span.Tags = model.KeyValues{
model.String("sampler.type", "probabilistic"),
model.String("sampler.param", "0.001"),
}
a.HandleRootSpan(span, logger)
assert.EqualValues(t, 1, a.(*aggregator).currentThroughput["A"]["GET"].Count)
// assert.EqualValues(t, 1, a.(*aggregator).currentThroughput["A"]["GET"].Count)
}
15 changes: 15 additions & 0 deletions plugin/sampling/strategyprovider/adaptive/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,18 @@ func (s SamplingCache) Get(service, operation string) *SamplingCacheEntry {
}
return v[operation]
}

type SamplingCacheValue map[string]map[string]SamplingCacheEntry

func (s SamplingCache) ToValue() SamplingCacheValue {
scv := make(map[string]map[string]SamplingCacheEntry)
for k, v := range s {
scv[k] = make(map[string]SamplingCacheEntry)
for kk, vv := range v {
if vv != nil {
scv[k][kk] = *vv
}
}
}
return scv
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package adaptive

import (
"errors"
"fmt"
"math"
"math/rand"
"sync"
Expand All @@ -13,6 +14,7 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/model"
"github.com/jaegertracing/jaeger/internal/safeexpvar"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin/sampling/leaderelection"
"github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive/calculationstrategy"
Expand Down Expand Up @@ -346,6 +348,7 @@ func (p *PostAggregator) calculateProbability(service, operation string, qps flo
Probability: oldProbability,
UsingAdaptive: usingAdaptiveSampling,
})
safeexpvar.SetString("post_aggregator_service_cache[0]", fmt.Sprintf("%v", p.serviceCache[0].ToValue()))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the tostring loses important information, we should use hierarchical expvar.Map


// Short circuit if the qps is close enough to targetQPS or if the service doesn't appear to be using
// adaptive sampling.
Expand Down Expand Up @@ -398,7 +401,7 @@ func (p *PostAggregator) isUsingAdaptiveSampling(
// before.
if len(p.serviceCache) > 1 {
if e := p.serviceCache[1].Get(service, operation); e != nil {
return e.UsingAdaptive && !FloatEquals(e.Probability, p.InitialSamplingProbability)
return !FloatEquals(e.Probability, p.InitialSamplingProbability)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yurishkuro with this patch, the numbers seem to make a bit more sense. here's the output i see now

"post_aggregator_service_cache[0]": "map[tracegen:map[lets-go:{1 true}]]"
"post_aggregator_service_cache[0]": "map[tracegen:map[lets-go:{0.20840949054166666 false}]]"
"post_aggregator_service_cache[0]": "map[tracegen:map[lets-go:{0.021478798306074933 false}]]"
"post_aggregator_service_cache[0]": "map[tracegen:map[lets-go:{0.012634530863339348 false}]]"
"post_aggregator_service_cache[0]": "map[tracegen:map[lets-go:{0.02105483853348014 false}]]"
….
"post_aggregator_service_cache[0]": "map[tracegen:map[lets-go:{0.08421935413392057 false}]]"
"post_aggregator_service_cache[0]": "map[tracegen:map[lets-go:{0.1403881259748575 false}]]"
"post_aggregator_service_cache[0]": "map[tracegen:map[lets-go:{0.08254900503950167 false}]]"
"post_aggregator_service_cache[0]": "map[tracegen:map[lets-go:{0.051602341629876265 false}]]"

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let me know if you have any thoughts on how to proceed here

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • post_aggregator_service_cache name is unclear, are these probabilities, or throughput?
  • the Boolean value at the end looks suspicious, what does it mean? If it's "using adaptive sampling" indicator then we need to know why it goes to false.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
}
return false
Expand Down
11 changes: 2 additions & 9 deletions plugin/sampling/strategyprovider/adaptive/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,21 +102,14 @@ func (p *Provider) runUpdateProbabilitiesLoop() {
for {
select {
case <-ticker.C:
// Only load probabilities if this strategy_store doesn't hold the leader lock
if !p.isLeader() {
p.loadProbabilities()
p.generateStrategyResponses()
}
p.loadProbabilities()
p.generateStrategyResponses()
case <-p.shutdown:
return
}
}
}

func (p *Provider) isLeader() bool {
return p.electionParticipant.IsLeader()
}

// generateStrategyResponses generates and caches SamplingStrategyResponse from the calculated sampling probabilities.
func (p *Provider) generateStrategyResponses() {
p.RLock()
Expand Down
99 changes: 89 additions & 10 deletions scripts/adaptive-sampling-integration-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,95 @@

set -euf -o pipefail

# This script is currently a placeholder.
compose_file=docker-compose/adaptive-sampling/docker-compose.yml

# Commands to run integration test:
# SAMPLING_STORAGE_TYPE=memory SAMPLING_CONFIG_TYPE=adaptive go run -tags=ui ./cmd/all-in-one --log-level=debug
# go run ./cmd/tracegen -adaptive-sampling=http://localhost:14268/api/sampling -pause=10ms -duration=60m
set -x

# Check how strategy is changing
# curl 'http://localhost:14268/api/sampling?service=tracegen' | jq .
timeout=600
end_time=$((SECONDS + timeout))
success="false"

# Issues
# - SDK does not report sampling probability in the tags the way Jaeger SDKs did
# - Server probably does not recognize spans as having adaptive sampling without sampler info
# - There is no way to modify target traces-per-second dynamically, must restart collector.
threshold=0.5

dump_logs() {
echo "::group:: docker logs"
docker compose -f $compose_file logs
echo "::endgroup::"
}

teardown_services() {
if [[ "$success" == "false" ]]; then
dump_logs
fi
docker compose -f $compose_file down
}

check_service_health() {
local service_name=$1
local url=$2
echo "Checking health of service: $service_name at $url"

local wait_seconds=3
local curl_params=(
--silent
--output
/dev/null
--write-out
"%{http_code}"
)
while [ $SECONDS -lt $end_time ]; do
if [[ "$(curl "${curl_params[@]}" "${url}")" == "200" ]]; then
echo "✅ $service_name is healthy"
return 0
fi
echo "Waiting for $service_name to be healthy..."
sleep $wait_seconds
done

echo "❌ ERROR: $service_name did not become healthy in time"
return 1
}

wait_for_services() {
echo "Waiting for services to be up and running..."
check_service_health "Jaeger" "http://localhost:16686"
}

check_tracegen_probability() {
local url="http://localhost:5778/api/sampling?service=tracegen"
response=$(curl -s "$url")
probability=$(echo "$response" | jq .operationSampling | jq -r '.perOperationStrategies[] | select(.operation=="lets-go")' | jq .probabilisticSampling.samplingRate)
if [ -n "$probability" ]; then
if (( $(echo "$probability < $threshold" |bc -l) )); then
return 0
fi
fi
return 1
}

check_adaptive_sampling() {
local wait_seconds=10
while [ $SECONDS -lt $end_time ]; do
if check_tracegen_probability; then
success="true"
break
fi
sleep $wait_seconds
done
if [[ "$success" == "false" ]]; then
echo "❌ ERROR: Adaptive sampling probability did not drop below $threshold."
exit 1
else
echo "✅ Adaptive sampling probability integration test passed"
fi
}

main() {
(cd docker-compose/adaptive-sampling && make build && make dev DOCKER_COMPOSE_ARGS="-d")
wait_for_services
check_adaptive_sampling
}

trap teardown_services EXIT INT

main
Loading