Skip to content

Commit

Permalink
Merge pull request #10 from ExpediaDotCom/http-collector-dispatcher
Browse files Browse the repository at this point in the history
adding single(sharable) and dual-span(non-sharable) support, default …
  • Loading branch information
Jeff Baker authored Oct 30, 2018
2 parents b4ba6c6 + f1ada50 commit 89b22a7
Show file tree
Hide file tree
Showing 8 changed files with 259 additions and 61 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ addons:
hosts:
- kafkasvc
- haystack_agent
- haystack_collector

before_script:
- curl -OL https://github.com/google/protobuf/releases/download/v3.3.0/protoc-3.3.0-linux-x86_64.zip
Expand Down
9 changes: 9 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,15 @@ services:
- /bin/sh
- -c
- 'sleep 10 && java -jar /app/bin/haystack-agent.jar --config-provider file --file-path /app/bin/dev.conf'
haystack_collector:
image: expediadotcom/haystack-http-span-collector:1.1
depends_on:
- zookeeper
- kafkasvc
environment:
HAYSTACK_PROP_KAFKA_PRODUCER_PROPS_BOOTSTRAP_SERVERS: "kafkasvc:9092"
ports:
- "8080:8080"
zookeeper:
image: wurstmeister/zookeeper
ports:
Expand Down
92 changes: 59 additions & 33 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package haystack

