From e8213a5c5865c49b75b3b1be56891afd128293ec Mon Sep 17 00:00:00 2001 From: Xiang Fu Date: Tue, 21 Jan 2025 10:35:30 -0800 Subject: [PATCH] Enhance logging and consuming segments handling in TimeSegmentPruner --- .../segmentpruner/TimeSegmentPruner.java | 26 ++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java index 59aa65406da8..c9c89d22931a 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java @@ -93,10 +93,18 @@ private Interval extractIntervalFromSegmentZKMetaZNRecord(String segment, @Nulla return DEFAULT_INTERVAL; } + // Skip consuming segment since there is no time interval + if (CommonConstants.Segment.Realtime.Status.IN_PROGRESS.toString() + .equalsIgnoreCase(znRecord.getSimpleField(CommonConstants.Segment.Realtime.STATUS))) { + return DEFAULT_INTERVAL; + } + + // Validate time interval long startTime = znRecord.getLongField(CommonConstants.Segment.START_TIME, -1); long endTime = znRecord.getLongField(CommonConstants.Segment.END_TIME, -1); if (startTime < 0 || endTime < 0 || startTime > endTime) { - LOGGER.warn("Failed to find valid time interval for segment: {}, table: {}", segment, _tableNameWithType); + // Try to not flush the logs for invalid time intervals + LOGGER.debug("Failed to find valid time interval for segment: {}, table: {}", segment, _tableNameWithType); return DEFAULT_INTERVAL; } @@ -127,8 +135,8 @@ public synchronized void refreshSegment(String segment, @Nullable ZNRecord znRec /** * NOTE: Pruning is done by searching _intervalTree based on request time interval and check if the results - * are in the input segments. By doing so we will have run time O(M * logN) (N: # of all online segments, - * M: # of qualified intersected segments). + * are in the input segments. By doing so we will have run time O(M * logN) (N: # of all online segments, + * M: # of qualified intersected segments). */ @Override public Set prune(BrokerRequest brokerRequest, Set segments) { @@ -161,13 +169,13 @@ public Set prune(BrokerRequest brokerRequest, Set segments) { /** * @return Null if no time condition or cannot filter base on the condition (e.g. 'SELECT * from myTable where time - * < 50 OR firstName = Jason') - * Empty list if time condition is specified but invalid (e.g. 'SELECT * from myTable where time < 50 AND - * time > 100') - * Sorted time intervals without overlapping if time condition is valid + * < 50 OR firstName = Jason') + * Empty list if time condition is specified but invalid (e.g. 'SELECT * from myTable where time < 50 AND + * time > 100') + * Sorted time intervals without overlapping if time condition is valid * - * TODO: 1. Merge adjacent intervals - * 2. Set interval boundary using time granularity instead of millis + * TODO: 1. Merge adjacent intervals + * 2. Set interval boundary using time granularity instead of millis */ @Nullable private List getFilterTimeIntervals(Expression filterExpression) {