Skip to content

Commit

Permalink
Adding variation to segment flush size
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangfu0 committed Oct 5, 2024
1 parent 8334add commit 5a955b6
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.annotations.VisibleForTesting;
import java.time.Clock;
import java.util.Random;
import javax.annotation.Nullable;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
Expand All @@ -32,6 +33,10 @@ class SegmentFlushThresholdComputer {
static final double CURRENT_SEGMENT_RATIO_WEIGHT = 0.1;
static final double PREVIOUS_SEGMENT_RATIO_WEIGHT = 0.9;
static final double ROWS_MULTIPLIER_WHEN_TIME_THRESHOLD_HIT = 1.1;
static final double DEFAULT_FLUSH_THRESHOLD_VARIANCE_PERCENTAGE = 0.0;
private static final String FLUSH_THRESHOLD_VARIANCE_PERCENTAGE =
"realtime.segment.flush.threshold.variance.percentage";
private static final Random RANDOM = new Random();

// num rows to segment size ratio of last committed segment for this table
private double _latestSegmentRowsToSizeRatio;
Expand Down Expand Up @@ -65,6 +70,26 @@ public int computeThreshold(StreamConfig streamConfig, CommittingSegmentDescript
long optimalSegmentSizeBytesMin = desiredSegmentSizeBytes / 2;
double optimalSegmentSizeBytesMax = desiredSegmentSizeBytes * 1.5;

double segmentSizeVariation = DEFAULT_FLUSH_THRESHOLD_VARIANCE_PERCENTAGE;
try {
if (streamConfig.getStreamConfigsMap().containsKey(FLUSH_THRESHOLD_VARIANCE_PERCENTAGE)) {
segmentSizeVariation =
Double.parseDouble(streamConfig.getStreamConfigsMap().get(FLUSH_THRESHOLD_VARIANCE_PERCENTAGE));
}
} catch (Exception e) {
SegmentSizeBasedFlushThresholdUpdater.LOGGER.warn(
"Invalid value for segment size variation: {}. Using default {}",
streamConfig.getStreamConfigsMap().get(FLUSH_THRESHOLD_VARIANCE_PERCENTAGE),
DEFAULT_FLUSH_THRESHOLD_VARIANCE_PERCENTAGE);
}
if (segmentSizeVariation < 0.0 || segmentSizeVariation >= 1.0) {
SegmentSizeBasedFlushThresholdUpdater.LOGGER.warn(
"Invalid value for segment size variation: {}. Using default {}",
segmentSizeVariation, DEFAULT_FLUSH_THRESHOLD_VARIANCE_PERCENTAGE);
segmentSizeVariation = DEFAULT_FLUSH_THRESHOLD_VARIANCE_PERCENTAGE;
}
SegmentSizeBasedFlushThresholdUpdater.LOGGER.info("Segment size variation set to {}", segmentSizeVariation);

if (committingSegmentZKMetadata == null) { // first segment of the partition, hence committing segment is null
if (_latestSegmentRowsToSizeRatio > 0) { // new partition group added case
long targetSegmentNumRows = (long) (desiredSegmentSizeBytes * _latestSegmentRowsToSizeRatio);
Expand Down Expand Up @@ -157,13 +182,25 @@ public int computeThreshold(StreamConfig streamConfig, CommittingSegmentDescript
}
}
targetSegmentNumRows = capNumRowsIfOverflow(targetSegmentNumRows);
targetSegmentNumRows = applySegmentSizeVariation(segmentSizeVariation, targetSegmentNumRows);
SegmentSizeBasedFlushThresholdUpdater.LOGGER.info(
"Committing segment size {}, current ratio {}, setting threshold for {} as {}",
committingSegmentSizeBytes, _latestSegmentRowsToSizeRatio, newSegmentName, targetSegmentNumRows);

return (int) targetSegmentNumRows;
}

/**
 * Randomly perturbs the row threshold so that segments do not all flush at exactly the same size.
 *
 * <p>When {@code segmentSizeVariation} lies strictly between 0 and 1, the target is multiplied by a factor drawn
 * uniformly from {@code [1 - segmentSizeVariation, 1 + segmentSizeVariation)}. Any other value (including NaN)
 * leaves the target untouched.
 *
 * <p>The varied value is clamped to {@link Integer#MAX_VALUE}: the caller applies this method after capping the
 * row count and then narrows the result to {@code int}, so a factor above 1 could otherwise push an already
 * capped target past the int range and make the narrowing cast wrap around.
 *
 * @param segmentSizeVariation fraction of the target by which the threshold may move in either direction
 * @param targetSegmentNumRows row threshold computed before any variation is applied
 * @return the varied row threshold, or {@code targetSegmentNumRows} unchanged when no variation applies
 */
private long applySegmentSizeVariation(double segmentSizeVariation, long targetSegmentNumRows) {
  // Written as a negated positive check so that NaN is treated as "no variation".
  if (!(segmentSizeVariation > 0.0 && segmentSizeVariation < 1.0)) {
    return targetSegmentNumRows;
  }
  // The factor is always positive here because segmentSizeVariation < 1, so no sign check is needed.
  double variationFactor = (1 - segmentSizeVariation) + 2 * segmentSizeVariation * RANDOM.nextDouble();
  long variedNumRows = (long) (targetSegmentNumRows * variationFactor);
  // Keep the result representable as an int for the caller's narrowing cast.
  return Math.min(variedNumRows, Integer.MAX_VALUE);
}

private long capNumRowsIfOverflow(long targetSegmentNumRows) {
if (targetSegmentNumRows > Integer.MAX_VALUE) {
// TODO Picking Integer.MAX_VALUE for number of rows will most certainly make the segment unloadable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,18 @@

import java.time.Clock;
import java.time.ZoneId;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
import org.testng.annotations.Test;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.pinot.common.protocols.SegmentCompletionProtocol.*;
import static org.apache.pinot.common.protocols.SegmentCompletionProtocol.REASON_FORCE_COMMIT_MESSAGE_RECEIVED;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;


public class SegmentFlushThresholdComputerTest {
Expand Down Expand Up @@ -302,4 +304,27 @@ public void testAdjustRowsToSizeRatio() {
// (0.1 * 0.25) + (0.9 * 0.15)
assertEquals(computer.getLatestSegmentRowsToSizeRatio(), 0.16);
}

/**
 * Checks that the computed flush threshold stays within the configured variance band around the threshold
 * obtained without variance. The test is repeated many times because the variation is randomized.
 */
@Test(invocationCount = 500)
public void testSegmentFlushThresholdVariance() {
  final String varianceConfigKey = "realtime.segment.flush.threshold.variance.percentage";
  // Value the computer yields for the stubbed inputs below when the variance is 0 (first loop iteration).
  final int baselineThreshold = 90000;

  SegmentFlushThresholdComputer thresholdComputer = new SegmentFlushThresholdComputer();

  for (double variance = 0; variance < 1; variance += 0.05) {
    // Segment that is being committed: 300 KB on disk.
    CommittingSegmentDescriptor segmentDescriptor = mock(CommittingSegmentDescriptor.class);
    when(segmentDescriptor.getSegmentSizeBytes()).thenReturn(300_000L);

    SegmentZKMetadata segmentZKMetadata = mock(SegmentZKMetadata.class);
    when(segmentZKMetadata.getTotalDocs()).thenReturn(60_000L, 50_000L);
    when(segmentZKMetadata.getSizeThresholdToFlushSegment()).thenReturn(60_000);

    // Stream config carrying the desired segment size and the variance under test.
    StreamConfig config = mock(StreamConfig.class);
    when(config.getFlushThresholdSegmentSizeBytes()).thenReturn(2_000_000L);
    when(config.getStreamConfigsMap()).thenReturn(Map.of(varianceConfigKey, String.valueOf(variance)));

    int actualThreshold = thresholdComputer.computeThreshold(config, segmentDescriptor, segmentZKMetadata,
        "events3__0__0__20211222T1646Z");

    double lowerBound = (1.0 - variance) * baselineThreshold;
    double upperBound = (1.0 + variance) * baselineThreshold;
    assertTrue(lowerBound <= actualThreshold && actualThreshold <= upperBound);
  }
}
}

0 comments on commit 5a955b6

Please sign in to comment.