Skip to content

Commit

Permalink
Finalize most of the toxiproxy test framework
Browse files Browse the repository at this point in the history
- Create `setupFunctionalTest()` and `teardownFunctionalTest()` that handle
  testing for kafka presence, resetting toxiproxy, etc, and use them everywhere.
- Rename `kafkaIsAvailable` to `kafkaAvailable` and `kafkaShouldBeAvailable` to
  `kafkaRequired`.
- Implement a couple of helper methods for managing proxies.
- Use toxiproxy for `TestFuncConnectionFailure` so that we know there won't ever
  be anything else listening on that port.
  • Loading branch information
eapache committed May 1, 2015
1 parent f8432e8 commit f0fc7e9
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 17 deletions.
14 changes: 11 additions & 3 deletions functional_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,24 @@ import (
)

func TestFuncConnectionFailure(t *testing.T) {
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

Proxies["kafka1"].Enabled = false
SaveProxy(t, "kafka1")

config := NewConfig()
config.Metadata.Retry.Max = 1

_, err := NewClient([]string{"localhost:9000"}, config)
_, err := NewClient([]string{kafkaBrokers[0]}, config)
if err != ErrOutOfBrokers {
t.Fatal("Expected returned error to be ErrOutOfBrokers, but was: ", err)
}
}

func TestFuncClientMetadata(t *testing.T) {
checkKafkaAvailability(t)
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

config := NewConfig()
config.Metadata.Retry.Max = 1
Expand Down Expand Up @@ -60,7 +67,8 @@ func TestFuncClientMetadata(t *testing.T) {

func TestFuncClientCoordinator(t *testing.T) {
checkKafkaVersion(t, "0.8.2")
checkKafkaAvailability(t)
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

client, err := NewClient(kafkaBrokers, nil)
if err != nil {
Expand Down
6 changes: 4 additions & 2 deletions functional_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import (
)

func TestFuncConsumerOffsetOutOfRange(t *testing.T) {
checkKafkaAvailability(t)
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

consumer, err := NewConsumer(kafkaBrokers, nil)
if err != nil {
Expand All @@ -25,7 +26,8 @@ func TestFuncConsumerOffsetOutOfRange(t *testing.T) {
}

func TestConsumerHighWaterMarkOffset(t *testing.T) {
checkKafkaAvailability(t)
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

p, err := NewSyncProducer(kafkaBrokers, nil)
if err != nil {
Expand Down
12 changes: 8 additions & 4 deletions functional_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ func TestFuncProducingFlushing(t *testing.T) {
}

func TestFuncMultiPartitionProduce(t *testing.T) {
checkKafkaAvailability(t)
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

config := NewConfig()
config.ChannelBufferSize = 20
Expand Down Expand Up @@ -72,7 +73,8 @@ func TestFuncMultiPartitionProduce(t *testing.T) {
}

func TestFuncProducingToInvalidTopic(t *testing.T) {
checkKafkaAvailability(t)
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

producer, err := NewSyncProducer(kafkaBrokers, nil)
if err != nil {
Expand All @@ -91,7 +93,8 @@ func TestFuncProducingToInvalidTopic(t *testing.T) {
}

func testProducingMessages(t *testing.T, config *Config) {
checkKafkaAvailability(t)
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

config.Producer.Return.Successes = true
config.Consumer.Return.Errors = true
Expand Down Expand Up @@ -177,7 +180,8 @@ func BenchmarkProducerMediumSnappy(b *testing.B) {
}

func benchmarkProducer(b *testing.B, conf *Config, topic string, value Encoder) {
checkKafkaAvailability(b)
setupFunctionalTest(b)
defer teardownFunctionalTest(b)

producer, err := NewAsyncProducer(kafkaBrokers, conf)
if err != nil {
Expand Down
51 changes: 43 additions & 8 deletions functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@ const (
)

var (
kafkaIsAvailable, kafkaShouldBeAvailable bool
kafkaBrokers []string
proxy *toxiproxy.Client
kafkaAvailable, kafkaRequired bool
kafkaBrokers []string

proxyClient *toxiproxy.Client
Proxies map[string]*toxiproxy.Proxy
ZKProxies = []string{"zk1", "zk2", "zk3", "zk4", "zk5"}
KafkaProxies = []string{"kafka1", "kafka2", "kafka3", "kafka4", "kafka5"}
)

func init() {
Expand All @@ -41,7 +45,7 @@ func init() {
if proxyAddr == "" {
proxyAddr = VagrantToxiproxy
}
proxy = toxiproxy.NewClient(proxyAddr)
proxyClient = toxiproxy.NewClient(proxyAddr)

kafkaPeers := os.Getenv("KAFKA_PEERS")
if kafkaPeers == "" {
Expand All @@ -51,16 +55,16 @@ func init() {

if c, err := net.DialTimeout("tcp", kafkaBrokers[0], 5*time.Second); err == nil {
if err = c.Close(); err == nil {
kafkaIsAvailable = true
kafkaAvailable = true
}
}

kafkaShouldBeAvailable = os.Getenv("CI") != ""
kafkaRequired = os.Getenv("CI") != ""
}

func checkKafkaAvailability(t testing.TB) {
if !kafkaIsAvailable {
if kafkaShouldBeAvailable {
if !kafkaAvailable {
if kafkaRequired {
t.Fatalf("Kafka broker is not available on %s. Set KAFKA_PEERS to connect to Kafka on a different location.", kafkaBrokers[0])
} else {
t.Skipf("Kafka broker is not available on %s. Set KAFKA_PEERS to connect to Kafka on a different location.", kafkaBrokers[0])
Expand All @@ -81,6 +85,37 @@ func checkKafkaVersion(t testing.TB, requiredVersion string) {
}
}

func resetProxies(t testing.TB) {
if err := proxyClient.ResetState(); err != nil {
t.Error(err)
}
Proxies = nil
}

func fetchProxies(t testing.TB) {
var err error
Proxies, err = proxyClient.Proxies()
if err != nil {
t.Fatal(err)
}
}

func SaveProxy(t *testing.T, px string) {
if err := Proxies[px].Save(); err != nil {
t.Fatal(err)
}
}

func setupFunctionalTest(t testing.TB) {
checkKafkaAvailability(t)
resetProxies(t)
fetchProxies(t)
}

func teardownFunctionalTest(t testing.TB) {
resetProxies(t)
}

type kafkaVersion []int

func (kv kafkaVersion) satisfies(other kafkaVersion) bool {
Expand Down

0 comments on commit f0fc7e9

Please sign in to comment.