Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce operator of window function. #14213

Merged
merged 75 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
75 commits
Select commit Hold shift + click to select a range
65d341c
Add basic work processors.
Sh-Zh-7 Nov 12, 2024
e8c2a14
Rewrite all tests.
Sh-Zh-7 Nov 12, 2024
121439f
Refactor all files.
Sh-Zh-7 Nov 12, 2024
6366d71
Rename package from 'processor' to 'workers'
Sh-Zh-7 Nov 12, 2024
e558834
Decrease file's number.
Sh-Zh-7 Nov 14, 2024
0eca37a
Add license header to all files.
Sh-Zh-7 Nov 14, 2024
54571b2
Add necessary comment to all files.
Sh-Zh-7 Nov 14, 2024
b4741a1
Include files borrowed from Trino.
Sh-Zh-7 Nov 14, 2024
80fa918
Remove all requireNonNulls.
Sh-Zh-7 Nov 14, 2024
b79a557
Format all files using spotless plugin.
Sh-Zh-7 Nov 14, 2024
b68f2c7
Make ctor in WorkProcessor public.
Sh-Zh-7 Nov 19, 2024
b1d7db8
Update WorkProcessors.
Sh-Zh-7 Nov 26, 2024
55684e8
Window op prototype.
Sh-Zh-7 Nov 26, 2024
465bce7
Merge master branch from origin.
Sh-Zh-7 Nov 26, 2024
922d3c3
Mvn spotless apply.
Sh-Zh-7 Nov 26, 2024
fe21202
Very, very simple prototype.
Sh-Zh-7 Nov 26, 2024
d5e5622
Run mvn spotless apply.
Sh-Zh-7 Nov 26, 2024
e05b79d
Support multi-channel partition and can sort successfully now.
Sh-Zh-7 Nov 28, 2024
6cb7d00
Finish three types of frame.
Sh-Zh-7 Dec 10, 2024
488a3ae
Refactor whole range module.
Sh-Zh-7 Dec 10, 2024
3839bdf
Adjust project structure.
Sh-Zh-7 Dec 10, 2024
4849895
Finish writing value window functions.
Sh-Zh-7 Dec 10, 2024
a7db416
Finish writing rank window functions.
Sh-Zh-7 Dec 10, 2024
af656e2
Shrink exception implementation in window function.
Sh-Zh-7 Dec 10, 2024
631a367
Combine frame and partition directory.
Sh-Zh-7 Dec 10, 2024
7d4ae3c
Run mvn spotless apply.
Sh-Zh-7 Dec 10, 2024
ddf84cd
Support multiple window functions and frames.
Sh-Zh-7 Dec 10, 2024
ca9b93c
Temporarily save my workspace.
Sh-Zh-7 Dec 10, 2024
c937900
Add `removeInput` and `hasRemoveInput` method to specific Accumulators.
Sh-Zh-7 Dec 11, 2024
9a785d7
Run `mvn spotless:apply`.
Sh-Zh-7 Dec 11, 2024
3a08721
Rename from `hasRemoveInput` to `removable`.
Sh-Zh-7 Dec 11, 2024
5d9c5b0
Merge branch 'remove_input_for_accumulator' into window_op
Sh-Zh-7 Dec 11, 2024
07f1d9f
Finish aggregation window function.
Sh-Zh-7 Dec 11, 2024
6f4ebed
Consider empty frame and ignore nulls in Window Function.
Sh-Zh-7 Dec 11, 2024
cb79108
Optimize with `needPeerGroup` and `needFrame`.
Sh-Zh-7 Dec 11, 2024
7cfb26b
Revert WorkProcessor pull request.
Sh-Zh-7 Dec 11, 2024
e7e35e5
Run `mvn spotless:apply` again.
Sh-Zh-7 Dec 11, 2024
3499f05
Make Window Operator stream processing.
Sh-Zh-7 Dec 17, 2024
35f4d25
Today's `mvn spotless::apply`.
Sh-Zh-7 Dec 17, 2024
f62b41e
Finish operator basic methods.
Sh-Zh-7 Dec 17, 2024
74c61e9
Today's `mvn spotless::apply`.
Sh-Zh-7 Dec 18, 2024
792c1e6
Pass basic unit test in new project.
Sh-Zh-7 Dec 18, 2024
7d34c3b
Add unit tests for rank functions.
Sh-Zh-7 Dec 18, 2024
29983b6
Add unit tests for value functions.
Sh-Zh-7 Dec 18, 2024
38f31a3
Rename method for all window functions.
Sh-Zh-7 Dec 19, 2024
e04917c
Add unit tests for aggregation window functions and fix some bugs.
Sh-Zh-7 Dec 19, 2024
241e6ec
Add unit tests for rows frames.
Sh-Zh-7 Dec 19, 2024
42f4ec7
Add unit tests for groups frames.
Sh-Zh-7 Dec 19, 2024
2b45b3a
Modify according to IDEA's lint.
Sh-Zh-7 Dec 19, 2024
8023f1d
Add unit tests for range frame.
Sh-Zh-7 Dec 20, 2024
5967017
Refactor all test files.
Sh-Zh-7 Dec 20, 2024
fae3a63
`mvn spotless::apply` at the last.
Sh-Zh-7 Dec 20, 2024
aa55f6d
Add apache license header to each new files.
Sh-Zh-7 Dec 20, 2024
0f1912d
Add unit tests for operators.
Sh-Zh-7 Dec 23, 2024
46c525f
Today's mvn spotless::apply.
Sh-Zh-7 Dec 23, 2024
3ba4a72
Remove TODO since I believe in system cache.
Sh-Zh-7 Dec 23, 2024
5d376ae
Introduce timestamp datatype in range frame.
Sh-Zh-7 Dec 23, 2024
28a58ad
Perhaps last mvn spotless::apply before code review.
Sh-Zh-7 Dec 23, 2024
1b2270d
Little modification introduced by code review.
Sh-Zh-7 Jan 14, 2025
b2ef78b
First mvn spotless::apply after code review.
Sh-Zh-7 Jan 14, 2025
e406ebd
Resolve conflicts
Sh-Zh-7 Jan 14, 2025
5782043
Modify by code review(PART I).
Sh-Zh-7 Jan 21, 2025
f7471f3
Merge branch 'master' into window_op
Sh-Zh-7 Jan 21, 2025
6b07f8e
Modify by code review(PART II).
Sh-Zh-7 Jan 21, 2025
75d0454
mvn `spotless::apply` again.
Sh-Zh-7 Jan 21, 2025
5f42807
Modify code by code review(PART III).
Sh-Zh-7 Jan 22, 2025
c5bc897
Extract common part from value window function and rank window functi…
Sh-Zh-7 Jan 22, 2025
a4c97ab
Perhaps last mvn spotless::apply.
Sh-Zh-7 Jan 22, 2025
ab111ad
Fix unit tests errors.
Sh-Zh-7 Jan 22, 2025
be505c3
Temporary commit for refactoring.
Sh-Zh-7 Jan 24, 2025
d8381b4
Support offset in columns.
Sh-Zh-7 Jan 24, 2025
c12995e
Maybe last format before New Year.
Sh-Zh-7 Jan 24, 2025
18d240d
Add license header to each files.
Sh-Zh-7 Jan 24, 2025
da891c9
Modify code by code review(PART IV).
Sh-Zh-7 Jan 24, 2025
2422d2e
Maybe last code format.
Sh-Zh-7 Jan 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.queryengine.execution.operator.process.window.function;

