Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add stream VByte encoding for shorts, integers and longs #24749

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ public Slice slice()
@Override
public Slice getUnderlyingSlice()
{
throw new UnsupportedOperationException();
return buffers[0].getSlice();
}

@Override
Expand All @@ -549,7 +549,7 @@ public void reset(int position)
@Override
public int size()
{
throw new UnsupportedOperationException();
return buffers[0].getPosition();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,13 @@ public void testBigintSerializedSize()
pageSize = 35; // Now we have moved to the normal block implementation so the page size overhead is 35
page = new Page(builder.build());
int firstValueSize = serializedSize(ImmutableList.of(BIGINT), page) - pageSize;
assertThat(firstValueSize).isEqualTo(9); // value size + value overhead
assertThat(firstValueSize).isEqualTo(3); // value size + value overhead

// page with two values
BIGINT.writeLong(builder, 456);
page = new Page(builder.build());
int secondValueSize = serializedSize(ImmutableList.of(BIGINT), page) - (pageSize + firstValueSize);
assertThat(secondValueSize).isEqualTo(8); // value size (value overhead is shared with previous value)
assertThat(secondValueSize).isEqualTo(2); // value size (value overhead is shared with previous value)
}

@Test
Expand All @@ -218,13 +218,13 @@ public void testVarcharSerializedSize()
pageSize = 44; // Now we have moved to the normal block implementation so the page size overhead is 44
page = new Page(builder.build());
int firstValueSize = serializedSize(ImmutableList.of(VARCHAR), page) - pageSize;
assertThat(firstValueSize).isEqualTo(8 + 5); // length + nonNullsCount + "alice"
assertThat(firstValueSize).isEqualTo(6 + 5); // length + nonNullsCount + "alice"

// page with two values
VARCHAR.writeString(builder, "bob");
page = new Page(builder.build());
int secondValueSize = serializedSize(ImmutableList.of(VARCHAR), page) - (pageSize + firstValueSize);
assertThat(secondValueSize).isEqualTo(4 + 3); // length + "bob" (null shared with first entry)
assertThat(secondValueSize).isEqualTo(1 + 3); // length (1 byte) + "bob" (null shared with first entry)
}

private int serializedSize(List<? extends Type> types, Page expectedPage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,7 @@ public void testStreamingAbortOnDataCorruption()

assertThatThrownBy(() -> getNextPage(exchangeClient))
.isInstanceOf(TrinoException.class)
.hasMessageMatching("Checksum verification failure on localhost when reading from http://localhost:8080/0: Data corruption, read checksum: 0x3f7c49fcdc6f98ea, calculated checksum: 0xcb4f99c2d19a4b04");
.hasMessageMatching("Checksum verification failure on localhost when reading from http://localhost:8080/0: Data corruption, read checksum: 0xfcd7f00d3f9deb37, calculated checksum: 0x51aa7eefb5605c6f");

exchangeClient.close();
}
Expand Down
9 changes: 9 additions & 0 deletions core/trino-spi/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,15 @@
<annotation>@com.fasterxml.jackson.annotation.JsonValue</annotation>
<justification>On-the-wire representation shouldn't rely on the Jackson format</justification>
</item>
<item>
<ignore>true</ignore>
<code>java.method.visibilityIncreased</code>
<old>method long[] io.trino.spi.block.LongArrayBlock::getRawValues()</old>
<new>method long[] io.trino.spi.block.LongArrayBlock::getRawValues()</new>
<oldVisibility>package</oldVisibility>
<newVisibility>public</newVisibility>
<justification>Just a test</justification>
</item>
</differences>
</revapi.differences>
</analysisConfiguration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.airlift.slice.SliceInput;
import io.airlift.slice.SliceOutput;
import io.trino.spi.block.vstream.IntStreamVByte;

import java.util.Optional;

Expand Down Expand Up @@ -48,7 +49,7 @@ public void writeBlock(BlockEncodingSerde blockEncodingSerde, SliceOutput sliceO
blockEncodingSerde.writeBlock(sliceOutput, dictionary);

// ids
sliceOutput.writeInts(dictionaryBlock.getRawIds(), dictionaryBlock.getRawIdsOffset(), dictionaryBlock.getPositionCount());
IntStreamVByte.writeInts(sliceOutput, dictionaryBlock.getRawIds(), dictionaryBlock.getRawIdsOffset(), dictionaryBlock.getPositionCount());
}

