diff --git a/src/aggregator/client/m3msg_client.go b/src/aggregator/client/m3msg_client.go index b4d297cd86..26ef7e1217 100644 --- a/src/aggregator/client/m3msg_client.go +++ b/src/aggregator/client/m3msg_client.go @@ -111,7 +111,8 @@ func (c *M3MsgClient) WriteUntimedCounter( metadatas: metadatas, }, } - err := c.write(counter.ID, payload) + size, err := c.write(counter.ID, payload) + c.metrics.totalBytesSent.Inc(int64(size)) c.metrics.writeUntimedCounter.ReportSuccessOrError(err, c.nowFn().Sub(callStart)) return err } @@ -129,7 +130,8 @@ func (c *M3MsgClient) WriteUntimedBatchTimer( metadatas: metadatas, }, } - err := c.write(batchTimer.ID, payload) + size, err := c.write(batchTimer.ID, payload) + c.metrics.totalBytesSent.Inc(int64(size)) c.metrics.writeUntimedBatchTimer.ReportSuccessOrError(err, c.nowFn().Sub(callStart)) return err } @@ -147,7 +149,8 @@ func (c *M3MsgClient) WriteUntimedGauge( metadatas: metadatas, }, } - err := c.write(gauge.ID, payload) + size, err := c.write(gauge.ID, payload) + c.metrics.totalBytesSent.Inc(int64(size)) c.metrics.writeUntimedGauge.ReportSuccessOrError(err, c.nowFn().Sub(callStart)) return err } @@ -165,7 +168,8 @@ func (c *M3MsgClient) WriteTimed( metadata: metadata, }, } - err := c.write(metric.ID, payload) + size, err := c.write(metric.ID, payload) + c.metrics.totalBytesSent.Inc(int64(size)) c.metrics.writeForwarded.ReportSuccessOrError(err, c.nowFn().Sub(callStart)) return err } @@ -183,7 +187,8 @@ func (c *M3MsgClient) WritePassthrough( storagePolicy: storagePolicy, }, } - err := c.write(metric.ID, payload) + size, err := c.write(metric.ID, payload) + c.metrics.totalBytesSent.Inc(int64(size)) c.metrics.writePassthrough.ReportSuccessOrError(err, c.nowFn().Sub(callStart)) return err } @@ -201,7 +206,8 @@ func (c *M3MsgClient) WriteTimedWithStagedMetadatas( metadatas: metadatas, }, } - err := c.write(metric.ID, payload) + size, err := c.write(metric.ID, payload) + c.metrics.totalBytesSent.Inc(int64(size)) c.metrics.writeForwarded.ReportSuccessOrError(err, c.nowFn().Sub(callStart)) return err } @@ -219,27 +225,29 @@ func (c *M3MsgClient) WriteForwarded( metadata: metadata, }, } - err := c.write(metric.ID, payload) + size, err := c.write(metric.ID, payload) + c.metrics.totalBytesSent.Inc(int64(size)) c.metrics.writeForwarded.ReportSuccessOrError(err, c.nowFn().Sub(callStart)) return err } //nolint:gocritic -func (c *M3MsgClient) write(metricID id.RawID, payload payloadUnion) error { +func (c *M3MsgClient) write(metricID id.RawID, payload payloadUnion) (int, error) { shard := c.shardFn(metricID, c.m3msg.numShards) msg := c.m3msg.messagePool.Get() - if err := msg.Encode(shard, payload); err != nil { + size, err := msg.Encode(shard, payload) + if err != nil { msg.Finalize(producer.Dropped) - return err + return 0, err } if err := c.m3msg.producer.Produce(msg); err != nil { msg.Finalize(producer.Dropped) - return err + return 0, err } - return nil + return size, nil } // Flush satisfies Client interface, as M3Msg client does not need explicit flushing. @@ -259,6 +267,7 @@ type m3msgClientMetrics struct { writeUntimedGauge instrument.MethodMetrics writePassthrough instrument.MethodMetrics writeForwarded instrument.MethodMetrics + totalBytesSent tally.Counter } func newM3msgClientMetrics( @@ -271,6 +280,7 @@ func newM3msgClientMetrics( writeUntimedGauge: instrument.NewMethodMetrics(scope, "writeUntimedGauge", opts), writePassthrough: instrument.NewMethodMetrics(scope, "writePassthrough", opts), writeForwarded: instrument.NewMethodMetrics(scope, "writeForwarded", opts), + totalBytesSent: scope.Counter("total-bytes-sent"), } } @@ -324,7 +334,7 @@ func newMessage(pool *messagePool) *message { func (m *message) Encode( shard uint32, payload payloadUnion, -) error { +) (int, error) { m.shard = shard switch payload.payloadType { @@ -336,7 +346,7 @@ func (m *message) Encode( StagedMetadatas: payload.untimed.metadatas, } if err := value.ToProto(&m.cm); err != nil { - return err + return 0, err } m.metric = metricpb.MetricWithMetadatas{ @@ -349,7 +359,7 @@ func (m *message) Encode( StagedMetadatas: payload.untimed.metadatas, } if err := value.ToProto(&m.bm); err != nil { - return err + return 0, err } m.metric = metricpb.MetricWithMetadatas{ @@ -362,7 +372,7 @@ func (m *message) Encode( StagedMetadatas: payload.untimed.metadatas, } if err := value.ToProto(&m.gm); err != nil { - return err + return 0, err } m.metric = metricpb.MetricWithMetadatas{ @@ -370,7 +380,7 @@ func (m *message) Encode( GaugeWithMetadatas: &m.gm, } default: - return fmt.Errorf("unrecognized metric type: %v", + return 0, fmt.Errorf("unrecognized metric type: %v", payload.untimed.metric.Type) } case forwardedType: @@ -379,7 +389,7 @@ func (m *message) Encode( ForwardMetadata: payload.forwarded.metadata, } if err := value.ToProto(&m.fm); err != nil { - return err + return 0, err } m.metric = metricpb.MetricWithMetadatas{ @@ -392,7 +402,7 @@ func (m *message) Encode( TimedMetadata: payload.timed.metadata, } if err := value.ToProto(&m.tm); err != nil { - return err + return 0, err } m.metric = metricpb.MetricWithMetadatas{ @@ -405,7 +415,7 @@ func (m *message) Encode( StagedMetadatas: payload.timedWithStagedMetadatas.metadatas, } if err := value.ToProto(&m.tms); err != nil { - return err + return 0, err } m.metric = metricpb.MetricWithMetadatas{ @@ -413,7 +423,7 @@ func (m *message) Encode( TimedMetricWithMetadatas: &m.tms, } default: - return fmt.Errorf("unrecognized payload type: %v", + return 0, fmt.Errorf("unrecognized payload type: %v", payload.payloadType) } @@ -427,7 +437,7 @@ func (m *message) Encode( m.buf = m.buf[:size] _, err := m.metric.MarshalTo(m.buf) - return err + return size, err } func (m *message) Shard() uint32 { diff --git a/src/aggregator/client/m3msg_client_test.go b/src/aggregator/client/m3msg_client_test.go index 9c76c8e84f..2f78295c22 100644 --- a/src/aggregator/client/m3msg_client_test.go +++ b/src/aggregator/client/m3msg_client_test.go @@ -22,11 +22,17 @@ package client import ( "testing" + "time" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" + "github.com/uber-go/tally" + "github.com/m3db/m3/src/metrics/metadata" + "github.com/m3db/m3/src/metrics/metric/id" + "github.com/m3db/m3/src/metrics/metric/unaggregated" "github.com/m3db/m3/src/msg/producer" + "github.com/m3db/m3/src/x/instrument" ) func TestNewM3MsgClient(t *testing.T) { @@ -42,3 +48,43 @@ func TestNewM3MsgClient(t *testing.T) { assert.NotNil(t, c) assert.NoError(t, err) } + +func TestTotalBytesAdded(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + // Mock dependencies + p := producer.NewMockProducer(ctrl) + p.EXPECT().Init() + p.EXPECT().NumShards().Return(uint32(1)) + p.EXPECT().Produce(gomock.Any()).Return(nil).AnyTimes() + + opts := NewM3MsgOptions().SetProducer(p) + client, err := NewM3MsgClient(NewOptions().SetM3MsgOptions(opts)) + assert.NoError(t, err) + + // Mock metric and metadata + counter := unaggregated.Counter{ + ID: id.RawID("testCounter"), + Value: 123, + } + metadatas := metadata.StagedMetadatas{} + + // Mock time function + now := time.Now() + client.(*M3MsgClient).nowFn = func() time.Time { return now } + + testScope := tally.NewTestScope("", make(map[string]string)) + // Mock metrics + client.(*M3MsgClient).metrics = m3msgClientMetrics{ + writeUntimedCounter: instrument.NewMethodMetrics(testScope, "writeUntimedCounter", instrument.TimerOptions{}), + totalBytesSent: testScope.Counter("total-bytes-sent"), + } + + // Call the method + err = client.WriteUntimedCounter(counter, metadatas) + assert.NoError(t, err) + + // Verify the total bytes added + assert.Equal(t, int64(23), testScope.Snapshot().Counters()["total-bytes-sent+"].Value()) +}