From f0fc7e9247afd9b9151034155def175ad1888b01 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Tue, 31 Mar 2015 16:07:46 -0400 Subject: [PATCH] Finalize most of the toxiproxy test framework - Create `setupFunctionalTest()` and `teardownFunctionalTest()` that handle testing for kafka presence, resetting toxiproxy, etc, and use them everywhere. - Rename `kafkaIsAvailable` to `kafkaAvailable` and `kafkaShouldBeAvailable` to `kafkaRequired`. - Implement a couple of helper methods for managing proxies. - Use toxiproxy for `TestFuncConnectionFailure` so that we know there won't ever be anything else listening on that port. --- functional_client_test.go | 14 +++++++--- functional_consumer_test.go | 6 +++-- functional_producer_test.go | 12 ++++++--- functional_test.go | 51 +++++++++++++++++++++++++++++++------ 4 files changed, 66 insertions(+), 17 deletions(-) diff --git a/functional_client_test.go b/functional_client_test.go index 54f665a0a..9e8e32968 100644 --- a/functional_client_test.go +++ b/functional_client_test.go @@ -7,17 +7,24 @@ import ( ) func TestFuncConnectionFailure(t *testing.T) { + setupFunctionalTest(t) + defer teardownFunctionalTest(t) + + Proxies["kafka1"].Enabled = false + SaveProxy(t, "kafka1") + config := NewConfig() config.Metadata.Retry.Max = 1 - _, err := NewClient([]string{"localhost:9000"}, config) + _, err := NewClient([]string{kafkaBrokers[0]}, config) if err != ErrOutOfBrokers { t.Fatal("Expected returned error to be ErrOutOfBrokers, but was: ", err) } } func TestFuncClientMetadata(t *testing.T) { - checkKafkaAvailability(t) + setupFunctionalTest(t) + defer teardownFunctionalTest(t) config := NewConfig() config.Metadata.Retry.Max = 1 @@ -60,7 +67,8 @@ func TestFuncClientMetadata(t *testing.T) { func TestFuncClientCoordinator(t *testing.T) { checkKafkaVersion(t, "0.8.2") - checkKafkaAvailability(t) + setupFunctionalTest(t) + defer teardownFunctionalTest(t) client, err := NewClient(kafkaBrokers, nil) if err != nil { diff --git a/functional_consumer_test.go b/functional_consumer_test.go index 6afd5cc52..ab8433109 100644 --- a/functional_consumer_test.go +++ b/functional_consumer_test.go @@ -6,7 +6,8 @@ import ( ) func TestFuncConsumerOffsetOutOfRange(t *testing.T) { - checkKafkaAvailability(t) + setupFunctionalTest(t) + defer teardownFunctionalTest(t) consumer, err := NewConsumer(kafkaBrokers, nil) if err != nil { @@ -25,7 +26,8 @@ func TestFuncConsumerOffsetOutOfRange(t *testing.T) { } func TestConsumerHighWaterMarkOffset(t *testing.T) { - checkKafkaAvailability(t) + setupFunctionalTest(t) + defer teardownFunctionalTest(t) p, err := NewSyncProducer(kafkaBrokers, nil) if err != nil { diff --git a/functional_producer_test.go b/functional_producer_test.go index 9f7cc8a36..1504e7600 100644 --- a/functional_producer_test.go +++ b/functional_producer_test.go @@ -40,7 +40,8 @@ func TestFuncProducingFlushing(t *testing.T) { } func TestFuncMultiPartitionProduce(t *testing.T) { - checkKafkaAvailability(t) + setupFunctionalTest(t) + defer teardownFunctionalTest(t) config := NewConfig() config.ChannelBufferSize = 20 @@ -72,7 +73,8 @@ func TestFuncMultiPartitionProduce(t *testing.T) { } func TestFuncProducingToInvalidTopic(t *testing.T) { - checkKafkaAvailability(t) + setupFunctionalTest(t) + defer teardownFunctionalTest(t) producer, err := NewSyncProducer(kafkaBrokers, nil) if err != nil { @@ -91,7 +93,8 @@ func TestFuncProducingToInvalidTopic(t *testing.T) { } func testProducingMessages(t *testing.T, config *Config) { - checkKafkaAvailability(t) + setupFunctionalTest(t) + defer teardownFunctionalTest(t) config.Producer.Return.Successes = true config.Consumer.Return.Errors = true @@ -177,7 +180,8 @@ func BenchmarkProducerMediumSnappy(b *testing.B) { } func benchmarkProducer(b *testing.B, conf *Config, topic string, value Encoder) { - checkKafkaAvailability(b) + setupFunctionalTest(b) + defer teardownFunctionalTest(b) producer, err := NewAsyncProducer(kafkaBrokers, conf) if err != nil { diff --git a/functional_test.go b/functional_test.go index 1e5445030..171002ee9 100644 --- a/functional_test.go +++ b/functional_test.go @@ -20,9 +20,13 @@ const ( ) var ( - kafkaIsAvailable, kafkaShouldBeAvailable bool - kafkaBrokers []string - proxy *toxiproxy.Client + kafkaAvailable, kafkaRequired bool + kafkaBrokers []string + + proxyClient *toxiproxy.Client + Proxies map[string]*toxiproxy.Proxy + ZKProxies = []string{"zk1", "zk2", "zk3", "zk4", "zk5"} + KafkaProxies = []string{"kafka1", "kafka2", "kafka3", "kafka4", "kafka5"} ) func init() { @@ -41,7 +45,7 @@ func init() { if proxyAddr == "" { proxyAddr = VagrantToxiproxy } - proxy = toxiproxy.NewClient(proxyAddr) + proxyClient = toxiproxy.NewClient(proxyAddr) kafkaPeers := os.Getenv("KAFKA_PEERS") if kafkaPeers == "" { @@ -51,16 +55,16 @@ func init() { if c, err := net.DialTimeout("tcp", kafkaBrokers[0], 5*time.Second); err == nil { if err = c.Close(); err == nil { - kafkaIsAvailable = true + kafkaAvailable = true } } - kafkaShouldBeAvailable = os.Getenv("CI") != "" + kafkaRequired = os.Getenv("CI") != "" } func checkKafkaAvailability(t testing.TB) { - if !kafkaIsAvailable { - if kafkaShouldBeAvailable { + if !kafkaAvailable { + if kafkaRequired { t.Fatalf("Kafka broker is not available on %s. Set KAFKA_PEERS to connect to Kafka on a different location.", kafkaBrokers[0]) } else { t.Skipf("Kafka broker is not available on %s. Set KAFKA_PEERS to connect to Kafka on a different location.", kafkaBrokers[0]) @@ -81,6 +85,37 @@ func checkKafkaVersion(t testing.TB, requiredVersion string) { } } +func resetProxies(t testing.TB) { + if err := proxyClient.ResetState(); err != nil { + t.Error(err) + } + Proxies = nil +} + +func fetchProxies(t testing.TB) { + var err error + Proxies, err = proxyClient.Proxies() + if err != nil { + t.Fatal(err) + } +} + +func SaveProxy(t *testing.T, px string) { + if err := Proxies[px].Save(); err != nil { + t.Fatal(err) + } +} + +func setupFunctionalTest(t testing.TB) { + checkKafkaAvailability(t) + resetProxies(t) + fetchProxies(t) +} + +func teardownFunctionalTest(t testing.TB) { + resetProxies(t) +} + type kafkaVersion []int func (kv kafkaVersion) satisfies(other kafkaVersion) bool {