@Override
Expand All @@ -62,7 +63,7 @@ public Block readBlock(BlockEncodingSerde blockEncodingSerde, SliceInput sliceIn

// ids
int[] ids = new int[positionCount];
sliceInput.readInts(ids);
IntStreamVByte.readInts(sliceInput, ids);

// flatten the dictionary
return dictionaryBlock.copyPositions(ids, 0, ids.length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.airlift.slice.SliceInput;
import io.airlift.slice.SliceOutput;
import io.trino.spi.block.vstream.IntStreamVByte;

import static io.trino.spi.block.EncoderUtil.decodeNullBits;
import static io.trino.spi.block.EncoderUtil.encodeNullsAsBits;
Expand All @@ -40,7 +41,7 @@ public void writeBlock(BlockEncodingSerde blockEncodingSerde, SliceOutput sliceO
encodeNullsAsBits(sliceOutput, fixed12Block);

if (!fixed12Block.mayHaveNull()) {
sliceOutput.writeInts(fixed12Block.getRawValues(), fixed12Block.getRawOffset() * 3, fixed12Block.getPositionCount() * 3);
IntStreamVByte.writeInts(sliceOutput, fixed12Block.getRawValues(), fixed12Block.getRawOffset() * 3, fixed12Block.getPositionCount() * 3);
}
else {
int[] valuesWithoutNull = new int[positionCount * 3];
Expand All @@ -55,7 +56,7 @@ public void writeBlock(BlockEncodingSerde blockEncodingSerde, SliceOutput sliceO
}

sliceOutput.writeInt(nonNullPositionCount / 3);
sliceOutput.writeInts(valuesWithoutNull, 0, nonNullPositionCount);
IntStreamVByte.writeInts(sliceOutput, valuesWithoutNull, 0, nonNullPositionCount);
}
}

Expand All @@ -68,11 +69,11 @@ public Fixed12Block readBlock(BlockEncodingSerde blockEncodingSerde, SliceInput

int[] values = new int[positionCount * 3];
if (valueIsNull == null) {
sliceInput.readInts(values);
IntStreamVByte.readInts(sliceInput, values);
}
else {
int nonNullPositionCount = sliceInput.readInt();
sliceInput.readInts(values, 0, nonNullPositionCount * 3);
IntStreamVByte.readInts(sliceInput, values, 0, nonNullPositionCount * 3);
int position = 3 * (nonNullPositionCount - 1);
for (int i = positionCount - 1; i >= 0 && position >= 0; i--) {
System.arraycopy(values, position, values, 3 * i, 3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.airlift.slice.SliceInput;
import io.airlift.slice.SliceOutput;
import io.trino.spi.block.vstream.LongStreamVByte;

import static io.trino.spi.block.EncoderUtil.decodeNullBits;
import static io.trino.spi.block.EncoderUtil.encodeNullsAsBits;
Expand All @@ -40,7 +41,7 @@ public void writeBlock(BlockEncodingSerde blockEncodingSerde, SliceOutput sliceO
encodeNullsAsBits(sliceOutput, int128ArrayBlock);

if (!int128ArrayBlock.mayHaveNull()) {
sliceOutput.writeLongs(int128ArrayBlock.getRawValues(), int128ArrayBlock.getRawOffset() * 2, int128ArrayBlock.getPositionCount() * 2);
LongStreamVByte.writeLongs(sliceOutput, int128ArrayBlock.getRawValues(), int128ArrayBlock.getRawOffset() * 2, int128ArrayBlock.getPositionCount() * 2);
}
else {
long[] valuesWithoutNull = new long[positionCount * 2];
Expand All @@ -54,7 +55,7 @@ public void writeBlock(BlockEncodingSerde blockEncodingSerde, SliceOutput sliceO
}

sliceOutput.writeInt(nonNullPositionCount / 2);
sliceOutput.writeLongs(valuesWithoutNull, 0, nonNullPositionCount);
LongStreamVByte.writeLongs(sliceOutput, valuesWithoutNull, 0, nonNullPositionCount);
}
}

Expand All @@ -67,11 +68,11 @@ public Int128ArrayBlock readBlock(BlockEncodingSerde blockEncodingSerde, SliceIn

long[] values = new long[positionCount * 2];
if (valueIsNull == null) {
sliceInput.readLongs(values);
LongStreamVByte.readLongs(sliceInput, values);
}
else {
int nonNullPositionCount = sliceInput.readInt();
sliceInput.readLongs(values, 0, nonNullPositionCount * 2);
LongStreamVByte.readLongs(sliceInput, values, 0, nonNullPositionCount * 2);
int position = 2 * (nonNullPositionCount - 1);
for (int i = positionCount - 1; i >= 0 && position >= 0; i--) {
System.arraycopy(values, position, values, 2 * i, 2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.airlift.slice.SliceInput;
import io.airlift.slice.SliceOutput;
import io.trino.spi.block.vstream.IntStreamVByte;

import static io.trino.spi.block.EncoderUtil.decodeNullBits;
import static io.trino.spi.block.EncoderUtil.encodeNullsAsBits;
Expand Down Expand Up @@ -42,7 +43,7 @@ public void writeBlock(BlockEncodingSerde blockEncodingSerde, SliceOutput sliceO
encodeNullsAsBits(sliceOutput, intArrayBlock);

if (!intArrayBlock.mayHaveNull()) {
sliceOutput.writeInts(intArrayBlock.getRawValues(), intArrayBlock.getRawValuesOffset(), intArrayBlock.getPositionCount());
IntStreamVByte.writeInts(sliceOutput, intArrayBlock.getRawValues(), intArrayBlock.getRawValuesOffset(), intArrayBlock.getPositionCount());
}
else {
int[] valuesWithoutNull = new int[positionCount];
Expand All @@ -55,7 +56,7 @@ public void writeBlock(BlockEncodingSerde blockEncodingSerde, SliceOutput sliceO
}

sliceOutput.writeInt(nonNullPositionCount);
sliceOutput.writeInts(valuesWithoutNull, 0, nonNullPositionCount);
IntStreamVByte.writeInts(sliceOutput, valuesWithoutNull, 0, nonNullPositionCount);
}
}

Expand All @@ -68,13 +69,14 @@ public IntArrayBlock readBlock(BlockEncodingSerde blockEncodingSerde, SliceInput
int[] values = new int[positionCount];

if (valueIsNullPacked == null) {
sliceInput.readInts(values);
IntStreamVByte.readInts(sliceInput, values);
return new IntArrayBlock(0, positionCount, null, values);
}
boolean[] valueIsNull = decodeNullBits(valueIsNullPacked, positionCount);

int nonNullPositionCount = sliceInput.readInt();
sliceInput.readInts(values, 0, nonNullPositionCount);
IntStreamVByte.readInts(sliceInput, values, 0, nonNullPositionCount);

int position = nonNullPositionCount - 1;

// Handle Last (positionCount % 8) values
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ int getRawValuesOffset()
return arrayOffset;
}

long[] getRawValues()
public long[] getRawValues()
{
return values;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.airlift.slice.SliceInput;
import io.airlift.slice.SliceOutput;
import io.trino.spi.block.vstream.LongStreamVByte;

import static io.trino.spi.block.EncoderUtil.decodeNullBits;
import static io.trino.spi.block.EncoderUtil.encodeNullsAsBits;
Expand All @@ -38,11 +39,10 @@ public void writeBlock(BlockEncodingSerde blockEncodingSerde, SliceOutput sliceO
LongArrayBlock longArrayBlock = (LongArrayBlock) block;
int positionCount = longArrayBlock.getPositionCount();
sliceOutput.appendInt(positionCount);

encodeNullsAsBits(sliceOutput, longArrayBlock);

if (!longArrayBlock.mayHaveNull()) {
sliceOutput.writeLongs(longArrayBlock.getRawValues(), longArrayBlock.getRawValuesOffset(), longArrayBlock.getPositionCount());
LongStreamVByte.writeLongs(sliceOutput, longArrayBlock.getRawValues(), longArrayBlock.getRawValuesOffset(), longArrayBlock.getPositionCount());
}
else {
long[] valuesWithoutNull = new long[positionCount];
Expand All @@ -55,7 +55,7 @@ public void writeBlock(BlockEncodingSerde blockEncodingSerde, SliceOutput sliceO
}

sliceOutput.writeInt(nonNullPositionCount);
sliceOutput.writeLongs(valuesWithoutNull, 0, nonNullPositionCount);
LongStreamVByte.writeLongs(sliceOutput, valuesWithoutNull, 0, nonNullPositionCount);
}
}

Expand All @@ -68,13 +68,13 @@ public LongArrayBlock readBlock(BlockEncodingSerde blockEncodingSerde, SliceInpu
long[] values = new long[positionCount];

if (valueIsNullPacked == null) {
sliceInput.readLongs(values);
LongStreamVByte.readLongs(sliceInput, values);
return new LongArrayBlock(0, positionCount, null, values);
}
boolean[] valueIsNull = decodeNullBits(valueIsNullPacked, positionCount);

int nonNullPositionCount = sliceInput.readInt();
sliceInput.readLongs(values, 0, nonNullPositionCount);
LongStreamVByte.readLongs(sliceInput, values, 0, nonNullPositionCount);
int position = nonNullPositionCount - 1;

// Handle Last (positionCount % 8) values
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import io.airlift.slice.SliceInput;
import io.airlift.slice.SliceOutput;
import io.trino.spi.block.vstream.IntStreamVByte;
import io.trino.spi.type.MapType;

import java.util.Optional;
Expand Down Expand Up @@ -58,7 +59,8 @@ public void writeBlock(BlockEncodingSerde blockEncodingSerde, SliceOutput sliceO
if (hashTable.isPresent()) {
int hashTableLength = (entriesEndOffset - entriesStartOffset) * HASH_MULTIPLIER;
sliceOutput.appendInt(hashTableLength); // hashtable length
sliceOutput.writeInts(hashTable.get(), entriesStartOffset * HASH_MULTIPLIER, hashTableLength);

IntStreamVByte.writeInts(sliceOutput, hashTable.get(), entriesStartOffset * HASH_MULTIPLIER, hashTableLength);
}
else {
// if the hashTable is null, we write the length -1
Expand All @@ -84,7 +86,7 @@ public Block readBlock(BlockEncodingSerde blockEncodingSerde, SliceInput sliceIn
int[] hashTable = null;
if (hashTableLength >= 0) {
hashTable = new int[hashTableLength];
sliceInput.readInts(hashTable);
IntStreamVByte.readInts(sliceInput, hashTable);
}

if (keyBlock.getPositionCount() != valueBlock.getPositionCount()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.airlift.slice.SliceInput;
import io.airlift.slice.SliceOutput;
import io.trino.spi.block.vstream.ShortStreamVByte;

import static io.trino.spi.block.EncoderUtil.decodeNullBits;
import static io.trino.spi.block.EncoderUtil.encodeNullsAsBits;
Expand Down Expand Up @@ -42,7 +43,7 @@ public void writeBlock(BlockEncodingSerde blockEncodingSerde, SliceOutput sliceO
encodeNullsAsBits(sliceOutput, shortArrayBlock);

if (!shortArrayBlock.mayHaveNull()) {
sliceOutput.writeShorts(shortArrayBlock.getRawValues(), shortArrayBlock.getRawValuesOffset(), shortArrayBlock.getPositionCount());
ShortStreamVByte.writeShorts(sliceOutput, shortArrayBlock.getRawValues(), shortArrayBlock.getRawValuesOffset(), shortArrayBlock.getPositionCount());
}
else {
short[] valuesWithoutNull = new short[positionCount];
Expand All @@ -55,7 +56,7 @@ public void writeBlock(BlockEncodingSerde blockEncodingSerde, SliceOutput sliceO
}

sliceOutput.writeInt(nonNullPositionCount);
sliceOutput.writeShorts(valuesWithoutNull, 0, nonNullPositionCount);
ShortStreamVByte.writeShorts(sliceOutput, valuesWithoutNull, 0, nonNullPositionCount);
}
}

Expand All @@ -68,13 +69,13 @@ public ShortArrayBlock readBlock(BlockEncodingSerde blockEncodingSerde, SliceInp
short[] values = new short[positionCount];

if (valueIsNullPacked == null) {
sliceInput.readShorts(values);
ShortStreamVByte.readShorts(sliceInput, values);
return new ShortArrayBlock(0, positionCount, null, values);
}
boolean[] valueIsNull = decodeNullBits(valueIsNullPacked, positionCount);

int nonNullPositionCount = sliceInput.readInt();
sliceInput.readShorts(values, 0, nonNullPositionCount);
ShortStreamVByte.readShorts(sliceInput, values, 0, nonNullPositionCount);
int position = nonNullPositionCount - 1;

// Handle Last (positionCount % 8) values
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.airlift.slice.SliceInput;
import io.airlift.slice.SliceOutput;
import io.airlift.slice.Slices;
import io.trino.spi.block.vstream.IntStreamVByte;

import java.util.Arrays;

Expand Down Expand Up @@ -55,9 +56,8 @@ public void writeBlock(BlockEncodingSerde blockEncodingSerde, SliceOutput sliceO
nonNullsCount += variableWidthBlock.isNull(position) ? 0 : 1;
}

sliceOutput
.appendInt(nonNullsCount)
.writeInts(lengths, 0, nonNullsCount);
sliceOutput.appendInt(nonNullsCount);
IntStreamVByte.writeInts(sliceOutput, lengths, 0, nonNullsCount);

encodeNullsAsBits(sliceOutput, variableWidthBlock);

Expand All @@ -79,7 +79,8 @@ public Block readBlock(BlockEncodingSerde blockEncodingSerde, SliceInput sliceIn
int[] offsets = new int[positionCount + 1];
// Read the lengths array into the end of the offsets array, since nonNullsCount <= positionCount
int lengthIndex = offsets.length - nonNullsCount;
sliceInput.readInts(offsets, lengthIndex, nonNullsCount);

IntStreamVByte.readInts(sliceInput, offsets, lengthIndex, nonNullsCount);

boolean[] valueIsNull = decodeNullBits(sliceInput, positionCount).orElse(null);
// Transform lengths back to offsets
Expand Down
Loading
Loading