Skip to content

Commit

Permalink
added telemetry support
Browse files Browse the repository at this point in the history
  • Loading branch information
Pawel Szczodruch committed Apr 7, 2021
1 parent e27f702 commit d642ab0
Show file tree
Hide file tree
Showing 6 changed files with 232 additions and 10 deletions.
2 changes: 1 addition & 1 deletion base_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (t *baseTransport) getHTTPClient() *http.Client {
return t.httpClient
}

return http.DefaultClient
return &http.Client{}
}

// post returns an error which indicates the type of error that occurred while attempting to
Expand Down
41 changes: 35 additions & 6 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"reflect"
"regexp"
"runtime"
"time"
)

// A Client can be used to interact with Rollbar via the configured Transport.
Expand All @@ -23,6 +24,7 @@ type Client struct {
// Transport used to send data to the Rollbar API. By default an asynchronous
// implementation of the Transport interface is used.
Transport Transport
Telemetry *Telemetry
configuration configuration
diagnostic diagnostic
}
Expand All @@ -40,6 +42,7 @@ func NewAsync(token, environment, codeVersion, serverHost, serverRoot string) *C
diagnostic := createDiagnostic()
return &Client{
Transport: transport,
Telemetry: NewTelemetry(),
configuration: configuration,
diagnostic: diagnostic,
}
Expand All @@ -52,11 +55,29 @@ func NewSync(token, environment, codeVersion, serverHost, serverRoot string) *Cl
diagnostic := createDiagnostic()
return &Client{
Transport: transport,
Telemetry: NewTelemetry(),
configuration: configuration,
diagnostic: diagnostic,
}
}

// CaptureTelemetryEvent sets the user-specified telemetry event
func (c *Client) CaptureTelemetryEvent(eventType, eventlevel string, eventData map[string]interface{}) {
data := map[string]interface{}{}
data["body"] = eventData
data["type"] = eventType
data["level"] = eventlevel
data["source"] = "client"
data["timestamp_ms"] = time.Now().UnixNano() / int64(time.Millisecond)

c.Telemetry.Queue.Push(data)
}

// SetTelemetry sets the telemetry
func (c *Client) SetTelemetry(t *Telemetry) {
c.Telemetry = t
}

// SetEnabled sets whether or not Rollbar is enabled.
// If this is true then this library works as normal.
// If this is false then no calls will be made to the network.
Expand Down Expand Up @@ -361,7 +382,8 @@ func (c *Client) ErrorWithStackSkipWithExtrasAndContext(ctx context.Context, lev
return
}
body := c.buildBody(ctx, level, err.Error(), extras)
addErrorToBody(c.configuration, body, err, skip)
telemetry := c.Telemetry.GetQueueItems()
addErrorToBody(c.configuration, body, err, skip, telemetry)
c.push(body)
}

Expand Down Expand Up @@ -389,7 +411,8 @@ func (c *Client) RequestErrorWithStackSkipWithExtrasAndContext(ctx context.Conte
return
}
body := c.buildBody(ctx, level, err.Error(), extras)
data := addErrorToBody(c.configuration, body, err, skip)
telemetry := c.Telemetry.GetQueueItems()
data := addErrorToBody(c.configuration, body, err, skip, telemetry)
data["request"] = c.requestDetails(r)
c.push(body)
}
Expand All @@ -415,7 +438,10 @@ func (c *Client) MessageWithExtrasAndContext(ctx context.Context, level string,
}
body := c.buildBody(ctx, level, msg, extras)
data := body["data"].(map[string]interface{})
data["body"] = messageBody(msg)
dataBody := messageBody(msg)
telemetry := c.Telemetry.GetQueueItems()
dataBody["telemetry"] = telemetry
data["body"] = dataBody
c.push(body)
}

