[core] Add limitation to the buffer spill disk usage (apache#3108)
Aitozi authored Apr 1, 2024
1 parent 6eec8f4 commit 85f1cfd
Showing 26 changed files with 222 additions and 38 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
@@ -678,6 +678,12 @@
<td>MemorySize</td>
<td>Amount of data to build up in memory before converting to a sorted on-disk file.</td>
</tr>
<tr>
<td><h5>write-buffer-spill.max-disk-size</h5></td>
<td style="word-wrap: break-word;">infinite</td>
<td>MemorySize</td>
<td>The max disk size to use for the write buffer spill. This only works when the write buffer spill is enabled.</td>
</tr>
<tr>
<td><h5>write-buffer-spillable</h5></td>
<td style="word-wrap: break-word;">(none)</td>
12 changes: 12 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -309,6 +309,14 @@ public class CoreOptions implements Serializable {
.withDescription(
"Amount of data to build up in memory before converting to a sorted on-disk file.");

@Documentation.OverrideDefault("infinite")
public static final ConfigOption<MemorySize> WRITE_BUFFER_MAX_DISK_SIZE =
key("write-buffer-spill.max-disk-size")
.memoryType()
.defaultValue(MemorySize.MAX_VALUE)
.withDescription(
"The max disk to use for write buffer spill. This only work when the write buffer spill is enabled");

public static final ConfigOption<Boolean> WRITE_BUFFER_SPILLABLE =
key("write-buffer-spillable")
.booleanType()
@@ -1256,6 +1264,10 @@ public boolean writeBufferSpillable(boolean usingObjectStore, boolean isStreamin
return options.getOptional(WRITE_BUFFER_SPILLABLE).orElse(usingObjectStore || !isStreaming);
}

public MemorySize writeBufferSpillDiskSize() {
return options.get(WRITE_BUFFER_MAX_DISK_SIZE);
}

public boolean useWriteBufferForAppend() {
return options.get(WRITE_BUFFER_FOR_APPEND);
}
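
For reference, a minimal sketch (not part of this commit) of setting the new option programmatically and reading it back through the accessor added above. It assumes the existing org.apache.paimon.options.Options class, the CoreOptions(Options) constructor and MemorySize.parse; the 2 gb value is illustrative.

import org.apache.paimon.CoreOptions;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;

public class WriteBufferSpillLimitExample {
    public static void main(String[] args) {
        Options options = new Options();
        // Illustrative values: enable spilling and cap the spill files at 2 GB.
        options.set(CoreOptions.WRITE_BUFFER_SPILLABLE, true);
        options.set(CoreOptions.WRITE_BUFFER_MAX_DISK_SIZE, MemorySize.parse("2 gb"));

        CoreOptions coreOptions = new CoreOptions(options);
        // Without the option, this falls back to MemorySize.MAX_VALUE ("infinite").
        System.out.println(coreOptions.writeBufferSpillDiskSize());
    }
}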
@@ -33,6 +33,7 @@
import org.apache.paimon.io.RowDataRollingFileWriter;
import org.apache.paimon.memory.MemoryOwner;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.statistics.FieldStatsCollector;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
@@ -76,6 +77,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
private final IOManager ioManager;

private MemorySegmentPool memorySegmentPool;
private MemorySize maxDiskSize;

public AppendOnlyWriter(
FileIO fileIO,
@@ -92,7 +94,8 @@ public AppendOnlyWriter(
boolean useWriteBuffer,
boolean spillable,
String fileCompression,
FieldStatsCollector.Factory[] statsCollectors) {
FieldStatsCollector.Factory[] statsCollectors,
MemorySize maxDiskSize) {
this.fileIO = fileIO;
this.schemaId = schemaId;
this.fileFormat = fileFormat;
@@ -109,9 +112,12 @@
this.fileCompression = fileCompression;
this.ioManager = ioManager;
this.statsCollectors = statsCollectors;
this.maxDiskSize = maxDiskSize;

this.sinkWriter =
useWriteBuffer ? new BufferedSinkWriter(spillable) : new DirectSinkWriter();
useWriteBuffer
? new BufferedSinkWriter(spillable, maxDiskSize)
: new DirectSinkWriter();

if (increment != null) {
newFiles.addAll(increment.newFilesIncrement().newFiles());
@@ -205,7 +211,7 @@ public void toBufferedWriter() throws Exception {
trySyncLatestCompaction(true);

sinkWriter.close();
sinkWriter = new BufferedSinkWriter(true);
sinkWriter = new BufferedSinkWriter(true, maxDiskSize);
sinkWriter.setMemoryPool(memorySegmentPool);
}
}
@@ -370,10 +376,13 @@ private class BufferedSinkWriter implements SinkWriter {

private final boolean spillable;

private final MemorySize maxDiskSize;

private RowBuffer writeBuffer;

private BufferedSinkWriter(boolean spillable) {
private BufferedSinkWriter(boolean spillable, MemorySize maxDiskSize) {
this.spillable = spillable;
this.maxDiskSize = maxDiskSize;
}

@Override
@@ -429,7 +438,8 @@ public void setMemoryPool(MemorySegmentPool memoryPool) {
ioManager,
memoryPool,
new InternalRowSerializer(writeSchema),
spillable);
spillable,
maxDiskSize);
}

@Override
@@ -170,7 +170,8 @@ public void open(
new HeapMemorySegmentPool(
coreOptions.writeBufferSize() / 2, coreOptions.pageSize()),
new InternalRowSerializer(table.rowType()),
true);
true,
coreOptions.writeBufferSpillDiskSize());
}

public void bootstrapKey(InternalRow value) throws IOException {
@@ -291,7 +292,8 @@ private void bulkLoadBootstrapRecords() {
coreOptions.writeBufferSize() / 2,
coreOptions.pageSize(),
coreOptions.localSortMaxNumFileHandles(),
coreOptions.spillCompression());
coreOptions.spillCompression(),
coreOptions.writeBufferSpillDiskSize());

Function<SortOrder, RowIterator> iteratorFunction =
sortOrder -> {
@@ -24,11 +24,17 @@ public class ChannelWithMeta {
private final FileIOChannel.ID channel;
private final int blockCount;
private final int numBytesInLastBlock;
private final long numBytes;

public ChannelWithMeta(FileIOChannel.ID channel, int blockCount, int numBytesInLastBlock) {
public ChannelWithMeta(
FileIOChannel.ID channel,
int blockCount,
int numBytesInLastBlock,
long numEstimatedBytes) {
this.channel = channel;
this.blockCount = blockCount;
this.numBytesInLastBlock = numBytesInLastBlock;
this.numBytes = numEstimatedBytes;
}

public FileIOChannel.ID getChannel() {
@@ -42,4 +48,8 @@ public int getBlockCount() {
public int getNumBytesInLastBlock() {
return numBytesInLastBlock;
}

public long getNumBytes() {
return numBytes;
}
}
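
The new numBytes field records how many bytes each spilled channel actually wrote to disk, so a buffer can total its spill footprint. A minimal sketch of that accounting (it mirrors the getDiskUsage method added to ExternalBuffer further down in this commit):

// Sketch only: sum the on-disk size of all spilled channels.
static long totalSpilledBytes(java.util.List<ChannelWithMeta> spilledChannels) {
    long bytes = 0;
    for (ChannelWithMeta channel : spilledChannels) {
        bytes += channel.getNumBytes();
    }
    return bytes;
}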
@@ -42,6 +42,7 @@ public final class ChannelWriterOutputView extends AbstractPagedOutputView {

private long numBytes;
private long numCompressedBytes;
private long writeBytes;

public ChannelWriterOutputView(
BufferFileWriter writer,
@@ -64,6 +65,7 @@ public int close() throws IOException {
int currentPositionInSegment = getCurrentPositionInSegment();
writeCompressed(currentSegment, currentPositionInSegment);
clear();
this.writeBytes = writer.getSize();
this.writer.close();
}
return -1;
@@ -93,6 +95,10 @@ public long getNumCompressedBytes() {
return numCompressedBytes;
}

public long getWriteBytes() {
return writeBytes;
}

public int getBlockCount() {
return blockCount;
}
@@ -26,6 +26,7 @@
import org.apache.paimon.memory.Buffer;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.utils.MutableObjectIterator;

import org.slf4j.Logger;
@@ -47,6 +48,7 @@ public class ExternalBuffer implements RowBuffer {
private final MemorySegmentPool pool;
private final BinaryRowSerializer binaryRowSerializer;
private final InMemoryBuffer inMemoryBuffer;
private final MemorySize maxDiskSize;

// The size of each segment
private final int segmentSize;
@@ -57,9 +59,13 @@
private boolean addCompleted;

ExternalBuffer(
IOManager ioManager, MemorySegmentPool pool, AbstractRowDataSerializer<?> serializer) {
IOManager ioManager,
MemorySegmentPool pool,
AbstractRowDataSerializer<?> serializer,
MemorySize maxDiskSize) {
this.ioManager = ioManager;
this.pool = pool;
this.maxDiskSize = maxDiskSize;

this.binaryRowSerializer =
serializer instanceof BinaryRowSerializer
@@ -89,8 +95,22 @@ public void reset() {

@Override
public boolean flushMemory() throws IOException {
spill();
return true;
boolean isFull = getDiskUsage() >= maxDiskSize.getBytes();
if (isFull) {
return false;
} else {
spill();
return true;
}
}

private long getDiskUsage() {
long bytes = 0;

for (ChannelWithMeta spillChannelID : spilledChannelIDs) {
bytes += spillChannelID.getNumBytes();
}
return bytes;
}

@Override
@@ -138,6 +158,7 @@ private void spill() throws IOException {
BufferFileWriter writer = ioManager.createBufferFileWriter(channel);
int numRecordBuffers = inMemoryBuffer.getNumRecordBuffers();
ArrayList<MemorySegment> segments = inMemoryBuffer.getRecordBufferSegments();
long writeBytes;
try {
// spill in memory buffer in zero-copy.
for (int i = 0; i < numRecordBuffers; i++) {
@@ -148,6 +169,7 @@
: segment.size();
writer.writeBlock(Buffer.create(segment, bufferSize));
}
writeBytes = writer.getSize();
LOG.info(
"here spill the reset buffer data with {} records {} bytes",
inMemoryBuffer.size(),
@@ -162,7 +184,8 @@
new ChannelWithMeta(
channel,
inMemoryBuffer.getNumRecordBuffers(),
inMemoryBuffer.getNumBytesInLastBuffer()));
inMemoryBuffer.getNumBytesInLastBuffer(),
writeBytes));

inMemoryBuffer.reset();
}
@@ -22,6 +22,7 @@
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.AbstractRowDataSerializer;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.options.MemorySize;

import java.io.Closeable;
import java.io.IOException;
@@ -57,9 +58,10 @@ static RowBuffer getBuffer(
IOManager ioManager,
MemorySegmentPool memoryPool,
AbstractRowDataSerializer<InternalRow> serializer,
boolean spillable) {
boolean spillable,
MemorySize maxDiskSize) {
if (spillable) {
return new ExternalBuffer(ioManager, memoryPool, serializer);
return new ExternalBuffer(ioManager, memoryPool, serializer, maxDiskSize);
} else {
return new InMemoryBuffer(memoryPool, serializer);
}
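
For reference, a caller-side sketch of the extended factory signature, patterned after the GlobalIndexAssigner call site above. The helper is not part of this commit; it assumes it lives alongside RowBuffer in the same package (so RowBuffer and IOManager need no imports), and the other import locations are assumptions.

import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.types.RowType;

class SpillableBufferFactory {
    // Build a spillable RowBuffer whose spill files are capped by maxDiskSize.
    static RowBuffer create(
            IOManager ioManager,
            RowType rowType,
            long writeBufferSize,
            int pageSize,
            MemorySize maxDiskSize) {
        return RowBuffer.getBuffer(
                ioManager,
                new HeapMemorySegmentPool(writeBufferSize / 2, pageSize),
                new InternalRowSerializer(rowType),
                true, // spillable
                maxDiskSize);
    }
}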
@@ -112,7 +112,8 @@ public static BinaryExternalSortBuffer createBulkLoadSorter(
options.writeBufferSize() / 2,
options.pageSize(),
options.localSortMaxNumFileHandles(),
options.spillCompression());
options.spillCompression(),
options.writeBufferSpillDiskSize());
}

/** A class wraps byte[] to implement equals and hashCode. */
@@ -32,6 +32,7 @@
import org.apache.paimon.mergetree.compact.ConcatRecordReader.ReaderSupplier;
import org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
import org.apache.paimon.mergetree.compact.SortMergeReader;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.sort.BinaryExternalSortBuffer;
import org.apache.paimon.sort.SortBuffer;
@@ -67,6 +68,7 @@ public class MergeSorter {
private final int spillThreshold;
private final int spillSortMaxNumFiles;
private final String compression;
private final MemorySize maxDiskSize;

private final MemorySegmentPool memoryPool;

@@ -86,6 +88,7 @@ public MergeSorter(
this.memoryPool =
new CachelessSegmentPool(options.sortSpillBufferSize(), options.pageSize());
this.ioManager = ioManager;
this.maxDiskSize = options.writeBufferSpillDiskSize();
}

public MemorySegmentPool memoryPool() {
@@ -213,7 +216,8 @@ public ExternalSorterWithLevel(@Nullable FieldsComparator userDefinedSeqComparat
sortFields.toArray(),
memoryPool,
spillSortMaxNumFiles,
compression);
compression,
maxDiskSize);
}

public boolean put(KeyValue keyValue) throws IOException {
@@ -33,6 +33,7 @@
import org.apache.paimon.memory.MemoryOwner;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.mergetree.compact.MergeFunction;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.FieldsComparator;
@@ -54,6 +55,7 @@
public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {

private final boolean writeBufferSpillable;
private final MemorySize maxDiskSize;
private final int sortMaxFan;
private final String sortCompression;
private final IOManager ioManager;
@@ -80,6 +82,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {

public MergeTreeWriter(
boolean writeBufferSpillable,
MemorySize maxDiskSize,
int sortMaxFan,
String sortCompression,
IOManager ioManager,
@@ -93,6 +96,7 @@
@Nullable CommitIncrement increment,
@Nullable FieldsComparator userDefinedSeqComparator) {
this.writeBufferSpillable = writeBufferSpillable;
this.maxDiskSize = maxDiskSize;
this.sortMaxFan = sortMaxFan;
this.sortCompression = sortCompression;
this.ioManager = ioManager;
@@ -144,6 +148,7 @@ public void setMemoryPool(MemorySegmentPool memoryPool) {
userDefinedSeqComparator,
memoryPool,
writeBufferSpillable,
maxDiskSize,
sortMaxFan,
sortCompression,
ioManager);