Skip to content

Commit

Permalink
Merge pull request #268 from yarpc/dev
Browse files Browse the repository at this point in the history
Release v0.15.0
  • Loading branch information
prashantv authored Jul 8, 2019
2 parents a2a3bf8 + f398080 commit c8bb688
Show file tree
Hide file tree
Showing 28 changed files with 636 additions and 159 deletions.
6 changes: 3 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ sudo: false
language: go

go:
- "1.9"
- "1.10"
- "1.11.x"
- "1.12.x"

env:
global:
Expand All @@ -29,5 +29,5 @@ deploy:
script: GITHUB_REPO=$TRAVIS_REPO_SLUG scripts/release.sh $TRAVIS_TAG
skip_cleanup: true
on:
go: 1.9
go: 1.12.x
tags: true
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
Changelog
=========

# 0.15.0 (2019-07-07)
* Allow `null` in request templates to imply skipping the field.
* Allow specifying HTTP method for HTTP requests using `--http-method`.
* Add support for URLs in peer lists.
* Add support for per-peer stats from benchmarks using `--per-peer-stats`.
* Fix bug where benchmark stats were missing procedure if `--health` was used.

# 0.14.3 (2019-04-16)
* Fix bug where values specified in templates as `"y"` were being
converted to true.
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ yab -t ~/keyvalue.thrift -p localhost:12345 keyvalue KeyValue::get -r '{"key": "
```

[releases]: https://github.com/yarpc/yab/releases
[ci-img]: https://travis-ci.org/yarpc/yab.svg?branch=master
[ci]: https://travis-ci.org/yarpc/yab
[ci-img]: https://travis-ci.com/yarpc/yab.svg?branch=master
[ci]: https://travis-ci.com/yarpc/yab
[cov-img]: https://codecov.io/gh/yarpc/yab/branch/master/graph/badge.svg
[cov]: https://codecov.io/gh/yarpc/yab
28 changes: 21 additions & 7 deletions bench_method.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ import (
"github.com/opentracing/opentracing-go"
)

type peerTransport struct {
transport.Transport
peerID int
}

type benchmarkMethod struct {
serializer encoding.Serializer
req *transport.Request
Expand Down Expand Up @@ -65,34 +70,43 @@ func (m benchmarkMethod) call(t transport.Transport) (time.Duration, error) {
return duration, err
}

func peerBalancer(peers []string) func(i int) string {
func (m benchmarkMethod) Method() string {
return m.req.Method
}

func peerBalancer(peers []string) func(i int) (string, int) {
numPeers := len(peers)
startOffset := rand.Intn(numPeers)
return func(i int) string {
return func(i int) (string, int) {
offset := (startOffset + i) % numPeers
return peers[offset]
return peers[offset], offset
}
}

// WarmTransports returns n transports that have been warmed up.
// No requests may fail during the warmup period.
func (m benchmarkMethod) WarmTransports(n int, tOpts TransportOptions, warmupRequests int) ([]transport.Transport, error) {
func (m benchmarkMethod) WarmTransports(n int, tOpts TransportOptions, warmupRequests int) ([]peerTransport, error) {
tOpts, err := loadTransportPeers(tOpts)
if err != nil {
return nil, err
}

peerFor := peerBalancer(tOpts.Peers)
transports := make([]transport.Transport, n)
transports := make([]peerTransport, n)
errs := make([]error, n)

var wg sync.WaitGroup
for i := range transports {
wg.Add(1)
go func(i int, tOpts TransportOptions) {
defer wg.Done()
tOpts.Peers = []string{peerFor(i)}
transports[i], errs[i] = m.WarmTransport(tOpts, warmupRequests)

peerHostPort, peerIndex := peerFor(i)
tOpts.Peers = []string{peerHostPort}

tp, err := m.WarmTransport(tOpts, warmupRequests)
transports[i] = peerTransport{tp, peerIndex}
errs[i] = err
}(i, tOpts)
}

Expand Down
18 changes: 15 additions & 3 deletions bench_method_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,14 @@ import (
)

func benchmarkMethodForTest(t *testing.T, procedure string, p transport.Protocol) benchmarkMethod {
rOpts := RequestOptions{
return benchmarkMethodForROpts(t, RequestOptions{
Encoding: encoding.Thrift,
ThriftFile: validThrift,
Procedure: procedure,
}
}, p)
}

func benchmarkMethodForROpts(t *testing.T, rOpts RequestOptions, p transport.Protocol) benchmarkMethod {
serializer, err := NewSerializer(Options{ROpts: rOpts})
require.NoError(t, err, "Failed to create Thrift serializer")

Expand Down Expand Up @@ -197,8 +200,9 @@ func TestPeerBalancer(t *testing.T) {
rand.Seed(tt.seed)
peerFor := peerBalancer(tt.peers)
for i, want := range tt.want {
got := peerFor(i)
got, index := peerFor(i)
assert.Equal(t, want, got, "peerBalancer(%v) seed %v i %v failed", tt.peers, tt.seed, i)
assert.Equal(t, want, tt.peers[index], "peerBalancer(%v) seed %v i %v unexpected index %v", tt.peers, tt.seed, i, index)
}
}
}
Expand Down Expand Up @@ -292,3 +296,11 @@ func TestBenchmarkMethodWarmTransportsError(t *testing.T) {
}
}
}

func TestBenchmarkMethodHealth(t *testing.T) {
m := benchmarkMethodForROpts(t, RequestOptions{
Encoding: encoding.Thrift,
Health: true,
}, transport.TChannel)
assert.Equal(t, "Meta::health", m.Method())
}
23 changes: 20 additions & 3 deletions benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package main

import (
"errors"
"fmt"
"os"
"os/signal"
"runtime"
Expand Down Expand Up @@ -126,15 +127,30 @@ func runBenchmark(out output, logger *zap.Logger, allOpts Options, m benchmarkMe
out.Fatalf("Failed to warmup connections for benchmark: %v", err)
}

statter, err := statsd.NewClient(logger, opts.StatsdHostPort, allOpts.TOpts.ServiceName, allOpts.ROpts.Procedure)
globalStatter, err := statsd.NewClient(logger, opts.StatsdHostPort, allOpts.TOpts.ServiceName, m.Method())
if err != nil {
out.Fatalf("Failed to create statsd client for benchmark: %v", err)
}

var wg sync.WaitGroup
states := make([]*benchmarkState, len(connections)*opts.Concurrency)
for i := range states {
states[i] = newBenchmarkState(statter)

for i, c := range connections {
statter := globalStatter

if opts.PerPeerStats {
// If per-peer stats are enabled, dual emit metrics to the original value
// and the per-peer value.
prefix := fmt.Sprintf("peer.%v.", c.peerID)
statter = statsd.MultiClient(
statter,
statsd.NewPrefixedClient(statter, prefix),
)
}

for j := 0; j < opts.Concurrency; j++ {
states[i*opts.Concurrency+j] = newBenchmarkState(statter)
}
}

run := limiter.New(opts.MaxRequests, opts.RPS, opts.MaxDuration)
Expand All @@ -155,6 +171,7 @@ func runBenchmark(out output, logger *zap.Logger, allOpts Options, m benchmarkMe
}

// TODO: Support streaming updates.
// TOOD: Aggregate per-backend state for JSON output in future.

// Wait for all the worker goroutines to end.
wg.Wait()
Expand Down
69 changes: 69 additions & 0 deletions benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ package main

import (
"fmt"
"os"
"sync"
"testing"
"time"

"github.com/yarpc/yab/statsd/statsdtest"
"github.com/yarpc/yab/transport"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -149,3 +151,70 @@ func TestRunBenchmarkErrors(t *testing.T) {
assert.Contains(t, fatalMessage, tt.wantErr, "Missing error for %+v", tt.opts)
}
}

func TestBenchmarkStatsPerPeer(t *testing.T) {
origUserEnv := os.Getenv("USER")
defer os.Setenv("USER", origUserEnv)
os.Setenv("USER", "tester")

var r1, r2 atomic.Int32

statsServer := statsdtest.NewServer(t)
defer statsServer.Close()

s1 := newServer(t)
defer s1.shutdown()
s1.register(fooMethod, methods.errorIf(func() bool {
r1.Inc()
return false
}))

s2 := newServer(t)
defer s2.shutdown()
s2.register(fooMethod, methods.errorIf(func() bool {
r2.Inc()
return false
}))

m := benchmarkMethodForTest(t, fooMethod, transport.TChannel)

tOpts := s1.transportOpts()
tOpts.Peers = append(tOpts.Peers, s2.transportOpts().Peers...)

const totalCalls = 21 // odd so one server has to get more

_, _, out := getOutput(t)
runBenchmark(out, _testLogger, Options{
BOpts: BenchmarkOptions{
MaxRequests: totalCalls,
Connections: 50,
Concurrency: 2,
StatsdHostPort: statsServer.Addr().String(),
PerPeerStats: true,
},
TOpts: tOpts,
ROpts: RequestOptions{
Procedure: fooMethod,
},
}, m)

// Ensure one backend gets more calls than the other
want := map[string]int{
"yab.tester.foo.Simple--foo.latency": totalCalls,
"yab.tester.foo.Simple--foo.success": totalCalls,

// peer 0
"yab.tester.foo.Simple--foo.peer.0.latency": int(r1.Load()),
"yab.tester.foo.Simple--foo.peer.0.success": int(r1.Load()),

// peer 1
"yab.tester.foo.Simple--foo.peer.1.latency": int(r2.Load()),
"yab.tester.foo.Simple--foo.peer.1.success": int(r2.Load()),
}

// Wait for all metrics to be processed
testutils.WaitFor(time.Second, func() bool {
return statsServer.Aggregated()["yab.tester.foo.Simple--foo.latency"] >= totalCalls
})
assert.Equal(t, want, statsServer.Aggregated(), "unexpected stats")
}
3 changes: 3 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type TransportOptions struct {
ShardKey string `long:"sk" description:"The shard key is a transport header that clues where to send a request within a clustered traffic group."`
Jaeger bool `long:"jaeger" description:"Use the Jaeger tracing client to send Uber style traces and baggage headers"`
TransportHeaders map[string]string `short:"T" long:"topt" description:"Transport options for TChannel, protocol headers for HTTP"`
HTTPMethod string `long:"http-method" description:"The HTTP method to use"`

// This is a hack to work around go-flags not allowing disabling flags:
// https://github.com/jessevdk/go-flags/issues/191
Expand All @@ -107,13 +108,15 @@ type BenchmarkOptions struct {

// Benchmark metrics can optionally be reported via statsd.
StatsdHostPort string `long:"statsd" description:"Optional host:port of a StatsD server to report metrics"`
PerPeerStats bool `long:"per-peer-stats" description:"Whether to emit stats by peer rather than aggregated"`
}

func newOptions() *Options {
var opts Options

// Defaults
opts.ROpts.Timeout = timeMillisFlag(time.Second)
opts.TOpts.HTTPMethod = "POST"

// Set flag aliases
opts.ROpts.MethodName.dest = &opts.ROpts.Procedure
Expand Down
12 changes: 12 additions & 0 deletions peerprovider/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ func TestParseHostFile(t *testing.T) {
filename: "valid_peerlist.yaml",
want: []string{"1.1.1.1:1", "2.2.2.2:2"},
},
{
filename: "valid_url_peerlist.yaml",
want: []string{"http://1.1.1.1:1/foo", "tchannel://2.2.2.2:2"},
},
{
filename: "valid_url_peerlist.txt",
want: []string{"http://1.1.1.1:1/foo", "tchannel://2.2.2.2:2"},
},
{
filename: "valid_peerlist.txt",
want: []string{"1.1.1.1:1", "2.2.2.2:2"},
Expand All @@ -36,6 +44,10 @@ func TestParseHostFile(t *testing.T) {
filename: "invalid.json",
errMsg: errPeerListFile.Error(),
},
{
filename: "invalid_url_peerlist.txt",
errMsg: errPeerListFile.Error(),
},
}

for _, tt := range tests {
Expand Down
30 changes: 27 additions & 3 deletions peerprovider/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
"net"
"net/url"
"strings"

"gopkg.in/yaml.v2"
Expand Down Expand Up @@ -50,12 +52,34 @@ func parseNewlineDelimitedPeers(r io.Reader) ([]string, error) {
continue
}

if _, _, err := net.SplitHostPort(line); err != nil {
return nil, err
// If the line is a host:port or a URL, then it's valid.
_, _, hostPortErr := net.SplitHostPort(line)
if hostPortErr == nil {
hosts = append(hosts, line)
continue
}

urlParseErr := isValidURL(line)
if urlParseErr == nil {
hosts = append(hosts, line)
continue
}

hosts = append(hosts, line)
return nil, fmt.Errorf("failed to parse line %q as host:port (%v) or URL (%v)", line, hostPortErr, urlParseErr)
}

return hosts, nil
}

func isValidURL(line string) error {
u, err := url.Parse(line)
if err != nil {
return err
}

if u.Host == "" {
return fmt.Errorf("url cannot have empty host: %v", line)
}

return nil
}
Loading

0 comments on commit c8bb688

Please sign in to comment.