Expand All @@ -440,7 +466,10 @@ func (c *Client) RequestMessageWithExtrasAndContext(ctx context.Context, level s
}
body := c.buildBody(ctx, level, msg, extras)
data := body["data"].(map[string]interface{})
data["body"] = messageBody(msg)
dataBody := messageBody(msg)
telemetry := c.Telemetry.GetQueueItems()
dataBody["telemetry"] = telemetry
data["body"] = dataBody
data["request"] = c.requestDetails(r)
c.push(body)
}
Expand Down Expand Up @@ -669,12 +698,12 @@ func createConfiguration(token, environment, codeVersion, serverHost, serverRoot
}

type diagnostic struct {
languageVersion string
languageVersion string
}

func createDiagnostic() diagnostic {
return diagnostic{
languageVersion: runtime.Version(),
languageVersion: runtime.Version(),
}
}

Expand Down
59 changes: 59 additions & 0 deletions queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package rollbar

import "sync"

// NewQueue returns a new queue with the given initial size.
func NewQueue(size int) *Queue {
return &Queue{
nodes: make([]interface{}, size),
size: size,
}
}

// Queue is a basic FIFO queue based on a circular list that resizes as needed.
type Queue struct {
nodes []interface{}
size int
head int
tail int
count int

lock sync.RWMutex
}

// Push adds a node to the queue.
func (q *Queue) Push(n interface{}) {
q.lock.Lock()
defer q.lock.Unlock()
if q.head == q.tail && q.count > 0 {
nodes := make([]interface{}, len(q.nodes)+q.size)
copy(nodes, q.nodes[q.head:])
copy(nodes[len(q.nodes)-q.head:], q.nodes[:q.head])
q.head = 0
q.tail = len(q.nodes)
q.nodes = nodes
}
q.nodes[q.tail] = n
q.tail = (q.tail + 1) % len(q.nodes)
q.count++
}

// Pop removes and returns a node from the queue in first to last order.
func (q *Queue) Pop() interface{} {
q.lock.Lock()
defer q.lock.Unlock()
if q.count == 0 {
return nil
}
node := q.nodes[q.head]
q.head = (q.head + 1) % len(q.nodes)
q.count--
return node
}

// Items returns all populated (non nil) items
func (q *Queue) Items() []interface{} {
q.lock.RLock()
defer q.lock.RUnlock()
return q.nodes[:q.count]
}
16 changes: 16 additions & 0 deletions rollbar.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,22 @@ var DefaultStackTracer StackTracerFunc = func(err error) ([]runtime.Frame, bool)
return nil, false
}

// GetTelemetryHTTPClientTransport enables a user to set Transport on http client:
// example: client := &http.Client{Transport: rollbar.GetTelemetryHTTPClientTransport()}
func GetTelemetryHTTPClientTransport() http.RoundTripper {
return std.Telemetry
}

// SetTelemetry sets the telemetry
func SetTelemetry(t *Telemetry) {
std.SetTelemetry(t)
}

// CaptureTelemetryEvent sets the user-specified telemetry event
func CaptureTelemetryEvent(eventType, eventlevel string, eventData map[string]interface{}) {
std.CaptureTelemetryEvent(eventType, eventlevel, eventData)
}

// SetEnabled sets whether or not the managed Client instance is enabled.
// If this is true then this library works as normal.
// If this is false then no calls will be made to the network.
Expand Down
116 changes: 116 additions & 0 deletions telemetry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package rollbar

import (
"fmt"
"io"
"log"
"net/http"
"os"
"time"
)

// Telemetry struct contains writer (for logs) and round tripper (for http client) and enables to queue the events
type Telemetry struct {
Writer io.Writer
Proxied http.RoundTripper
Queue *Queue
}

// Write is the writer for telemetry logs
func (t *Telemetry) Write(p []byte) (int, error) {
telemetryData := t.populateLoggerBody(p)
t.Queue.Push(telemetryData)
return t.Writer.Write(p)
}

