diff --git a/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/ArrowVectorizedBatchConverter.java b/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/ArrowVectorizedBatchConverter.java index 8e12d56dcdee..cc7c6dc3014c 100644 --- a/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/ArrowVectorizedBatchConverter.java +++ b/paimon-arrow/src/main/java/org/apache/paimon/arrow/converter/ArrowVectorizedBatchConverter.java @@ -32,6 +32,8 @@ import javax.annotation.Nullable; +import java.io.IOException; + /** To convert {@link VectorizedColumnBatch} to Arrow format. */ public class ArrowVectorizedBatchConverter extends ArrowBatchConverter { @@ -65,22 +67,27 @@ public void reset(ApplyDeletionFileRecordIterator iterator) { FileRecordIterator innerIterator = iterator.iterator(); this.batch = ((VectorizedRecordIterator) innerIterator).batch(); - long firstReturnedPosition = innerIterator.returnedPosition() + 1; - DeletionVector deletionVector = iterator.deletionVector(); - int originNumRows = this.batch.getNumRows(); - IntArrayList picked = new IntArrayList(originNumRows); - for (int i = 0; i < originNumRows; i++) { - long returnedPosition = firstReturnedPosition + i; - if (!deletionVector.isDeleted(returnedPosition)) { - picked.add(i); + try { + DeletionVector deletionVector = iterator.deletionVector(); + int originNumRows = this.batch.getNumRows(); + IntArrayList picked = new IntArrayList(originNumRows); + for (int i = 0; i < originNumRows; i++) { + innerIterator.next(); + long returnedPosition = innerIterator.returnedPosition(); + if (!deletionVector.isDeleted(returnedPosition)) { + picked.add(i); + } } - } - if (picked.size() == originNumRows) { - this.pickedInColumn = null; - this.totalNumRows = originNumRows; - } else { - this.pickedInColumn = picked.toArray(); - this.totalNumRows = this.pickedInColumn.length; + + if (picked.size() == originNumRows) { + this.pickedInColumn = null; + this.totalNumRows = originNumRows; + } else { + this.pickedInColumn = picked.toArray(); + this.totalNumRows = this.pickedInColumn.length; + } + } catch (IOException e) { + throw new RuntimeException("Failed to apply deletion vector.", e); } this.startIndex = 0; diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java index c1ed028acdbe..02bfe5912d3f 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java @@ -29,6 +29,8 @@ import javax.annotation.Nullable; +import static org.apache.paimon.utils.Preconditions.checkArgument; + /** * A {@link RecordReader.RecordIterator} that returns {@link InternalRow}s. The next row is set by * {@link ColumnarRow#setRowId}. @@ -41,8 +43,10 @@ public class ColumnarRowIterator extends RecyclableIterator protected final Runnable recycler; protected int num; - protected int nextPos; - protected long[] positions; + protected int index; + protected int returnedPositionIndex; + protected long returnedPosition; + protected LongIterator positionIterator; public ColumnarRowIterator(Path filePath, ColumnarRow row, @Nullable Runnable recycler) { super(recycler); @@ -56,19 +60,18 @@ public void reset(long nextFilePos) { } public void reset(LongIterator positions) { + this.positionIterator = positions; this.num = row.batch().getNumRows(); - this.positions = new long[num]; - for (int i = 0; i < num; i++) { - this.positions[i] = positions.next(); - } - this.nextPos = 0; + this.index = 0; + this.returnedPositionIndex = 0; + this.returnedPosition = -1; } @Nullable @Override public InternalRow next() { - if (nextPos < num) { - row.setRowId(nextPos++); + if (index < num) { + row.setRowId(index++); return row; } else { return null; @@ -77,10 +80,15 @@ public InternalRow next() { @Override public long returnedPosition() { - if (nextPos == 0) { - return positions[0] - 1; + for (int i = 0; i < index - returnedPositionIndex; i++) { + returnedPosition = positionIterator.next(); } - return positions[nextPos - 1]; + returnedPositionIndex = index; + if (returnedPosition == -1) { + throw new IllegalStateException("returnedPosition() is called before next()"); + } + + return returnedPosition; } @Override @@ -89,9 +97,11 @@ public Path filePath() { } protected ColumnarRowIterator copy(ColumnVector[] vectors) { + // We should call copy only when the iterator is at the beginning of the file. + checkArgument(returnedPositionIndex == 0, "copy() should not be called after next()"); ColumnarRowIterator newIterator = new ColumnarRowIterator(filePath, row.copy(vectors), recycler); - newIterator.reset(LongIterator.fromArray(positions)); + newIterator.reset(positionIterator); return newIterator; } diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java index fa9b1ded8412..dfec00a39b72 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java @@ -20,10 +20,11 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.reader.VectorizedRecordIterator; -import org.apache.paimon.utils.LongIterator; import javax.annotation.Nullable; +import static org.apache.paimon.utils.Preconditions.checkArgument; + /** A {@link ColumnarRowIterator} with {@link VectorizedRecordIterator}. */ public class VectorizedRowIterator extends ColumnarRowIterator implements VectorizedRecordIterator { @@ -38,9 +39,10 @@ public VectorizedColumnBatch batch() { @Override protected VectorizedRowIterator copy(ColumnVector[] vectors) { + checkArgument(returnedPositionIndex == 0, "copy() should not be called after next()"); VectorizedRowIterator newIterator = new VectorizedRowIterator(filePath, row.copy(vectors), recycler); - newIterator.reset(LongIterator.fromArray(positions)); + newIterator.reset(positionIterator); return newIterator; } } diff --git a/paimon-common/src/test/java/org/apache/paimon/data/columnar/ColumnarRowIteratorTest.java b/paimon-common/src/test/java/org/apache/paimon/data/columnar/ColumnarRowIteratorTest.java new file mode 100644 index 000000000000..8ab926a193cd --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/data/columnar/ColumnarRowIteratorTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.data.columnar; + +import org.apache.paimon.data.columnar.heap.HeapIntVector; +import org.apache.paimon.fs.Path; +import org.apache.paimon.utils.LongIterator; + +import org.junit.jupiter.api.Test; + +import java.util.Random; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; + +/** Test for {@link ColumnarRowIterator}. */ +public class ColumnarRowIteratorTest { + + @Test + public void testRowIterator() { + Random random = new Random(); + HeapIntVector heapIntVector = new HeapIntVector(100); + for (int i = 0; i < 100; i++) { + heapIntVector.setInt(i, random.nextInt()); + } + long[] positions = new long[100]; + positions[0] = random.nextInt(10); + for (int i = 1; i < 100; i++) { + positions[i] = positions[i - 1] + random.nextInt(100); + } + + VectorizedColumnBatch vectorizedColumnBatch = + new VectorizedColumnBatch(new ColumnVector[] {heapIntVector}); + vectorizedColumnBatch.setNumRows(100); + ColumnarRowIterator rowIterator = + new ColumnarRowIterator( + new Path("test"), new ColumnarRow(vectorizedColumnBatch), null); + rowIterator.reset(LongIterator.fromArray(positions)); + assertThatCode(rowIterator::returnedPosition) + .hasMessage("returnedPosition() is called before next()"); + rowIterator.next(); + for (int i = 0; i < random.nextInt(10); i++) { + for (int j = 0; j < random.nextInt(9); j++) { + rowIterator.next(); + } + assertThat(rowIterator.returnedPosition()).isEqualTo(positions[rowIterator.index - 1]); + } + } +} diff --git a/paimon-common/src/test/java/org/apache/paimon/data/calumnar/heap/RowColumnVectorTest.java b/paimon-common/src/test/java/org/apache/paimon/data/columnar/heap/RowColumnVectorTest.java similarity index 90% rename from paimon-common/src/test/java/org/apache/paimon/data/calumnar/heap/RowColumnVectorTest.java rename to paimon-common/src/test/java/org/apache/paimon/data/columnar/heap/RowColumnVectorTest.java index bc7c127a634d..97b00f590bf0 100644 --- a/paimon-common/src/test/java/org/apache/paimon/data/calumnar/heap/RowColumnVectorTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/data/columnar/heap/RowColumnVectorTest.java @@ -16,13 +16,10 @@ * limitations under the License. */ -package org.apache.paimon.data.calumnar.heap; +package org.apache.paimon.data.columnar.heap; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.columnar.ColumnVector; -import org.apache.paimon.data.columnar.heap.CastedRowColumnVector; -import org.apache.paimon.data.columnar.heap.HeapIntVector; -import org.apache.paimon.data.columnar.heap.HeapRowVector; import org.junit.jupiter.api.Test;