Introduce operator of window function. #14213

Merged on Jan 24, 2025 (75 commits)
Commits
65d341c
Add basic work processors.
Sh-Zh-7 Nov 12, 2024
e8c2a14
Rewrite all tests.
Sh-Zh-7 Nov 12, 2024
121439f
Refactor all files.
Sh-Zh-7 Nov 12, 2024
6366d71
Rename package from 'processor' to 'workers'
Sh-Zh-7 Nov 12, 2024
e558834
Decrease file's number.
Sh-Zh-7 Nov 14, 2024
0eca37a
Add license header to all files.
Sh-Zh-7 Nov 14, 2024
54571b2
Add necessary comment to all files.
Sh-Zh-7 Nov 14, 2024
b4741a1
Include files borrowed from Trino.
Sh-Zh-7 Nov 14, 2024
80fa918
Remove all requireNonNulls.
Sh-Zh-7 Nov 14, 2024
b79a557
Format all files using spotless plugin.
Sh-Zh-7 Nov 14, 2024
b68f2c7
Make ctor in WorkProcessor public.
Sh-Zh-7 Nov 19, 2024
b1d7db8
Update WorkProcessors.
Sh-Zh-7 Nov 26, 2024
55684e8
Window op prototype.
Sh-Zh-7 Nov 26, 2024
465bce7
Merge master branch from origin.
Sh-Zh-7 Nov 26, 2024
922d3c3
Mvn spotless apply.
Sh-Zh-7 Nov 26, 2024
fe21202
Very, very simple prototype.
Sh-Zh-7 Nov 26, 2024
d5e5622
Run mvn spotless apply.
Sh-Zh-7 Nov 26, 2024
e05b79d
Support multi-channel partition and can sort successfully now.
Sh-Zh-7 Nov 28, 2024
6cb7d00
Finish three types of frame.
Sh-Zh-7 Dec 10, 2024
488a3ae
Refactor whole range module.
Sh-Zh-7 Dec 10, 2024
3839bdf
Adjust project structure.
Sh-Zh-7 Dec 10, 2024
4849895
Finish writing value window functions.
Sh-Zh-7 Dec 10, 2024
a7db416
Finish writing rank window functions.
Sh-Zh-7 Dec 10, 2024
af656e2
Shrink exception implementation in window function.
Sh-Zh-7 Dec 10, 2024
631a367
Combine frame and partition directory.
Sh-Zh-7 Dec 10, 2024
7d4ae3c
Run mvn spotless apply.
Sh-Zh-7 Dec 10, 2024
ddf84cd
Support multiple window functions and frames.
Sh-Zh-7 Dec 10, 2024
ca9b93c
Temporarily save my workspace.
Sh-Zh-7 Dec 10, 2024
c937900
Add `removeInput` and `hasRemoveInput` method to specific Accumulators.
Sh-Zh-7 Dec 11, 2024
9a785d7
Run `mvn spotless:apply`.
Sh-Zh-7 Dec 11, 2024
3a08721
Rename from `hasRemoveInput` to `removable`.
Sh-Zh-7 Dec 11, 2024
5d9c5b0
Merge branch 'remove_input_for_accumulator' into window_op
Sh-Zh-7 Dec 11, 2024
07f1d9f
Finish aggregation window function.
Sh-Zh-7 Dec 11, 2024
6f4ebed
Consider empty frame and ignore nulls in Window Function.
Sh-Zh-7 Dec 11, 2024
cb79108
Optimize with `needPeerGroup` and `needFrame`.
Sh-Zh-7 Dec 11, 2024
7cfb26b
Revert WorkProcessor pull request.
Sh-Zh-7 Dec 11, 2024
e7e35e5
Run `mvn spotless:apply` again.
Sh-Zh-7 Dec 11, 2024
3499f05
Make Window Operator stream processing.
Sh-Zh-7 Dec 17, 2024
35f4d25
Today's `mvn spotless::apply`.
Sh-Zh-7 Dec 17, 2024
f62b41e
Finish operator basic methods.
Sh-Zh-7 Dec 17, 2024
74c61e9
Today's `mvn spotless::apply`.
Sh-Zh-7 Dec 18, 2024
792c1e6
Pass basic unit test in new project.
Sh-Zh-7 Dec 18, 2024
7d34c3b
Add unit tests for rank functions.
Sh-Zh-7 Dec 18, 2024
29983b6
Add unit tests for value functions.
Sh-Zh-7 Dec 18, 2024
38f31a3
Rename method for all window functions.
Sh-Zh-7 Dec 19, 2024
e04917c
Add unit tests for aggregation window functions and fix some bugs.
Sh-Zh-7 Dec 19, 2024
241e6ec
Add unit tests for rows frames.
Sh-Zh-7 Dec 19, 2024
42f4ec7
Add unit tests for groups frames.
Sh-Zh-7 Dec 19, 2024
2b45b3a
Modify according to IDEA's lint.
Sh-Zh-7 Dec 19, 2024
8023f1d
Add unit tests for range frame.
Sh-Zh-7 Dec 20, 2024
5967017
Refactor all test files.
Sh-Zh-7 Dec 20, 2024
fae3a63
`mvn spotless::apply` at the last.
Sh-Zh-7 Dec 20, 2024
aa55f6d
Add apache license header to each new files.
Sh-Zh-7 Dec 20, 2024
0f1912d
Add unit tests for operators.
Sh-Zh-7 Dec 23, 2024
46c525f
Today's mvn spotless::apply.
Sh-Zh-7 Dec 23, 2024
3ba4a72
Remove TODO since I believe in system cache.
Sh-Zh-7 Dec 23, 2024
5d376ae
Introduce timestamp datatype in range frame.
Sh-Zh-7 Dec 23, 2024
28a58ad
Perhaps last mvn spotless::apply before code review.
Sh-Zh-7 Dec 23, 2024
1b2270d
Little modification introduced by code review.
Sh-Zh-7 Jan 14, 2025
b2ef78b
First mvn spotless::apply after code review.
Sh-Zh-7 Jan 14, 2025
e406ebd
Resolve conflicts
Sh-Zh-7 Jan 14, 2025
5782043
Modify by code review(PART I).
Sh-Zh-7 Jan 21, 2025
f7471f3
Merge branch 'master' into window_op
Sh-Zh-7 Jan 21, 2025
6b07f8e
Modify by code review(PART II).
Sh-Zh-7 Jan 21, 2025
75d0454
mvn `spotless::apply` again.
Sh-Zh-7 Jan 21, 2025
5f42807
Modify code by code review(PART III).
Sh-Zh-7 Jan 22, 2025
c5bc897
Extract common part from value window function and rank window functi…
Sh-Zh-7 Jan 22, 2025
a4c97ab
Perhaps last mvn spotless::apply.
Sh-Zh-7 Jan 22, 2025
ab111ad
Fix unit tests errors.
Sh-Zh-7 Jan 22, 2025
be505c3
Temporary commit for refactoring.
Sh-Zh-7 Jan 24, 2025
d8381b4
Support offset in columns.
Sh-Zh-7 Jan 24, 2025
c12995e
Maybe last format before New Year.
Sh-Zh-7 Jan 24, 2025
18d240d
Add license header to each files.
Sh-Zh-7 Jan 24, 2025
da891c9
Modify code by code review(PART IV).
Sh-Zh-7 Jan 24, 2025
2422d2e
Maybe last code format.
Sh-Zh-7 Jan 24, 2025
@@ -83,6 +83,34 @@ public void addInput(Column[] arguments) {
}
}

@Override
public void removeInput(Column[] arguments) {
checkArgument(arguments.length == 1, "argument of Avg should be one column");
switch (argumentDataType) {
case INT32:
removeIntInput(arguments[0]);
return;
case INT64:
removeLongInput(arguments[0]);
return;
case FLOAT:
removeFloatInput(arguments[0]);
return;
case DOUBLE:
removeDoubleInput(arguments[0]);
return;
case TEXT:
case BLOB:
case STRING:
case BOOLEAN:
case DATE:
case TIMESTAMP:
default:
throw new UnSupportedDataTypeException(
String.format("Unsupported data type in aggregation AVG : %s", argumentDataType));
}
}

@Override
public void addIntermediate(Column argument) {
checkArgument(
@@ -154,6 +182,11 @@ public void reset() {
this.sumValue = 0.0;
}

@Override
public boolean removable() {
return true;
}

private byte[] serializeState() {
byte[] bytes = new byte[16];
BytesUtils.longToBytes(countValue, bytes, 0);
@@ -204,4 +237,44 @@ private void addDoubleInput(Column column) {
}
}
}

private void removeIntInput(Column column) {
int count = column.getPositionCount();
for (int i = 0; i < count; i++) {
if (!column.isNull(i)) {
countValue--;
sumValue -= column.getInt(i);
}
}
}

private void removeLongInput(Column column) {
int count = column.getPositionCount();
for (int i = 0; i < count; i++) {
if (!column.isNull(i)) {
countValue--;
sumValue -= column.getLong(i);
}
}
}

private void removeFloatInput(Column column) {
int count = column.getPositionCount();
for (int i = 0; i < count; i++) {
if (!column.isNull(i)) {
countValue--;
sumValue -= column.getFloat(i);
}
}
}

private void removeDoubleInput(Column column) {
int count = column.getPositionCount();
for (int i = 0; i < count; i++) {
if (!column.isNull(i)) {
countValue--;
sumValue -= column.getDouble(i);
}
}
}
}
@@ -55,6 +55,21 @@ public void addInput(Column[] arguments) {
}
}

@Override
public void removeInput(Column[] arguments) {
checkArgument(arguments.length == 1, "argument of Count should be one column");
int count = arguments[0].getPositionCount();
if (!arguments[0].mayHaveNull()) {
countState -= count;
} else {
for (int i = 0; i < count; i++) {
if (!arguments[0].isNull(i)) {
countState--;
}
}
}
}

@Override
public void addIntermediate(Column argument) {
for (int i = 0; i < argument.getPositionCount(); i++) {
@@ -93,4 +108,9 @@ public void addStatistics(Statistics[] statistics) {
public void reset() {
countState = 0;
}

@Override
public boolean removable() {
return true;
}
}
@@ -77,6 +77,34 @@ public void addInput(Column[] arguments) {
}
}

@Override
public void removeInput(Column[] arguments) {
checkArgument(arguments.length == 1, "argument of Sum should be one column");
switch (argumentDataType) {
case INT32:
removeIntInput(arguments[0]);
return;
case INT64:
removeLongInput(arguments[0]);
return;
case FLOAT:
removeFloatInput(arguments[0]);
return;
case DOUBLE:
removeDoubleInput(arguments[0]);
return;
case TEXT:
case BLOB:
case STRING:
case BOOLEAN:
case DATE:
case TIMESTAMP:
default:
throw new UnSupportedDataTypeException(
String.format("Unsupported data type in aggregation Sum : %s", argumentDataType));
}
}

@Override
public void addIntermediate(Column argument) {
for (int i = 0; i < argument.getPositionCount(); i++) {
@@ -132,6 +160,11 @@ public void reset() {
this.sumValue = 0.0;
}

@Override
public boolean removable() {
return true;
}

private void addIntInput(Column column) {
int count = column.getPositionCount();
for (int i = 0; i < count; i++) {
@@ -171,4 +204,40 @@ private void addDoubleInput(Column column) {
}
}
}

private void removeIntInput(Column column) {
int count = column.getPositionCount();
for (int i = 0; i < count; i++) {
if (!column.isNull(i)) {
sumValue -= column.getInt(i);
}
}
}

private void removeLongInput(Column column) {
int count = column.getPositionCount();
for (int i = 0; i < count; i++) {
if (!column.isNull(i)) {
sumValue -= column.getLong(i);
}
}
}

private void removeFloatInput(Column column) {
int count = column.getPositionCount();
for (int i = 0; i < count; i++) {
if (!column.isNull(i)) {
sumValue -= column.getFloat(i);
}
}
}

private void removeDoubleInput(Column column) {
int count = column.getPositionCount();
for (int i = 0; i < count; i++) {
if (!column.isNull(i)) {
sumValue -= column.getDouble(i);
}
}
}
}
@@ -30,6 +30,10 @@ public interface TableAccumulator {

void addInput(Column[] arguments);

default void removeInput(Column[] arguments) {
Member:

Does removeIntermediate exist?

Contributor Author:

In window functions, we need removeInput rather than removeIntermediate. I don't think the latter is necessary for now.
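
To illustrate why removeInput is the operation a window frame needs, here is a minimal sketch. It is not the code in this PR: `SlidingFrameSketch`, `SimpleAccumulator`, `SumAccumulator`, `MaxAccumulator` and `evaluate` are hypothetical names, the accumulators take plain `double` values instead of `Column[]`, and the frame is assumed to be `ROWS BETWEEN n PRECEDING AND CURRENT ROW`. When `removable()` returns true, each step adds the row entering the frame and removes the rows leaving it; otherwise the frame is recomputed from scratch.

```java
import java.util.Arrays;

final class SlidingFrameSketch {

  /** Hypothetical, simplified stand-in for an accumulator with optional removal. */
  interface SimpleAccumulator {
    void addInput(double value);

    // Mirrors the idea of a default that rejects removal unless overridden.
    default void removeInput(double value) {
      throw new UnsupportedOperationException("This accumulator does not support removing inputs");
    }

    default boolean removable() {
      return false;
    }

    void reset();

    double evaluate();
  }

  /** SUM can undo an input by subtracting it, so it is removable. */
  static final class SumAccumulator implements SimpleAccumulator {
    private double sum;

    @Override public void addInput(double value) { sum += value; }
    @Override public void removeInput(double value) { sum -= value; }
    @Override public boolean removable() { return true; }
    @Override public void reset() { sum = 0.0; }
    @Override public double evaluate() { return sum; }
  }

  /** MAX cannot undo an input from a single running value, so it keeps the defaults. */
  static final class MaxAccumulator implements SimpleAccumulator {
    private double max = Double.NEGATIVE_INFINITY;

    @Override public void addInput(double value) { max = Math.max(max, value); }
    @Override public void reset() { max = Double.NEGATIVE_INFINITY; }
    @Override public double evaluate() { return max; }
  }

  /** Evaluates the accumulator over the frame [i - preceding, i] for every row i. */
  static double[] evaluate(double[] rows, int preceding, SimpleAccumulator acc) {
    double[] result = new double[rows.length];
    acc.reset();
    int frameStart = 0; // first row currently held by a removable accumulator
    for (int i = 0; i < rows.length; i++) {
      int newStart = Math.max(0, i - preceding);
      if (acc.removable()) {
        // Incremental path: one add, plus one remove per row that left the frame.
        acc.addInput(rows[i]);
        while (frameStart < newStart) {
          acc.removeInput(rows[frameStart++]);
        }
      } else {
        // Fallback path: rebuild the whole frame for this row.
        acc.reset();
        for (int j = newStart; j <= i; j++) {
          acc.addInput(rows[j]);
        }
      }
      result[i] = acc.evaluate();
    }
    return result;
  }

  public static void main(String[] args) {
    double[] rows = {3, 1, 4, 1, 5};
    System.out.println(Arrays.toString(evaluate(rows, 1, new SumAccumulator())));
    System.out.println(Arrays.toString(evaluate(rows, 1, new MaxAccumulator())));
  }
}
```

Under these assumptions the incremental path does a constant amount of work per row for this frame shape, while the fallback does work proportional to the frame size. Intermediate (partial aggregate) state is never removed in this loop, which matches the point of the reply above.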

throw new UnsupportedOperationException("This Accumulator does not support removing inputs!");
}

void addIntermediate(Column argument);

void evaluateIntermediate(ColumnBuilder columnBuilder);
@@ -49,4 +53,8 @@ public interface TableAccumulator {
void addStatistics(Statistics[] statistics);

void reset();

default boolean removable() {
return false;
}
}
@@ -130,6 +130,37 @@ public void addInput(Column[] arguments) {
}
}

@Override
public void removeInput(Column[] arguments) {
switch (seriesDataType) {
case BOOLEAN:
removeBooleanInput(arguments[0]);
break;
case INT32:
case DATE:
removeIntInput(arguments[0]);
break;
case FLOAT:
removeFloatInput(arguments[0]);
break;
case INT64:
case TIMESTAMP:
removeLongInput(arguments[0]);
break;
case DOUBLE:
removeDoubleInput(arguments[0]);
break;
case TEXT:
case STRING:
case BLOB:
removeBinaryInput(arguments[0]);
break;
default:
throw new UnsupportedOperationException(
String.format(UNSUPPORTED_TYPE_MESSAGE, seriesDataType));
}
}

@Override
public void addIntermediate(Column argument) {
checkArgument(
@@ -282,6 +313,11 @@ public void reset() {
nullCount = 0;
}

@Override
public boolean removable() {
return true;
}

// haveNull | nullCount (optional) | countMap
private byte[] serializeCountMap() {
byte[] bytes;
@@ -544,6 +580,72 @@ private void addBinaryInput(Column column) {
}
}

private void removeBooleanInput(Column column) {
for (int i = 0; i < column.getPositionCount(); i++) {
if (!column.isNull(i)) {
boolean key = column.getBoolean(i);
booleanCountMap.compute(key, (k, count) -> count - 1);
} else {
nullCount--;
}
}
}

private void removeIntInput(Column column) {
for (int i = 0; i < column.getPositionCount(); i++) {
if (!column.isNull(i)) {
int key = column.getInt(i);
intCountMap.compute(key, (k, count) -> count - 1);
} else {
nullCount--;
}
}
}

private void removeFloatInput(Column column) {
for (int i = 0; i < column.getPositionCount(); i++) {
if (!column.isNull(i)) {
float key = column.getFloat(i);
floatCountMap.compute(key, (k, count) -> count - 1);
} else {
nullCount--;
}
}
}

private void removeLongInput(Column column) {
for (int i = 0; i < column.getPositionCount(); i++) {
if (!column.isNull(i)) {
long key = column.getLong(i);
longCountMap.compute(key, (k, count) -> count - 1);
} else {
nullCount--;
}
}
}

private void removeDoubleInput(Column column) {
for (int i = 0; i < column.getPositionCount(); i++) {
if (!column.isNull(i)) {
double key = column.getDouble(i);
doubleCountMap.compute(key, (k, count) -> count - 1);
} else {
nullCount--;
}
}
}

private void removeBinaryInput(Column column) {
for (int i = 0; i < column.getPositionCount(); i++) {
if (!column.isNull(i)) {
Binary key = column.getBinary(i);
binaryCountMap.compute(key, (k, count) -> count - 1);
} else {
nullCount--;
}
}
}

private void checkMapSize(int size) {
if (size > MAP_SIZE_THRESHOLD) {
throw new RuntimeException(