diff --git a/src/msg/integration/integration_test.go b/src/msg/integration/integration_test.go index 68cbaac8cf..0c9cdf2092 100644 --- a/src/msg/integration/integration_test.go +++ b/src/msg/integration/integration_test.go @@ -57,7 +57,7 @@ func TestSharedConsumer(t *testing.T) { } } -func TestReplicatedConsumerx(t *testing.T) { +func TestReplicatedConsumer(t *testing.T) { if testing.Short() { t.SkipNow() // Just skip if we're doing a short run } @@ -129,9 +129,9 @@ func TestSharedConsumerWithDeadInstance(t *testing.T) { s.Run(t, ctrl) s.VerifyConsumers(t) testConsumers := s.consumerServices[0].testConsumers - require.True(t, testConsumers[len(testConsumers)-1].consumed <= s.TotalMessages()*10/100) + require.True(t, testConsumers[len(testConsumers)-1].numConsumed() <= s.TotalMessages()*10/100) testConsumers = s.consumerServices[1].testConsumers - require.True(t, testConsumers[len(testConsumers)-1].consumed <= s.TotalMessages()*20/100) + require.True(t, testConsumers[len(testConsumers)-1].numConsumed() <= s.TotalMessages()*20/100) } } @@ -546,8 +546,8 @@ func TestRemoveConsumerService(t *testing.T) { ) s.Run(t, ctrl) s.VerifyConsumers(t) - require.Equal(t, msgPerShard*numberOfShards, len(s.consumerServices[0].consumed)) - require.Equal(t, msgPerShard*numberOfShards, len(s.consumerServices[1].consumed)) + require.Equal(t, msgPerShard*numberOfShards, s.consumerServices[0].numConsumed()) + require.Equal(t, msgPerShard*numberOfShards, s.consumerServices[1].numConsumed()) } } @@ -574,8 +574,8 @@ func TestAddConsumerService(t *testing.T) { }, ) s.Run(t, ctrl) - require.Equal(t, s.ExpectedNumMessages(), len(s.consumerServices[0].consumed)) - require.Equal(t, s.ExpectedNumMessages(), len(s.consumerServices[1].consumed)) - require.True(t, len(s.consumerServices[2].consumed) <= s.ExpectedNumMessages()*80/100) + require.Equal(t, s.ExpectedNumMessages(), s.consumerServices[0].numConsumed()) + require.Equal(t, s.ExpectedNumMessages(), s.consumerServices[1].numConsumed()) + require.True(t, s.consumerServices[2].numConsumed() <= s.ExpectedNumMessages()*80/100) } } diff --git a/src/msg/integration/setup.go b/src/msg/integration/setup.go index c1200f58e9..d797077021 100644 --- a/src/msg/integration/setup.go +++ b/src/msg/integration/setup.go @@ -237,7 +237,7 @@ func (s *setup) Run( func (s *setup) VerifyConsumers(t *testing.T) { numWritesPerProducer := s.ExpectedNumMessages() for _, cs := range s.consumerServices { - require.Equal(t, numWritesPerProducer, len(cs.consumed)) + require.Equal(t, numWritesPerProducer, cs.numConsumed()) } } @@ -407,6 +407,13 @@ func (cs *testConsumerService) markConsumed(b []byte) { cs.consumed[string(b)] = struct{}{} } +func (cs *testConsumerService) numConsumed() int { + cs.Lock() + defer cs.Unlock() + + return len(cs.consumed) +} + func (cs *testConsumerService) Close() { for _, c := range cs.testConsumers { c.Close() @@ -437,6 +444,13 @@ func (c *testConsumer) Close() { close(c.doneCh) } +func (c *testConsumer) numConsumed() int { + c.Lock() + defer c.Unlock() + + return c.consumed +} + func newTestConsumer(t *testing.T, cs *testConsumerService) *testConsumer { consumerListener, err := consumer.NewListener("127.0.0.1:0", testConsumerOptions(t)) require.NoError(t, err) @@ -539,8 +553,8 @@ writer: topicName: topicName topicWatchInitTimeout: 100ms placementWatchInitTimeout: 100ms - messagePool: - size: 100 + # FIXME: Consumers sharing the same pool trigger false-positives in race detector + messagePool: ~ messageRetry: initialBackoff: 20ms maxBackoff: 50ms diff --git a/src/msg/producer/ref_counted.go b/src/msg/producer/ref_counted.go index 93a17fd00e..18ecb24869 100644 --- a/src/msg/producer/ref_counted.go +++ b/src/msg/producer/ref_counted.go @@ -31,24 +31,24 @@ type OnFinalizeFn func(rm *RefCountedMessage) // RefCountedMessage is a reference counted message. type RefCountedMessage struct { - sync.RWMutex + mu sync.RWMutex Message size uint64 onFinalizeFn OnFinalizeFn - refCount *atomic.Int32 - isDroppedOrConsumed *atomic.Bool + // RefCountedMessage must not be copied by value due to RWMutex, + // safe to store values here and not just pointers + refCount atomic.Int32 + isDroppedOrConsumed atomic.Bool } // NewRefCountedMessage creates RefCountedMessage. func NewRefCountedMessage(m Message, fn OnFinalizeFn) *RefCountedMessage { return &RefCountedMessage{ - Message: m, - refCount: atomic.NewInt32(0), - size: uint64(m.Size()), - onFinalizeFn: fn, - isDroppedOrConsumed: atomic.NewBool(false), + Message: m, + size: uint64(m.Size()), + onFinalizeFn: fn, } } @@ -76,12 +76,12 @@ func (rm *RefCountedMessage) DecRef() { // IncReads increments the reads count. func (rm *RefCountedMessage) IncReads() { - rm.RLock() + rm.mu.RLock() } // DecReads decrements the reads count. func (rm *RefCountedMessage) DecReads() { - rm.RUnlock() + rm.mu.RUnlock() } // NumRef returns the number of references remaining. @@ -107,13 +107,13 @@ func (rm *RefCountedMessage) IsDroppedOrConsumed() bool { func (rm *RefCountedMessage) finalize(r FinalizeReason) bool { // NB: This lock prevents the message from being finalized when its still // being read. - rm.Lock() + rm.mu.Lock() if rm.isDroppedOrConsumed.Load() { - rm.Unlock() + rm.mu.Unlock() return false } rm.isDroppedOrConsumed.Store(true) - rm.Unlock() + rm.mu.Unlock() if rm.onFinalizeFn != nil { rm.onFinalizeFn(rm) } diff --git a/src/msg/producer/writer/message.go b/src/msg/producer/writer/message.go index 6c9501ff36..5e8db247b9 100644 --- a/src/msg/producer/writer/message.go +++ b/src/msg/producer/writer/message.go @@ -38,14 +38,14 @@ type message struct { retried int // NB(cw) isAcked could be accessed concurrently by the background thread // in message writer and acked by consumer service writers. - isAcked *atomic.Bool + // Safe to store value inside struct, as message is never copied by value + isAcked atomic.Bool } func newMessage() *message { return &message{ retryAtNanos: 0, retried: 0, - isAcked: atomic.NewBool(false), } } diff --git a/src/msg/producer/writer/message_benchmark_test.go b/src/msg/producer/writer/message_benchmark_test.go new file mode 100644 index 0000000000..76d1a10241 --- /dev/null +++ b/src/msg/producer/writer/message_benchmark_test.go @@ -0,0 +1,76 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package writer + +import ( + "testing" + + "github.com/m3db/m3/src/msg/producer" +) + +var ( + // BenchMessage prevents optimization + BenchMessage *message + // BenchBool prevents optimization + BenchBool bool +) + +type emptyMessage struct{} + +// Shard returns the shard of the message. +func (e emptyMessage) Shard() uint32 { return 0 } + +// Bytes returns the bytes of the message. +func (e emptyMessage) Bytes() []byte { return nil } + +// Size returns the size of the bytes of the message. +func (e emptyMessage) Size() int { return 0 } + +// Finalize will be called by producer to indicate the end of its lifecycle. +func (e emptyMessage) Finalize(_ producer.FinalizeReason) {} + +func BenchmarkMessageAtomics(b *testing.B) { + rm := producer.NewRefCountedMessage(emptyMessage{}, nil) + msg := newMessage() + for n := 0; n < b.N; n++ { + msg.Set(metadata{}, rm, 500) + rm.IncRef() + msg.Ack() + BenchBool = msg.IsAcked() + _, BenchBool = msg.Marshaler() + msg.Close() + BenchMessage = msg + } +} + +func BenchmarkMessageAtomicsAllocs(b *testing.B) { + for n := 0; n < b.N; n++ { + rm := producer.NewRefCountedMessage(emptyMessage{}, nil) + msg := newMessage() + msg.Set(metadata{}, rm, 500) + rm.IncRef() + msg.Ack() + BenchBool = msg.IsAcked() + _, BenchBool = msg.Marshaler() + msg.Close() + BenchMessage = msg + } +}