metric(cdc): fix incorrect metric WorkerBusyRatio (pingcap#11669)
wk989898 authored Oct 31, 2024
1 parent 59335d0 commit d728d02
Showing 11 changed files with 91 additions and 90 deletions.
2 changes: 1 addition & 1 deletion cdc/kv/metrics.go
@@ -149,7 +149,7 @@ var (
Namespace: "ticdc",
Subsystem: "kvclient",
Name: "region_worker_busy_ratio",
- Help: "Busy ratio (X ms in 1s) for region worker.",
+ Help: "Busy ratio for region worker.",
}, []string{"namespace", "changefeed", "table", "store", "type"})
workerChannelSize = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
26 changes: 13 additions & 13 deletions cdc/redo/common/metric.go
@@ -29,61 +29,61 @@ var (
Subsystem: subsystem,
Name: "write_bytes_total",
Help: "Total number of bytes redo log written",
- }, []string{"namespace", "changefeed"})
+ }, []string{"namespace", "changefeed", "type"})

// RedoFsyncDurationHistogram records the latency distributions of fsync called by redo writer.
RedoFsyncDurationHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "fsync_duration_seconds",
Help: "The latency distributions of fsync called by redo writer",
- Buckets: prometheus.ExponentialBuckets(0.001, 2.0, 13),
- }, []string{"namespace", "changefeed"})
+ Buckets: prometheus.ExponentialBuckets(0.001, 2.0, 16),
+ }, []string{"namespace", "changefeed", "type"})

// RedoFlushAllDurationHistogram records the latency distributions of flushAll
// called by redo writer.
RedoFlushAllDurationHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
- Name: "flushall_duration_seconds",
+ Name: "flush_all_duration_seconds",
Help: "The latency distributions of flushall called by redo writer",
- Buckets: prometheus.ExponentialBuckets(0.001, 2.0, 13),
- }, []string{"namespace", "changefeed"})
+ Buckets: prometheus.ExponentialBuckets(0.001, 2.0, 16),
+ }, []string{"namespace", "changefeed", "type"})

// RedoTotalRowsCountGauge records the total number of rows written to redo log.
RedoTotalRowsCountGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "total_rows_count",
Help: "The total count of rows that are processed by redo writer",
- }, []string{"namespace", "changefeed"})
+ }, []string{"namespace", "changefeed", "type"})

// RedoWriteLogDurationHistogram records the latency distributions of writeLog.
RedoWriteLogDurationHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "write_log_duration_seconds",
Help: "The latency distributions of writeLog called by redoManager",
- Buckets: prometheus.ExponentialBuckets(0.001, 2.0, 13),
- }, []string{"namespace", "changefeed"})
+ Buckets: prometheus.ExponentialBuckets(0.001, 2.0, 16),
+ }, []string{"namespace", "changefeed", "type"})

// RedoFlushLogDurationHistogram records the latency distributions of flushLog.
RedoFlushLogDurationHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "flush_log_duration_seconds",
Help: "The latency distributions of flushLog called by redoManager",
- Buckets: prometheus.ExponentialBuckets(0.001, 2.0, 13),
- }, []string{"namespace", "changefeed"})
+ Buckets: prometheus.ExponentialBuckets(0.001, 2.0, 16),
+ }, []string{"namespace", "changefeed", "type"})

// RedoWorkerBusyRatio records the busy ratio of redo bgUpdateLog worker.
RedoWorkerBusyRatio = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "worker_busy_ratio",
- Help: "Busy ratio (X ms in 1s) for redo bgUpdateLog worker.",
- }, []string{"namespace", "changefeed"})
+ Help: "Busy ratio for redo bgUpdateLog worker.",
+ }, []string{"namespace", "changefeed", "type"})
)

// InitMetrics registers all metrics in this file
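Alongside the new "type" label, the redo histograms grow from 13 to 16 exponential buckets. With ExponentialBuckets(0.001, 2.0, n) each bucket doubles from 1ms, so the largest finite bucket moves from roughly 4.1s to roughly 32.8s, and slow fsync/flush calls that previously fell straight into +Inf now land in a measurable bucket. A quick check using only the standard Prometheus client (the function name below is illustrative, not project code):

```go
package redobucketsdemo

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// printLargestBuckets compares the upper bound of the last finite bucket
// before and after the change: 0.001 * 2^(n-1) seconds for n buckets.
func printLargestBuckets() {
	before := prometheus.ExponentialBuckets(0.001, 2.0, 13)
	after := prometheus.ExponentialBuckets(0.001, 2.0, 16)
	fmt.Printf("13 buckets: last = %.3fs\n", before[len(before)-1]) // 4.096s
	fmt.Printf("16 buckets: last = %.3fs\n", after[len(after)-1])   // 32.768s
}
```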
18 changes: 8 additions & 10 deletions cdc/redo/manager.go
@@ -231,13 +231,13 @@ func newLogManager(
logBuffer: chann.NewAutoDrainChann[cacheEvents](),
rtsMap: spanz.SyncMap{},
metricWriteLogDuration: common.RedoWriteLogDurationHistogram.
- WithLabelValues(changefeedID.Namespace, changefeedID.ID),
+ WithLabelValues(changefeedID.Namespace, changefeedID.ID, logType),
metricFlushLogDuration: common.RedoFlushLogDurationHistogram.
- WithLabelValues(changefeedID.Namespace, changefeedID.ID),
+ WithLabelValues(changefeedID.Namespace, changefeedID.ID, logType),
metricTotalRowsCount: common.RedoTotalRowsCountGauge.
- WithLabelValues(changefeedID.Namespace, changefeedID.ID),
+ WithLabelValues(changefeedID.Namespace, changefeedID.ID, logType),
metricRedoWorkerBusyRatio: common.RedoWorkerBusyRatio.
- WithLabelValues(changefeedID.Namespace, changefeedID.ID),
+ WithLabelValues(changefeedID.Namespace, changefeedID.ID, logType),
}
}

@@ -521,10 +521,10 @@ func (m *logManager) bgUpdateLog(ctx context.Context, flushDuration time.Duratio
logErrCh := make(chan error, 1)
handleErr := func(err error) { logErrCh <- err }

- overseerTicker := time.NewTicker(time.Second * 5)
+ overseerDuration := time.Second * 5
+ overseerTicker := time.NewTicker(overseerDuration)
defer overseerTicker.Stop()
var workTimeSlice time.Duration
- startToWork := time.Now()
for {
select {
case <-ctx.Done():
@@ -536,10 +536,8 @@ func (m *logManager) bgUpdateLog(ctx context.Context, flushDuratio
return nil // channel closed
}
err = m.handleEvent(ctx, event, &workTimeSlice)
- case now := <-overseerTicker.C:
- busyRatio := int(workTimeSlice.Seconds() / now.Sub(startToWork).Seconds() * 1000)
- m.metricRedoWorkerBusyRatio.Add(float64(busyRatio))
- startToWork = now
+ case <-overseerTicker.C:
+ m.metricRedoWorkerBusyRatio.Add(workTimeSlice.Seconds())
workTimeSlice = 0
case err = <-logErrCh:
}
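This hunk is the core of the fix: bgUpdateLog previously converted its accumulated work time into a per-interval ratio (work time divided by elapsed time, scaled by 1000) before adding it to a counter, which tied the counter's value to the sampling interval. After the change it adds the raw busy seconds and leaves the ratio to the dashboard, which can derive it from the counter's rate. A minimal sketch of the new accumulation pattern, assuming a generic event channel and an illustrative metric name rather than TiCDC's actual types:

```go
package redobusydemo

import (
	"context"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// busySeconds stands in for RedoWorkerBusyRatio: a plain counter of busy
// wall-clock seconds. A dashboard derives the ratio from its rate instead of
// the worker precomputing "X ms in 1s".
var busySeconds = prometheus.NewCounter(prometheus.CounterOpts{
	Name: "demo_worker_busy_seconds_total", // illustrative name only
	Help: "Accumulated busy time of the worker in seconds.",
})

// runWorker mirrors the shape of the patched loop: every overseer tick it
// flushes the accumulated work time into the counter and resets the slice.
func runWorker(ctx context.Context, events <-chan func()) error {
	overseerDuration := 5 * time.Second
	overseerTicker := time.NewTicker(overseerDuration)
	defer overseerTicker.Stop()

	var workTimeSlice time.Duration
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case ev, ok := <-events:
			if !ok {
				return nil // channel closed
			}
			start := time.Now()
			ev() // handle one unit of work
			workTimeSlice += time.Since(start)
		case <-overseerTicker.C:
			busySeconds.Add(workTimeSlice.Seconds())
			workTimeSlice = 0
		}
	}
}
```

Because the counter now carries seconds of work, the busy fraction on a panel is simply the counter's per-second rate (seconds of work per second of wall clock), which is why the "(X ms in 1s)" wording disappears from the Help strings above.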
24 changes: 15 additions & 9 deletions cdc/redo/meta_manager.go
@@ -144,7 +144,7 @@ func (m *metaManager) preStart(ctx context.Context) error {
m.extStorage = extStorage

m.metricFlushLogDuration = common.RedoFlushLogDurationHistogram.
- WithLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID)
+ WithLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID, redo.RedoMetaFileType)

err = m.preCleanupExtStorage(ctx)
if err != nil {
@@ -469,17 +469,23 @@ func (m *metaManager) flush(ctx context.Context, meta common.LogMeta) error {
return nil
}

- // Cleanup removes all redo logs of this manager, it is called when changefeed is removed
- // only owner should call this method.
- func (m *metaManager) Cleanup(ctx context.Context) error {
- common.RedoWriteLogDurationHistogram.
- DeleteLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID)
+ func (m *metaManager) cleanup(logType string) {
common.RedoFlushLogDurationHistogram.
- DeleteLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID)
+ DeleteLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID, logType)
+ common.RedoWriteLogDurationHistogram.
+ DeleteLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID, logType)
common.RedoTotalRowsCountGauge.
- DeleteLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID)
+ DeleteLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID, logType)
common.RedoWorkerBusyRatio.
- DeleteLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID)
+ DeleteLabelValues(m.changeFeedID.Namespace, m.changeFeedID.ID, logType)
+ }
+
+ // Cleanup removes all redo logs of this manager, it is called when changefeed is removed
+ // only owner should call this method.
+ func (m *metaManager) Cleanup(ctx context.Context) error {
+ m.cleanup(redo.RedoMetaFileType)
+ m.cleanup(redo.RedoRowLogFileType)
+ m.cleanup(redo.RedoDDLLogFileType)
return m.deleteAllLogs(ctx)
}

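With "type" added to the redo label sets, Cleanup has to delete one series per log type, which is what the new cleanup(logType) helper does for the meta, row, and DDL types. DeleteLabelValues only removes a child series when it receives a value for every variable label, in order. A small sketch of that behaviour with a hypothetical counter (names and label values are illustrative):

```go
package redocleanupdemo

import "github.com/prometheus/client_golang/prometheus"

// deleteDemo shows why cleanup(logType) must pass all three label values:
// DeleteLabelValues matches a series only when the values cover the vector's
// full label set, in declaration order.
func deleteDemo() {
	vec := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "demo_cleanup_total", // illustrative name only
		Help: "Demo counter.",
	}, []string{"namespace", "changefeed", "type"})

	vec.WithLabelValues("default", "cf-1", "row").Inc()

	_ = vec.DeleteLabelValues("default", "cf-1")        // false: too few values, nothing deleted
	_ = vec.DeleteLabelValues("default", "cf-1", "row") // true: the series above is removed
}
```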
13 changes: 6 additions & 7 deletions cdc/redo/writer/file/file.go
@@ -104,11 +104,11 @@ func NewFileWriter(
storage: extStorage,

metricFsyncDuration: common.RedoFsyncDurationHistogram.
- WithLabelValues(cfg.ChangeFeedID.Namespace, cfg.ChangeFeedID.ID),
+ WithLabelValues(cfg.ChangeFeedID.Namespace, cfg.ChangeFeedID.ID, cfg.LogType),
metricFlushAllDuration: common.RedoFlushAllDurationHistogram.
- WithLabelValues(cfg.ChangeFeedID.Namespace, cfg.ChangeFeedID.ID),
+ WithLabelValues(cfg.ChangeFeedID.Namespace, cfg.ChangeFeedID.ID, cfg.LogType),
metricWriteBytes: common.RedoWriteBytesGauge.
- WithLabelValues(cfg.ChangeFeedID.Namespace, cfg.ChangeFeedID.ID),
+ WithLabelValues(cfg.ChangeFeedID.Namespace, cfg.ChangeFeedID.ID, cfg.LogType),
}
if w.op.GetUUIDGenerator != nil {
w.uuidGenerator = w.op.GetUUIDGenerator()
@@ -213,11 +213,11 @@ func (w *Writer) Close() error {
}

common.RedoFlushAllDurationHistogram.
- DeleteLabelValues(w.cfg.ChangeFeedID.Namespace, w.cfg.ChangeFeedID.ID)
+ DeleteLabelValues(w.cfg.ChangeFeedID.Namespace, w.cfg.ChangeFeedID.ID, w.cfg.LogType)
common.RedoFsyncDurationHistogram.
- DeleteLabelValues(w.cfg.ChangeFeedID.Namespace, w.cfg.ChangeFeedID.ID)
+ DeleteLabelValues(w.cfg.ChangeFeedID.Namespace, w.cfg.ChangeFeedID.ID, w.cfg.LogType)
common.RedoWriteBytesGauge.
- DeleteLabelValues(w.cfg.ChangeFeedID.Namespace, w.cfg.ChangeFeedID.ID)
+ DeleteLabelValues(w.cfg.ChangeFeedID.Namespace, w.cfg.ChangeFeedID.ID, w.cfg.LogType)

ctx, cancel := context.WithTimeout(context.Background(), redo.CloseTimeout)
defer cancel()
@@ -399,7 +399,6 @@ func (w *Writer) flushAndRotateFile() error {
if err != nil {
return nil
}
-
w.metricFlushAllDuration.Observe(time.Since(start).Seconds())

return err
30 changes: 15 additions & 15 deletions cdc/redo/writer/file/file_test.go
@@ -67,11 +67,11 @@ func TestWriterWrite(t *testing.T) {
uint64buf: make([]byte, 8),
running: *atomic.NewBool(true),
metricWriteBytes: common.RedoWriteBytesGauge.
- WithLabelValues("default", "test-cf"),
+ WithLabelValues("default", "test-cf", redo.RedoRowLogFileType),
metricFsyncDuration: common.RedoFsyncDurationHistogram.
- WithLabelValues("default", "test-cf"),
+ WithLabelValues("default", "test-cf", redo.RedoRowLogFileType),
metricFlushAllDuration: common.RedoFlushAllDurationHistogram.
- WithLabelValues("default", "test-cf"),
+ WithLabelValues("default", "test-cf", redo.RedoRowLogFileType),
uuidGenerator: uuidGen,
}

@@ -158,11 +158,11 @@ func TestWriterWrite(t *testing.T) {
uint64buf: make([]byte, 8),
running: *atomic.NewBool(true),
metricWriteBytes: common.RedoWriteBytesGauge.
- WithLabelValues("default", "test-cf11"),
+ WithLabelValues("default", "test-cf11", redo.RedoRowLogFileType),
metricFsyncDuration: common.RedoFsyncDurationHistogram.
- WithLabelValues("default", "test-cf11"),
+ WithLabelValues("default", "test-cf11", redo.RedoRowLogFileType),
metricFlushAllDuration: common.RedoFlushAllDurationHistogram.
- WithLabelValues("default", "test-cf11"),
+ WithLabelValues("default", "test-cf11", redo.RedoRowLogFileType),
uuidGenerator: uuidGen,
}

@@ -253,11 +253,11 @@ func TestNewFileWriter(t *testing.T) {
uint64buf: make([]byte, 8),
storage: mockStorage,
metricWriteBytes: common.RedoWriteBytesGauge.
- WithLabelValues("default", "test"),
+ WithLabelValues("default", "test", redo.RedoRowLogFileType),
metricFsyncDuration: common.RedoFsyncDurationHistogram.
- WithLabelValues("default", "test"),
+ WithLabelValues("default", "test", redo.RedoRowLogFileType),
metricFlushAllDuration: common.RedoFlushAllDurationHistogram.
- WithLabelValues("default", "test"),
+ WithLabelValues("default", "test", redo.RedoRowLogFileType),
uuidGenerator: uuidGen,
}
w.running.Store(true)
@@ -310,11 +310,11 @@ func TestRotateFileWithFileAllocator(t *testing.T) {
},
uint64buf: make([]byte, 8),
metricWriteBytes: common.RedoWriteBytesGauge.
- WithLabelValues("default", "test"),
+ WithLabelValues("default", "test", redo.RedoRowLogFileType),
metricFsyncDuration: common.RedoFsyncDurationHistogram.
- WithLabelValues("default", "test"),
+ WithLabelValues("default", "test", redo.RedoRowLogFileType),
metricFlushAllDuration: common.RedoFlushAllDurationHistogram.
- WithLabelValues("default", "test"),
+ WithLabelValues("default", "test", redo.RedoRowLogFileType),
storage: mockStorage,
uuidGenerator: uuidGen,
}
@@ -377,11 +377,11 @@ func TestRotateFileWithoutFileAllocator(t *testing.T) {
},
uint64buf: make([]byte, 8),
metricWriteBytes: common.RedoWriteBytesGauge.
- WithLabelValues("default", "test"),
+ WithLabelValues("default", "test", redo.RedoDDLLogFileType),
metricFsyncDuration: common.RedoFsyncDurationHistogram.
- WithLabelValues("default", "test"),
+ WithLabelValues("default", "test", redo.RedoDDLLogFileType),
metricFlushAllDuration: common.RedoFlushAllDurationHistogram.
- WithLabelValues("default", "test"),
+ WithLabelValues("default", "test", redo.RedoDDLLogFileType),
storage: mockStorage,
uuidGenerator: uuidGen,
}
9 changes: 5 additions & 4 deletions cdc/redo/writer/memory/file_worker.go
@@ -144,9 +144,9 @@ func newFileWorkerGroup(
},
flushCh: make(chan *fileCache),
metricWriteBytes: common.RedoWriteBytesGauge.
- WithLabelValues(cfg.ChangeFeedID.Namespace, cfg.ChangeFeedID.ID),
+ WithLabelValues(cfg.ChangeFeedID.Namespace, cfg.ChangeFeedID.ID, cfg.LogType),
metricFlushAllDuration: common.RedoFlushAllDurationHistogram.
- WithLabelValues(cfg.ChangeFeedID.Namespace, cfg.ChangeFeedID.ID),
+ WithLabelValues(cfg.ChangeFeedID.Namespace, cfg.ChangeFeedID.ID, cfg.LogType),
}
}

@@ -179,9 +179,9 @@ func (f *fileWorkerGroup) Run(

func (f *fileWorkerGroup) close() {
common.RedoFlushAllDurationHistogram.
- DeleteLabelValues(f.cfg.ChangeFeedID.Namespace, f.cfg.ChangeFeedID.ID)
+ DeleteLabelValues(f.cfg.ChangeFeedID.Namespace, f.cfg.ChangeFeedID.ID, f.cfg.LogType)
common.RedoWriteBytesGauge.
- DeleteLabelValues(f.cfg.ChangeFeedID.Namespace, f.cfg.ChangeFeedID.ID)
+ DeleteLabelValues(f.cfg.ChangeFeedID.Namespace, f.cfg.ChangeFeedID.ID, f.cfg.LogType)
}

func (f *fileWorkerGroup) bgFlushFileCache(egCtx context.Context) error {
@@ -191,6 +191,7 @@ func (f *fileWorkerGroup) bgFlushFileCache(egCtx context.Context) error {
return errors.Trace(egCtx.Err())
case file := <-f.flushCh:
start := time.Now()
+
if err := file.writer.Close(); err != nil {
return errors.Trace(err)
}
15 changes: 6 additions & 9 deletions cdc/sink/dmlsink/cloudstorage/dml_worker.go
@@ -130,7 +130,7 @@ func newDMLWorker(
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricFlushDuration: mcloudstorage.CloudStorageFlushDurationHistogram.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
- metricsWorkerBusyRatio: mcloudstorage.CloudStorageWorkerBusyRatioCounter.
+ metricsWorkerBusyRatio: mcloudstorage.CloudStorageWorkerBusyRatio.
WithLabelValues(changefeedID.Namespace, changefeedID.ID, strconv.Itoa(id)),
}

@@ -158,19 +158,16 @@ func (d *dmlWorker) run(ctx context.Context) error {
// flushMessages flushed messages of active tables to cloud storage.
// active tables are those tables that have received events after the last flush.
func (d *dmlWorker) flushMessages(ctx context.Context) error {
- var flushTimeSlice, totalTimeSlice time.Duration
- overseerTicker := time.NewTicker(d.config.FlushInterval * 2)
+ var flushTimeSlice time.Duration
+ overseerDuration := d.config.FlushInterval * 2
+ overseerTicker := time.NewTicker(overseerDuration)
defer overseerTicker.Stop()
- startToWork := time.Now()
for {
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
- case now := <-overseerTicker.C:
- totalTimeSlice = now.Sub(startToWork)
- busyRatio := flushTimeSlice.Seconds() / totalTimeSlice.Seconds() * 1000
- d.metricsWorkerBusyRatio.Add(busyRatio)
- startToWork = now
+ case <-overseerTicker.C:
+ d.metricsWorkerBusyRatio.Add(flushTimeSlice.Seconds())
flushTimeSlice = 0
case batchedTask := <-d.toBeFlushedCh:
if atomic.LoadUint64(&d.isClosed) == 1 {
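The cloud storage DML worker gets the same treatment as the redo worker: each flush adds its wall-clock duration to flushTimeSlice, and the overseer tick moves that total into the per-worker counter instead of computing a ratio against startToWork. A tiny, hypothetical helper that captures the measuring half of the pattern (not part of the actual change):

```go
package sinkbusydemo

import "time"

// trackTime accumulates the wall-clock duration of fn into *slice, mirroring
// how flushMessages adds each flush's elapsed time to flushTimeSlice before
// the overseer tick hands the accumulated seconds to the busy-ratio counter.
func trackTime(slice *time.Duration, fn func() error) error {
	start := time.Now()
	err := fn()
	*slice += time.Since(start)
	return err
}
```

Since the counter is labelled with the worker id, a dashboard can read each worker's busy fraction directly from that series' per-second rate.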
6 changes: 3 additions & 3 deletions cdc/sink/metrics/cloudstorage/metrics.go
@@ -56,12 +56,12 @@ var (
}, []string{"namespace", "changefeed"})

// CloudStorageWorkerBusyRatio records the busy ratio of CloudStorage bgUpdateLog worker.
- CloudStorageWorkerBusyRatioCounter = prometheus.NewCounterVec(
+ CloudStorageWorkerBusyRatio = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "cloud_storage_worker_busy_ratio",
- Help: "Busy ratio (X ms in 1s) for cloud storage sink dml worker.",
+ Help: "Busy ratio for cloud storage sink dml worker.",
}, []string{"namespace", "changefeed", "id"})
)

@@ -71,5 +71,5 @@ func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(CloudStorageFileCountGauge)
registry.MustRegister(CloudStorageWriteDurationHistogram)
registry.MustRegister(CloudStorageFlushDurationHistogram)
- registry.MustRegister(CloudStorageWorkerBusyRatioCounter)
+ registry.MustRegister(CloudStorageWorkerBusyRatio)
}
(2 more changed files not loaded on this page.)