import org.apache.iotdb.db.queryengine.execution.operator.process.window.partition.Partition;

import org.apache.tsfile.block.column.ColumnBuilder;

public interface WindowFunction {
Beyyes marked this conversation as resolved.
Show resolved Hide resolved
void reset();

void transform(
Partition partition,
ColumnBuilder builder,
int index,
int frameStart,
int frameEnd,
int peerGroupStart,
int peerGroupEnd);

default boolean needPeerGroup() {
return true;
}

default boolean needFrame() {
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.queryengine.execution.operator.process.window.function.aggregate;

import org.apache.iotdb.db.queryengine.execution.operator.process.window.function.WindowFunction;
import org.apache.iotdb.db.queryengine.execution.operator.process.window.partition.Partition;

import org.apache.tsfile.block.column.ColumnBuilder;

public class AggregationWindowFunction implements WindowFunction {
private final WindowAggregator aggregator;
private int currentStart;
private int currentEnd;

public AggregationWindowFunction(WindowAggregator aggregator) {
this.aggregator = aggregator;
reset();
}

@Override
public void reset() {
aggregator.reset();
currentStart = -1;
currentEnd = -1;
}

@Override
public void transform(
Partition partition,
ColumnBuilder builder,
int index,
int frameStart,
int frameEnd,
int peerGroupStart,
int peerGroupEnd) {
if (frameStart < 0) {
// Empty frame
reset();
} else if (frameStart == currentStart && frameEnd >= currentEnd) {
// Frame expansion
if (frameEnd != currentEnd) {
Partition region = partition.getRegion(currentEnd + 1, frameEnd);
aggregator.addInput(region);
currentEnd = frameEnd;
}
} else {
buildNewFrame(partition, frameStart, frameEnd);
}

aggregator.evaluate(builder);
}

private void buildNewFrame(Partition partition, int frameStart, int frameEnd) {
if (aggregator.removable()) {
int prefix = Math.abs(currentStart - frameStart);
int suffix = Math.abs(currentEnd - frameEnd);
int frameLength = frameEnd - frameStart + 1;

// Compare remove && add cost with re-computation
if (frameLength > prefix + suffix) {
if (currentStart < frameStart) {
Partition region = partition.getRegion(currentStart, frameStart - 1);
aggregator.removeInput(region);
} else if (currentStart > frameStart) {
Partition region = partition.getRegion(frameStart, currentStart - 1);
aggregator.addInput(region);
} // Do nothing when currentStart == frameStart

if (frameEnd < currentEnd) {
Partition region = partition.getRegion(frameEnd + 1, currentEnd);
aggregator.removeInput(region);
} else if (frameEnd > currentEnd) {
Partition region = partition.getRegion(currentEnd + 1, frameEnd);
aggregator.addInput(region);
} // Do nothing when frameEnd == currentEnd

currentStart = frameStart;
currentEnd = frameEnd;
return;
}
}

// Re-compute
aggregator.reset();
Partition region = partition.getRegion(frameStart, frameEnd);
aggregator.addInput(region);

currentStart = frameStart;
currentEnd = frameEnd;
}

@Override
public boolean needPeerGroup() {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.queryengine.execution.operator.process.window.function.aggregate;

import org.apache.iotdb.db.queryengine.execution.operator.process.window.partition.Partition;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AggregationMask;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.TableAccumulator;

import com.google.common.primitives.Ints;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.statistics.Statistics;
import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;

import java.util.List;

import static java.util.Objects.requireNonNull;
import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE;

public class WindowAggregator {
private final TableAccumulator accumulator;
private final TSDataType outputType;
private final int[] inputChannels;

public WindowAggregator(
TableAccumulator accumulator, TSDataType outputType, List<Integer> inputChannels) {
this.accumulator = requireNonNull(accumulator, "accumulator is null");
this.outputType = requireNonNull(outputType, "intermediateType is null");
this.inputChannels = Ints.toArray(requireNonNull(inputChannels, "inputChannels is null"));
}

public TSDataType getType() {
return outputType;
}

public void addInput(Partition partition) {
List<Column[]> allColumns = partition.getAllColumns();
for (Column[] columns : allColumns) {
addInput(columns);
}
}

public void addInput(Column[] columns) {
Column[] arguments = new Column[inputChannels.length];
for (int i = 0; i < inputChannels.length; i++) {
arguments[i] = columns[inputChannels[i]];
}

// Process count(*)
int count = columns[0].getPositionCount();
if (arguments.length == 0) {
arguments = new Column[] {new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, count)};
}

AggregationMask mask = AggregationMask.createSelectAll(count);
accumulator.addInput(arguments, mask);
}

public void removeInput(Partition partition) {
List<Column[]> allColumns = partition.getAllColumns();
for (Column[] columns : allColumns) {
removeInput(columns);
}
}

private void removeInput(Column[] columns) {
Column[] arguments = new Column[inputChannels.length];
for (int i = 0; i < inputChannels.length; i++) {
arguments[i] = columns[inputChannels[i]];
}

// Process count(*)
int count = columns[0].getPositionCount();
if (arguments.length == 0) {
arguments = new Column[] {new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, count)};
}

accumulator.removeInput(arguments);
}

public void evaluate(ColumnBuilder columnBuilder) {
accumulator.evaluateFinal(columnBuilder);
}

public void processStatistics(Statistics[] statistics) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can WindowAggregator use statistics?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Theoretically yes, but it's really rare. A frame must not be smaller than a chunk/page/tsfile so that it can use its statistics.

So I choose to remain this method.

accumulator.addStatistics(statistics);
}

public boolean hasFinalResult() {
return accumulator.hasFinalResult();
}

public void reset() {
accumulator.reset();
}

public boolean removable() {
return accumulator.removable();
}

public long getEstimatedSize() {
return accumulator.getEstimatedSize();
}

public int getChannelCount() {
return this.inputChannels.length;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.queryengine.execution.operator.process.window.function.rank;

import org.apache.iotdb.db.queryengine.execution.operator.process.window.partition.Partition;

import org.apache.tsfile.block.column.ColumnBuilder;

public class CumeDistFunction extends RankWindowFunction {
private long count;

public CumeDistFunction() {
reset();
}

@Override
public void reset() {
super.reset();
count = 0;
}

@Override
public void transform(
Partition partition,
ColumnBuilder builder,
int index,
boolean isNewPeerGroup,
int peerGroupCount) {
if (isNewPeerGroup) {
count += peerGroupCount;
}

builder.writeDouble(((double) count) / partition.getPositionCount());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.queryengine.execution.operator.process.window.function.rank;

import org.apache.iotdb.db.queryengine.execution.operator.process.window.partition.Partition;

import org.apache.tsfile.block.column.ColumnBuilder;

public class DenseRankFunction extends RankWindowFunction {
private long rank;

public DenseRankFunction() {
reset();
}

@Override
public void reset() {
super.reset();
rank = 0;
}

@Override
public void transform(
Partition partition,
ColumnBuilder builder,
int index,
boolean isNewPeerGroup,
int peerGroupCount) {
if (isNewPeerGroup) {
rank++;
}

builder.writeLong(rank);
}
}
Loading
Loading