Skip to content

Commit

Permalink
[core] Refactory ColumnarRowIterator using LongIterator. (#4992)
Browse files Browse the repository at this point in the history
  • Loading branch information
leaves12138 authored and JingsongLi committed Jan 24, 2025
1 parent 9d2d214 commit 73c3097
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

import javax.annotation.Nullable;

import java.io.IOException;

/** To convert {@link VectorizedColumnBatch} to Arrow format. */
public class ArrowVectorizedBatchConverter extends ArrowBatchConverter {

Expand Down Expand Up @@ -65,22 +67,27 @@ public void reset(ApplyDeletionFileRecordIterator iterator) {
FileRecordIterator<InternalRow> innerIterator = iterator.iterator();
this.batch = ((VectorizedRecordIterator) innerIterator).batch();

long firstReturnedPosition = innerIterator.returnedPosition() + 1;
DeletionVector deletionVector = iterator.deletionVector();
int originNumRows = this.batch.getNumRows();
IntArrayList picked = new IntArrayList(originNumRows);
for (int i = 0; i < originNumRows; i++) {
long returnedPosition = firstReturnedPosition + i;
if (!deletionVector.isDeleted(returnedPosition)) {
picked.add(i);
try {
DeletionVector deletionVector = iterator.deletionVector();
int originNumRows = this.batch.getNumRows();
IntArrayList picked = new IntArrayList(originNumRows);
for (int i = 0; i < originNumRows; i++) {
innerIterator.next();
long returnedPosition = innerIterator.returnedPosition();
if (!deletionVector.isDeleted(returnedPosition)) {
picked.add(i);
}
}
}
if (picked.size() == originNumRows) {
this.pickedInColumn = null;
this.totalNumRows = originNumRows;
} else {
this.pickedInColumn = picked.toArray();
this.totalNumRows = this.pickedInColumn.length;

if (picked.size() == originNumRows) {
this.pickedInColumn = null;
this.totalNumRows = originNumRows;
} else {
this.pickedInColumn = picked.toArray();
this.totalNumRows = this.pickedInColumn.length;
}
} catch (IOException e) {
throw new RuntimeException("Failed to apply deletion vector.", e);
}

this.startIndex = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@

import javax.annotation.Nullable;

import static org.apache.paimon.utils.Preconditions.checkArgument;

/**
* A {@link RecordReader.RecordIterator} that returns {@link InternalRow}s. The next row is set by
* {@link ColumnarRow#setRowId}.
Expand All @@ -41,8 +43,10 @@ public class ColumnarRowIterator extends RecyclableIterator<InternalRow>
protected final Runnable recycler;

protected int num;
protected int nextPos;
protected long[] positions;
protected int index;
protected int returnedPositionIndex;
protected long returnedPosition;
protected LongIterator positionIterator;

public ColumnarRowIterator(Path filePath, ColumnarRow row, @Nullable Runnable recycler) {
super(recycler);
Expand All @@ -56,19 +60,18 @@ public void reset(long nextFilePos) {
}

public void reset(LongIterator positions) {
this.positionIterator = positions;
this.num = row.batch().getNumRows();
this.positions = new long[num];
for (int i = 0; i < num; i++) {
this.positions[i] = positions.next();
}
this.nextPos = 0;
this.index = 0;
this.returnedPositionIndex = 0;
this.returnedPosition = -1;
}

@Nullable
@Override
public InternalRow next() {
if (nextPos < num) {
row.setRowId(nextPos++);
if (index < num) {
row.setRowId(index++);
return row;
} else {
return null;
Expand All @@ -77,10 +80,15 @@ public InternalRow next() {

@Override
public long returnedPosition() {
if (nextPos == 0) {
return positions[0] - 1;
for (int i = 0; i < index - returnedPositionIndex; i++) {
returnedPosition = positionIterator.next();
}
return positions[nextPos - 1];
returnedPositionIndex = index;
if (returnedPosition == -1) {
throw new IllegalStateException("returnedPosition() is called before next()");
}

return returnedPosition;
}

@Override
Expand All @@ -89,9 +97,11 @@ public Path filePath() {
}

protected ColumnarRowIterator copy(ColumnVector[] vectors) {
// We should call copy only when the iterator is at the beginning of the file.
checkArgument(returnedPositionIndex == 0, "copy() should not be called after next()");
ColumnarRowIterator newIterator =
new ColumnarRowIterator(filePath, row.copy(vectors), recycler);
newIterator.reset(LongIterator.fromArray(positions));
newIterator.reset(positionIterator);
return newIterator;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@

import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.VectorizedRecordIterator;
import org.apache.paimon.utils.LongIterator;

import javax.annotation.Nullable;

import static org.apache.paimon.utils.Preconditions.checkArgument;

/** A {@link ColumnarRowIterator} with {@link VectorizedRecordIterator}. */
public class VectorizedRowIterator extends ColumnarRowIterator implements VectorizedRecordIterator {

Expand All @@ -38,9 +39,10 @@ public VectorizedColumnBatch batch() {

@Override
protected VectorizedRowIterator copy(ColumnVector[] vectors) {
checkArgument(returnedPositionIndex == 0, "copy() should not be called after next()");
VectorizedRowIterator newIterator =
new VectorizedRowIterator(filePath, row.copy(vectors), recycler);
newIterator.reset(LongIterator.fromArray(positions));
newIterator.reset(positionIterator);
return newIterator;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.data.columnar;

import org.apache.paimon.data.columnar.heap.HeapIntVector;
import org.apache.paimon.fs.Path;
import org.apache.paimon.utils.LongIterator;

import org.junit.jupiter.api.Test;

import java.util.Random;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;

/** Test for {@link ColumnarRowIterator}. */
public class ColumnarRowIteratorTest {

@Test
public void testRowIterator() {
Random random = new Random();
HeapIntVector heapIntVector = new HeapIntVector(100);
for (int i = 0; i < 100; i++) {
heapIntVector.setInt(i, random.nextInt());
}
long[] positions = new long[100];
positions[0] = random.nextInt(10);
for (int i = 1; i < 100; i++) {
positions[i] = positions[i - 1] + random.nextInt(100);
}

VectorizedColumnBatch vectorizedColumnBatch =
new VectorizedColumnBatch(new ColumnVector[] {heapIntVector});
vectorizedColumnBatch.setNumRows(100);
ColumnarRowIterator rowIterator =
new ColumnarRowIterator(
new Path("test"), new ColumnarRow(vectorizedColumnBatch), null);
rowIterator.reset(LongIterator.fromArray(positions));
assertThatCode(rowIterator::returnedPosition)
.hasMessage("returnedPosition() is called before next()");
rowIterator.next();
for (int i = 0; i < random.nextInt(10); i++) {
for (int j = 0; j < random.nextInt(9); j++) {
rowIterator.next();
}
assertThat(rowIterator.returnedPosition()).isEqualTo(positions[rowIterator.index - 1]);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,10 @@
* limitations under the License.
*/

package org.apache.paimon.data.calumnar.heap;
package org.apache.paimon.data.columnar.heap;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.columnar.ColumnVector;
import org.apache.paimon.data.columnar.heap.CastedRowColumnVector;
import org.apache.paimon.data.columnar.heap.HeapIntVector;
import org.apache.paimon.data.columnar.heap.HeapRowVector;

import org.junit.jupiter.api.Test;

Expand Down

0 comments on commit 73c3097

Please sign in to comment.