Skip to content

Commit

Permalink
Introduce operator of window function.
Browse files Browse the repository at this point in the history
  • Loading branch information
Sh-Zh-7 authored Jan 24, 2025
1 parent 1b0af7d commit f2d24ff
Show file tree
Hide file tree
Showing 47 changed files with 7,408 additions and 1 deletion.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.queryengine.execution.operator.process.window.function;

import org.apache.iotdb.db.queryengine.execution.operator.process.window.partition.Partition;

import org.apache.tsfile.block.column.ColumnBuilder;

public interface WindowFunction {
void reset();

void transform(
Partition partition,
ColumnBuilder builder,
int index,
int frameStart,
int frameEnd,
int peerGroupStart,
int peerGroupEnd);

default boolean needPeerGroup() {
return true;
}

default boolean needFrame() {
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.queryengine.execution.operator.process.window.function.aggregate;

import org.apache.iotdb.db.queryengine.execution.operator.process.window.function.WindowFunction;
import org.apache.iotdb.db.queryengine.execution.operator.process.window.partition.Partition;

import org.apache.tsfile.block.column.ColumnBuilder;

public class AggregationWindowFunction implements WindowFunction {
private final WindowAggregator aggregator;
private int currentStart;
private int currentEnd;

public AggregationWindowFunction(WindowAggregator aggregator) {
this.aggregator = aggregator;
reset();
}

@Override
public void reset() {
aggregator.reset();
currentStart = -1;
currentEnd = -1;
}

@Override
public void transform(
Partition partition,
ColumnBuilder builder,
int index,
int frameStart,
int frameEnd,
int peerGroupStart,
int peerGroupEnd) {
if (frameStart < 0) {
// Empty frame
reset();
} else if (frameStart == currentStart && frameEnd >= currentEnd) {
// Frame expansion
if (frameEnd != currentEnd) {
Partition region = partition.getRegion(currentEnd + 1, frameEnd);
aggregator.addInput(region);
currentEnd = frameEnd;
}
} else {
buildNewFrame(partition, frameStart, frameEnd);
}

aggregator.evaluate(builder);
}

private void buildNewFrame(Partition partition, int frameStart, int frameEnd) {
if (aggregator.removable()) {
int prefix = Math.abs(currentStart - frameStart);
int suffix = Math.abs(currentEnd - frameEnd);
int frameLength = frameEnd - frameStart + 1;

// Compare remove && add cost with re-computation
if (frameLength > prefix + suffix) {
if (currentStart < frameStart) {
Partition region = partition.getRegion(currentStart, frameStart - 1);
aggregator.removeInput(region);
} else if (currentStart > frameStart) {
Partition region = partition.getRegion(frameStart, currentStart - 1);
aggregator.addInput(region);
} // Do nothing when currentStart == frameStart

if (frameEnd < currentEnd) {
Partition region = partition.getRegion(frameEnd + 1, currentEnd);
aggregator.removeInput(region);
} else if (frameEnd > currentEnd) {
Partition region = partition.getRegion(currentEnd + 1, frameEnd);
aggregator.addInput(region);
} // Do nothing when frameEnd == currentEnd

currentStart = frameStart;
currentEnd = frameEnd;
return;
}
}

// Re-compute
aggregator.reset();
Partition region = partition.getRegion(frameStart, frameEnd);
aggregator.addInput(region);

currentStart = frameStart;
currentEnd = frameEnd;
}

@Override
public boolean needPeerGroup() {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.queryengine.execution.operator.process.window.function.aggregate;

import org.apache.iotdb.db.queryengine.execution.operator.process.window.partition.Partition;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AggregationMask;
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.TableAccumulator;

import com.google.common.primitives.Ints;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.statistics.Statistics;
import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;

import java.util.List;

import static java.util.Objects.requireNonNull;
import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE;

public class WindowAggregator {
private final TableAccumulator accumulator;
private final TSDataType outputType;
private final int[] inputChannels;

public WindowAggregator(
TableAccumulator accumulator, TSDataType outputType, List<Integer> inputChannels) {
this.accumulator = requireNonNull(accumulator, "accumulator is null");
this.outputType = requireNonNull(outputType, "intermediateType is null");
this.inputChannels = Ints.toArray(requireNonNull(inputChannels, "inputChannels is null"));
}

public TSDataType getType() {
return outputType;
}

public void addInput(Partition partition) {
List<Column[]> allColumns = partition.getAllColumns();
for (Column[] columns : allColumns) {
addInput(columns);
}
}

public void addInput(Column[] columns) {
Column[] arguments = new Column[inputChannels.length];
for (int i = 0; i < inputChannels.length; i++) {
arguments[i] = columns[inputChannels[i]];
}

// Process count(*)
int count = columns[0].getPositionCount();
if (arguments.length == 0) {
arguments = new Column[] {new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, count)};
}

AggregationMask mask = AggregationMask.createSelectAll(count);
accumulator.addInput(arguments, mask);
}

public void removeInput(Partition partition) {
List<Column[]> allColumns = partition.getAllColumns();
for (Column[] columns : allColumns) {
removeInput(columns);
}
}

private void removeInput(Column[] columns) {
Column[] arguments = new Column[inputChannels.length];
for (int i = 0; i < inputChannels.length; i++) {
arguments[i] = columns[inputChannels[i]];
}

// Process count(*)
int count = columns[0].getPositionCount();
if (arguments.length == 0) {
arguments = new Column[] {new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, count)};
}

accumulator.removeInput(arguments);
}

public void evaluate(ColumnBuilder columnBuilder) {
accumulator.evaluateFinal(columnBuilder);
}

public void processStatistics(Statistics[] statistics) {
accumulator.addStatistics(statistics);
}

public boolean hasFinalResult() {
return accumulator.hasFinalResult();
}

public void reset() {
accumulator.reset();
}

public boolean removable() {
return accumulator.removable();
}

public long getEstimatedSize() {
return accumulator.getEstimatedSize();
}

public int getChannelCount() {
return this.inputChannels.length;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.queryengine.execution.operator.process.window.function.rank;

import org.apache.iotdb.db.queryengine.execution.operator.process.window.partition.Partition;

import org.apache.tsfile.block.column.ColumnBuilder;

public class CumeDistFunction extends RankWindowFunction {
private long count;

public CumeDistFunction() {
reset();
}

@Override
public void reset() {
super.reset();
count = 0;
}

@Override
public void transform(
Partition partition,
ColumnBuilder builder,
int index,
boolean isNewPeerGroup,
int peerGroupCount) {
if (isNewPeerGroup) {
count += peerGroupCount;
}

builder.writeDouble(((double) count) / partition.getPositionCount());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.queryengine.execution.operator.process.window.function.rank;

import org.apache.iotdb.db.queryengine.execution.operator.process.window.partition.Partition;

import org.apache.tsfile.block.column.ColumnBuilder;

public class DenseRankFunction extends RankWindowFunction {
private long rank;

public DenseRankFunction() {
reset();
}

@Override
public void reset() {
super.reset();
rank = 0;
}

@Override
public void transform(
Partition partition,
ColumnBuilder builder,
int index,
boolean isNewPeerGroup,
int peerGroupCount) {
if (isNewPeerGroup) {
rank++;
}

builder.writeLong(rank);
}
}
Loading

0 comments on commit f2d24ff

Please sign in to comment.