Skip to content

Commit

Permalink
Reduce the log in SegmentPartitionMetadataManager (#14968)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang authored Feb 1, 2025
1 parent 6747ad0 commit f7e3fe9
Showing 1 changed file with 44 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
Expand Down Expand Up @@ -137,6 +139,8 @@ private static List<String> getOnlineServers(ExternalView externalView, String s
private void computeTablePartitionInfo() {
PartitionInfo[] partitionInfoMap = new PartitionInfo[_numPartitions];
List<String> segmentsWithInvalidPartition = new ArrayList<>();
List<Pair<String, Integer>> unavailableSegments = new ArrayList<>();
List<Triple<String, Integer, Integer>> segmentsReducingFullyReplicatedServers = new ArrayList<>();
List<Map.Entry<String, SegmentInfo>> newSegmentInfoEntries = new ArrayList<>();
long currentTimeMs = System.currentTimeMillis();
for (Map.Entry<String, SegmentInfo> entry : _segmentInfoMap.entrySet()) {
Expand All @@ -154,34 +158,49 @@ private void computeTablePartitionInfo() {
}
List<String> onlineServers = segmentInfo._onlineServers;
PartitionInfo partitionInfo = partitionInfoMap[partitionId];
if (partitionInfo == null) {
Set<String> fullyReplicatedServers = new HashSet<>(onlineServers);
List<String> segments = new ArrayList<>();
segments.add(segment);
partitionInfo = new PartitionInfo(fullyReplicatedServers, segments);
partitionInfoMap[partitionId] = partitionInfo;
if (onlineServers.isEmpty()) {
LOGGER.warn("Found segment: {} without any available replica in table: {}, partition: {}", segment,
_tableNameWithType, partitionId);
if (onlineServers.isEmpty()) {
unavailableSegments.add(Pair.of(segment, partitionId));
if (partitionInfo == null) {
List<String> segments = new ArrayList<>();
segments.add(segment);
partitionInfo = new PartitionInfo(new HashSet<>(), segments);
partitionInfoMap[partitionId] = partitionInfo;
} else {
partitionInfo._fullyReplicatedServers.clear();
partitionInfo._segments.add(segment);
}
} else {
if (partitionInfo._fullyReplicatedServers.retainAll(onlineServers)) {
LOGGER.warn("Found segment: {} with online servers: {} that reduces the fully replicated servers to: {} "
+ "in table: {}, partition: {}", segment, onlineServers, partitionInfo._fullyReplicatedServers,
_tableNameWithType, partitionId);
if (partitionInfo == null) {
Set<String> fullyReplicatedServers = new HashSet<>(onlineServers);
List<String> segments = new ArrayList<>();
segments.add(segment);
partitionInfo = new PartitionInfo(fullyReplicatedServers, segments);
partitionInfoMap[partitionId] = partitionInfo;
} else {
if (partitionInfo._fullyReplicatedServers.retainAll(onlineServers)) {
segmentsReducingFullyReplicatedServers.add(
Triple.of(segment, partitionId, partitionInfo._fullyReplicatedServers.size()));
}
partitionInfo._segments.add(segment);
}
partitionInfo._segments.add(segment);
}
}
if (!segmentsWithInvalidPartition.isEmpty()) {
int numSegmentsWithInvalidPartition = segmentsWithInvalidPartition.size();
if (numSegmentsWithInvalidPartition <= 10) {
LOGGER.warn("Found {} segments: {} with invalid partition in table: {}", numSegmentsWithInvalidPartition,
segmentsWithInvalidPartition, _tableNameWithType);
} else {
LOGGER.warn("Found {} segments: {}... with invalid partition in table: {}", numSegmentsWithInvalidPartition,
segmentsWithInvalidPartition.subList(0, 10), _tableNameWithType);
}
int numSegments = segmentsWithInvalidPartition.size();
LOGGER.warn("Found {} segments: {} with invalid partition in table: {}", numSegments,
numSegments <= 10 ? segmentsWithInvalidPartition : segmentsWithInvalidPartition.subList(0, 10) + "...",
_tableNameWithType);
}
if (!unavailableSegments.isEmpty()) {
int numSegments = unavailableSegments.size();
LOGGER.warn("Found {} unavailable segments (name,partition): {} in table: {}", numSegments,
numSegments <= 10 ? unavailableSegments : unavailableSegments.subList(0, 10) + "...", _tableNameWithType);
}
if (!segmentsReducingFullyReplicatedServers.isEmpty()) {
int numSegments = segmentsReducingFullyReplicatedServers.size();
LOGGER.warn("Found {} segments (name,partition,reducedTo): {} reducing fully replicated servers in table: {}",
numSegments, numSegments <= 10 ? segmentsReducingFullyReplicatedServers
: segmentsReducingFullyReplicatedServers.subList(0, 10) + "...", _tableNameWithType);
}
// Process new segments
if (!newSegmentInfoEntries.isEmpty()) {
Expand Down Expand Up @@ -218,14 +237,9 @@ private void computeTablePartitionInfo() {
}
}
if (!excludedNewSegments.isEmpty()) {
int numExcludedNewSegments = excludedNewSegments.size();
if (numExcludedNewSegments <= 10) {
LOGGER.info("Excluded {} new segments: {} without all replicas available in table: {}",
numExcludedNewSegments, excludedNewSegments, _tableNameWithType);
} else {
LOGGER.info("Excluded {} new segments: {}... without all replicas available in table: {}",
numExcludedNewSegments, excludedNewSegments.subList(0, 10), _tableNameWithType);
}
int numSegments = excludedNewSegments.size();
LOGGER.info("Excluded {} new segments: {}... without all replicas available in table: {}", numSegments,
numSegments <= 10 ? excludedNewSegments : excludedNewSegments.subList(0, 10) + "...", _tableNameWithType);
}
}
_tablePartitionInfo =
Expand Down

0 comments on commit f7e3fe9

Please sign in to comment.