Skip to content

Commit

Permalink
Enhance logging and consuming segments handling in TimeSegmentPruner
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangfu0 committed Jan 21, 2025
1 parent 26ad816 commit e8213a5
Showing 1 changed file with 17 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,18 @@ private Interval extractIntervalFromSegmentZKMetaZNRecord(String segment, @Nulla
return DEFAULT_INTERVAL;
}

// Skip consuming segment since there is no time interval
if (CommonConstants.Segment.Realtime.Status.IN_PROGRESS.toString()
.equalsIgnoreCase(znRecord.getSimpleField(CommonConstants.Segment.Realtime.STATUS))) {
return DEFAULT_INTERVAL;
}

// Validate time interval
long startTime = znRecord.getLongField(CommonConstants.Segment.START_TIME, -1);
long endTime = znRecord.getLongField(CommonConstants.Segment.END_TIME, -1);
if (startTime < 0 || endTime < 0 || startTime > endTime) {
LOGGER.warn("Failed to find valid time interval for segment: {}, table: {}", segment, _tableNameWithType);
// Try to not flush the logs for invalid time intervals
LOGGER.debug("Failed to find valid time interval for segment: {}, table: {}", segment, _tableNameWithType);
return DEFAULT_INTERVAL;
}

Expand Down Expand Up @@ -127,8 +135,8 @@ public synchronized void refreshSegment(String segment, @Nullable ZNRecord znRec

/**
* NOTE: Pruning is done by searching _intervalTree based on request time interval and check if the results
* are in the input segments. By doing so we will have run time O(M * logN) (N: # of all online segments,
* M: # of qualified intersected segments).
* are in the input segments. By doing so we will have run time O(M * logN) (N: # of all online segments,
* M: # of qualified intersected segments).
*/
@Override
public Set<String> prune(BrokerRequest brokerRequest, Set<String> segments) {
Expand Down Expand Up @@ -161,13 +169,13 @@ public Set<String> prune(BrokerRequest brokerRequest, Set<String> segments) {

/**
* @return Null if no time condition or cannot filter base on the condition (e.g. 'SELECT * from myTable where time
* < 50 OR firstName = Jason')
* Empty list if time condition is specified but invalid (e.g. 'SELECT * from myTable where time < 50 AND
* time > 100')
* Sorted time intervals without overlapping if time condition is valid
* < 50 OR firstName = Jason')
* Empty list if time condition is specified but invalid (e.g. 'SELECT * from myTable where time < 50 AND
* time > 100')
* Sorted time intervals without overlapping if time condition is valid
*
* TODO: 1. Merge adjacent intervals
* 2. Set interval boundary using time granularity instead of millis
* TODO: 1. Merge adjacent intervals
* 2. Set interval boundary using time granularity instead of millis
*/
@Nullable
private List<Interval> getFilterTimeIntervals(Expression filterExpression) {
Expand Down

0 comments on commit e8213a5

Please sign in to comment.