diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/holder/BrokerLoad.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/holder/BrokerLoad.java index 8a9a73391..693c45b91 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/holder/BrokerLoad.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/holder/BrokerLoad.java @@ -70,14 +70,14 @@ public void recordMetric(CruiseControlMetric ccm) { break; case TOPIC: TopicMetric tm = (TopicMetric) ccm; - _dotHandledTopicMetrics.computeIfAbsent(tm.topic(), t -> new RawMetricsHolder()) + _dotHandledTopicMetrics.computeIfAbsent(replaceDotsWithUnderscores(tm.topic()), t -> new RawMetricsHolder()) .recordCruiseControlMetric(ccm); break; case PARTITION: PartitionMetric pm = (PartitionMetric) ccm; - _dotHandledPartitionMetrics.computeIfAbsent(new TopicPartition(pm.topic(), pm.partition()), tp -> new RawMetricsHolder()) + _dotHandledPartitionMetrics.computeIfAbsent(new TopicPartition(replaceDotsWithUnderscores(pm.topic()), pm.partition()), tp -> new RawMetricsHolder()) .recordCruiseControlMetric(ccm); - _dotHandledTopicsWithPartitionSizeReported.add(pm.topic()); + _dotHandledTopicsWithPartitionSizeReported.add(replaceDotsWithUnderscores(pm.topic())); break; default: throw new IllegalStateException(String.format("Should never be here. Unrecognized metric scope %s",