// RoundTrip implements RoundTrip in http.RoundTripper
func (t *Telemetry) RoundTrip(req *http.Request) (res *http.Response, e error) {

// Send the request, get the response (or the error)
res, e = t.Proxied.RoundTrip(req)
if e != nil {
fmt.Printf("Error: %v", e)
}
telemetryData := t.populateTransporterBody(req, res)
t.Queue.Push(telemetryData)
return
}

func (t *Telemetry) populateLoggerBody(p []byte) map[string]interface{} {
var data = map[string]interface{}{}
message := map[string]interface{}{"message": string(p)}
data["body"] = message
data["source"] = "client"
data["timestamp_ms"] = time.Now().UnixNano() / int64(time.Millisecond)
data["type"] = "log"
data["level"] = "log"
return data
}

func (t *Telemetry) populateTransporterBody(req *http.Request, res *http.Response) map[string]interface{} {
var data = map[string]interface{}{}
var dataBody = map[string]interface{}{}
var dataHeaders = map[string][]string{}
dataBody["status_code"] = nil
data["level"] = "info"
if res != nil {
dataBody["status_code"] = res.StatusCode
if res.StatusCode >= http.StatusInternalServerError {
data["level"] = "critical"
} else if res.StatusCode >= http.StatusBadRequest {
data["level"] = "error"
}
}
dataBody["url"] = req.URL.Scheme + "://" + req.Host + req.URL.Path
dataBody["method"] = req.Method
dataBody["subtype"] = "http"

for k, v := range req.Header {
dataHeaders[k] = v
}
dataBody["request_headers"] = dataHeaders

data["body"] = dataBody
data["source"] = "client"
data["timestamp_ms"] = time.Now().UnixNano() / int64(time.Millisecond)
data["type"] = "network"
return data
}

// GetQueueItems gets all the items from the queue
func (t *Telemetry) GetQueueItems() []interface{} {
return t.Queue.Items()
}

// OptionFunc is the pointer to the optional parameter function
type OptionFunc func(*Telemetry)

// WithCustomTransporter sets the custom transporter
func WithCustomTransporter(t http.RoundTripper) OptionFunc {
return func(f *Telemetry) {
f.Proxied = t
}
}

// WithCustomQueueSize initializes the queue with a custom size
func WithCustomQueueSize(size int) OptionFunc {
return func(f *Telemetry) {
f.Queue = NewQueue(size)
}
}

// NewTelemetry initializes telemetry object
func NewTelemetry(options ...OptionFunc) *Telemetry {
res := &Telemetry{
Proxied: http.DefaultTransport,
Queue: NewQueue(50),
Writer: os.Stdout,
}
for _, opt := range options {
opt(res)
}

log.SetOutput(res)
http.DefaultClient = &http.Client{Transport: res}
return res
}
8 changes: 5 additions & 3 deletions transforms.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func buildBody(ctx context.Context, configuration configuration, diagnostic diag
"name": NAME,
"version": VERSION,
"diagnostic": map[string]interface{}{
"languageVersion": diagnostic.languageVersion,
"languageVersion": diagnostic.languageVersion,
"configuredOptions": buildConfiguredOptions(configuration),
},
},
Expand Down Expand Up @@ -101,10 +101,12 @@ func buildConfiguredOptions(configuration configuration) map[string]interface{}
}
}

func addErrorToBody(configuration configuration, body map[string]interface{}, err error, skip int) map[string]interface{} {
func addErrorToBody(configuration configuration, body map[string]interface{}, err error, skip int, telemetry []interface{}) map[string]interface{} {
data := body["data"].(map[string]interface{})
errBody, fingerprint := errorBody(configuration, err, skip)
data["body"] = errBody
dataBody := errBody
dataBody["telemetry"] = telemetry
data["body"] = dataBody
if configuration.fingerprint {
data["fingerprint"] = fingerprint
}
Expand Down

0 comments on commit d642ab0

Please sign in to comment.