import (
"fmt"
"log"
"strconv"
"testing"
"time"
Expand All @@ -30,15 +29,42 @@ import (
"github.com/opentracing/opentracing-go/ext"
)

func TestIntegrationWithHaystackAgent(t *testing.T) {
tracer, closer := NewTracer("dummy-service", NewAgentDispatcher("haystack_agent", 35000, 3*time.Second, 1000), TracerOptionsFactory.Tag("appVer", "v1.1"))
defer func() {
err := closer.Close()
if err != nil {
panic(err)
}
}()
type consoleLogger struct{}

/*Error prints the error message*/
func (logger *consoleLogger) Error(format string, v ...interface{}) {
fmt.Printf(format, v...)
fmt.Print("\n")
}

/*Info prints the info message*/
func (logger *consoleLogger) Info(format string, v ...interface{}) {
fmt.Printf(format, v...)
fmt.Print("\n")
}

/*Debug prints the info message*/
func (logger *consoleLogger) Debug(format string, v ...interface{}) {
fmt.Printf(format, v...)
fmt.Print("\n")
}

func createKafkaConsumer() sarama.PartitionConsumer {
consumer, err := sarama.NewConsumer([]string{"kafkasvc:9092"}, nil)

if err != nil {
panic(err)
}

partitionConsumer, err := consumer.ConsumePartition("proto-spans", 0, sarama.OffsetOldest)
if err != nil {
panic(err)
}

return partitionConsumer
}

func executeTest(tracer opentracing.Tracer, partitionConsumer sarama.PartitionConsumer, t *testing.T) {
serverTime := time.Now()
serverSpan := tracer.StartSpan("serverOp", opentracing.Tag{Key: "server-tag-key", Value: "something"}, opentracing.StartTime(serverTime))
ext.SpanKind.Set(serverSpan, ext.SpanKindRPCServerEnum)
Expand All @@ -57,29 +83,6 @@ func TestIntegrationWithHaystackAgent(t *testing.T) {
clientSpan.Finish()
serverSpan.Finish()

consumer, err := sarama.NewConsumer([]string{"kafkasvc:9092"}, nil)

if err != nil {
panic(err)
}

defer func() {
if err := consumer.Close(); err != nil {
log.Fatalln(err)
}
}()

partitionConsumer, err := consumer.ConsumePartition("proto-spans", 0, sarama.OffsetOldest)
if err != nil {
panic(err)
}

defer func() {
if err := partitionConsumer.Close(); err != nil {
log.Fatalln(err)
}
}()

clientSpanReceived := 0
serverSpanReceived := 0
clientParentSpanID := ""
Expand All @@ -95,7 +98,7 @@ ConsumerLoop:
span := &Span{}
unmarshalErr := span.XXX_Unmarshal(msg.Value)
if unmarshalErr != nil {
panic(err)
panic(unmarshalErr)
}

verifyCommonAttr(t, span)
Expand Down Expand Up @@ -136,6 +139,29 @@ ConsumerLoop:
}
}

func TestIntegration(t *testing.T) {
consumer := createKafkaConsumer()
agentTracer, agentCloser := NewTracer("dummy-service", NewAgentDispatcher("haystack_agent", 35000, 3*time.Second, 1000), TracerOptionsFactory.Tag("appVer", "v1.1"), TracerOptionsFactory.Logger(&consoleLogger{}))
defer func() {
err := agentCloser.Close()
if err != nil {
panic(err)
}
}()

executeTest(agentTracer, consumer, t)

httpDispatcher := NewHTTPDispatcher("http://haystack_collector:8080/span", 3*time.Second, make(map[string]string), 1000)
_, httpCloser := NewTracer("dummy-service", httpDispatcher, TracerOptionsFactory.Tag("appVer", "v1.1"), TracerOptionsFactory.Logger(&consoleLogger{}))
defer func() {
err := httpCloser.Close()
if err != nil {
panic(err)
}
}()
//executeTest(httpTracer, consumer, t)
}

func verifyCommonAttr(t *testing.T, span *Span) {
assert.Equal(t, span.GetServiceName(), "dummy-service")
assert.Equal(t, tagVal(span, "appVer"), "v1.1")
Expand Down
9 changes: 5 additions & 4 deletions propagator.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,11 @@ func (p *TextMapPropagator) Extract(carrier interface{}) (*SpanContext, error) {
}
}
return &SpanContext{
TraceID: carrierMap[p.opts.TraceIDKEY()],
SpanID: carrierMap[p.opts.SpanIDKEY()],
ParentID: carrierMap[p.opts.ParentSpanIDKEY()],
Baggage: baggage,
TraceID: carrierMap[p.opts.TraceIDKEY()],
SpanID: carrierMap[p.opts.SpanIDKEY()],
ParentID: carrierMap[p.opts.ParentSpanIDKEY()],
Baggage: baggage,
IsExtractedContext: true,
}, nil
}

Expand Down
3 changes: 3 additions & 0 deletions span_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ type SpanContext struct {

//Context baggage. The is a snapshot in time.
Baggage map[string]string

// set to true if extracted using a extractor in tracer
IsExtractedContext bool
}

// IsValid indicates whether this context actually represents a valid trace.
Expand Down
60 changes: 44 additions & 16 deletions tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,19 @@ import (

"github.com/google/uuid"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
)

/*Tracer implements the opentracing.tracer*/
type Tracer struct {
serviceName string
logger Logger
dispatcher Dispatcher
commonTags []opentracing.Tag
timeNow func() time.Time
idGenerator func() string
propagtors map[interface{}]Propagator
serviceName string
logger Logger
dispatcher Dispatcher
commonTags []opentracing.Tag
timeNow func() time.Time
idGenerator func() string
propagtors map[interface{}]Propagator
useDualSpanMode bool
}

/*NewTracer creates a new tracer*/
Expand All @@ -43,13 +45,13 @@ func NewTracer(
options ...TracerOption,
) (opentracing.Tracer, io.Closer) {
tracer := &Tracer{
serviceName: serviceName,
dispatcher: dispatcher,
serviceName: serviceName,
dispatcher: dispatcher,
useDualSpanMode: false,
}
tracer.propagtors = make(map[interface{}]Propagator)
tracer.propagtors[opentracing.TextMap] = NewDefaultTextMapPropagator()
tracer.propagtors[opentracing.HTTPHeaders] = NewTextMapPropagator(PropagatorOpts{}, URLCodex{})

for _, option := range options {
option(tracer)
}
Expand Down Expand Up @@ -107,7 +109,7 @@ func (tracer *Tracer) StartSpan(
}
}

spanContext := tracer.createSpanContext(parent)
spanContext := tracer.createSpanContext(parent, tracer.isServerSpan(sso.Tags))

span := &_Span{
tracer: tracer,
Expand All @@ -127,18 +129,44 @@ func (tracer *Tracer) StartSpan(
return span
}

func (tracer *Tracer) createSpanContext(parent *SpanContext) *SpanContext {
func (tracer *Tracer) isServerSpan(spanTags map[string]interface{}) bool {
if spanKind, ok := spanTags[string(ext.SpanKind)]; ok && spanKind == "server" {
return true
}
return false
}

func (tracer *Tracer) createSpanContext(parent *SpanContext, isServerSpan bool) *SpanContext {
if parent == nil || !parent.IsValid() {
return &SpanContext{
TraceID: tracer.idGenerator(),
SpanID: tracer.idGenerator(),
}
}

// This is a check to see if the tracer is configured to support single
// single span type (Zipkin style shared span id) or
// dual span type (client and server having their own span ids ).
// a. If tracer is not of dualSpanType and if it is a server span then we
// just return the parent context with the same shared span ids
// b. If tracer is not of dualSpanType and if the parent context is an extracted one from the wire
// then we assume this is the first span in the server and so just return the parent context
// with the same shared span ids
if !tracer.useDualSpanMode && (isServerSpan || parent.IsExtractedContext) {
return &SpanContext{
TraceID: parent.TraceID,
SpanID: parent.SpanID,
ParentID: parent.ParentID,
Baggage: parent.Baggage,
IsExtractedContext: false,
}
}
return &SpanContext{
TraceID: parent.TraceID,
SpanID: tracer.idGenerator(),
ParentID: parent.SpanID,
Baggage: parent.Baggage,
TraceID: parent.TraceID,
SpanID: tracer.idGenerator(),
ParentID: parent.SpanID,
Baggage: parent.Baggage,
IsExtractedContext: false,
}
}

Expand Down
9 changes: 8 additions & 1 deletion tracer_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,16 @@ func (t TracerOptions) Tag(key string, value interface{}) TracerOption {
}
}

/*Logger adds a common tag in every span*/
/*Logger set the logger type*/
func (t TracerOptions) Logger(logger Logger) TracerOption {
return func(tracer *Tracer) {
tracer.logger = logger
}
}

/*UseDualSpanMode sets the tracer in dual span mode*/
func (t TracerOptions) UseDualSpanMode() TracerOption {
return func(tracer *Tracer) {
tracer.useDualSpanMode = true
}
}
Loading

0 comments on commit 89b22a7

Please sign in to comment.