Skip to content

Commit

Permalink
[parquet] Introduce LongIterator to Parquet RowIndexGenerator (#4991)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Jan 24, 2025
1 parent 8e00805 commit 3290fcc
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.FileRecordIterator;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.utils.LongIterator;
import org.apache.paimon.utils.RecyclableIterator;
import org.apache.paimon.utils.VectorMappingUtils;

Expand Down Expand Up @@ -51,17 +52,15 @@ public ColumnarRowIterator(Path filePath, ColumnarRow row, @Nullable Runnable re
}

public void reset(long nextFilePos) {
long[] positions = new long[row.batch().getNumRows()];
for (int i = 0; i < row.batch().getNumRows(); i++) {
positions[i] = nextFilePos++;
}
reset(positions);
reset(LongIterator.fromRange(nextFilePos, nextFilePos + positions.length));
}

public void reset(long[] positions) {
assert positions.length == row.batch().getNumRows();
this.positions = positions;
public void reset(LongIterator positions) {
this.num = row.batch().getNumRows();
this.positions = new long[num];
for (int i = 0; i < num; i++) {
this.positions[i] = positions.next();
}
this.nextPos = 0;
}

Expand Down Expand Up @@ -92,7 +91,7 @@ public Path filePath() {
protected ColumnarRowIterator copy(ColumnVector[] vectors) {
ColumnarRowIterator newIterator =
new ColumnarRowIterator(filePath, row.copy(vectors), recycler);
newIterator.reset(positions);
newIterator.reset(LongIterator.fromArray(positions));
return newIterator;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.VectorizedRecordIterator;
import org.apache.paimon.utils.LongIterator;

import javax.annotation.Nullable;

Expand All @@ -39,7 +40,7 @@ public VectorizedColumnBatch batch() {
protected VectorizedRowIterator copy(ColumnVector[] vectors) {
VectorizedRowIterator newIterator =
new VectorizedRowIterator(filePath, row.copy(vectors), recycler);
newIterator.reset(positions);
newIterator.reset(LongIterator.fromArray(positions));
return newIterator;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.utils;

import java.util.NoSuchElementException;

/** Iterator for long. */
public interface LongIterator {

boolean hasNext();

long next();

static LongIterator fromRange(final long startInclusive, final long endExclusive) {
return new LongIterator() {

private long i = startInclusive;

@Override
public boolean hasNext() {
return i < endExclusive;
}

@Override
public long next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return i++;
}
};
}

static LongIterator fromArray(final long[] longs) {
return new LongIterator() {

private int i = 0;

@Override
public boolean hasNext() {
return i < longs.length;
}

@Override
public long next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return longs[i++];
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.utils;

import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

class LongIteratorTest {

@Test
public void testRange() {
LongIterator iterator = LongIterator.fromRange(5, 10);
List<Long> list = new ArrayList<>();
while (iterator.hasNext()) {
list.add(iterator.next());
}
assertThat(list).containsExactlyInAnyOrder(5L, 6L, 7L, 8L, 9L);
}

@Test
public void testFromArray() {
long[] array = new long[] {5L, 6L, 7L, 8L, 9L};
LongIterator iterator = LongIterator.fromArray(array);
List<Long> list = new ArrayList<>();
while (iterator.hasNext()) {
list.add(iterator.next());
}
assertThat(list).containsExactlyInAnyOrder(5L, 6L, 7L, 8L, 9L);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.paimon.data.columnar.VectorizedColumnBatch;
import org.apache.paimon.data.columnar.VectorizedRowIterator;
import org.apache.paimon.fs.Path;
import org.apache.paimon.utils.LongIterator;

import java.util.Arrays;

Expand Down Expand Up @@ -55,7 +56,7 @@ public ColumnarBatch(Path filePath, ColumnVector[] columns) {
}

/** Reset next record position and return self. */
public void resetPositions(long[] positions) {
public void resetPositions(LongIterator positions) {
vectorizedRowIterator.reset(positions);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@

package org.apache.paimon.format.parquet.newreader;

import org.apache.paimon.utils.LongIterator;

import org.apache.parquet.column.page.PageReadStore;

import java.util.Iterator;
import java.util.stream.Stream;
import java.util.PrimitiveIterator;

/* This file is based on source code from the Spark Project (http://spark.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
Expand All @@ -30,37 +31,32 @@
/** Generate row index for columnar batch. */
public class RowIndexGenerator {

Iterator<Long> rowIndexIterator;
private LongIterator rowIndexIterator;

public void initFromPageReadStore(PageReadStore pageReadStore) {
long startingRowIdx = pageReadStore.getRowIndexOffset().orElse(0L);

if (pageReadStore.getRowIndexes().isPresent()) {
final Iterator<Long> rowIndexes = pageReadStore.getRowIndexes().get();
PrimitiveIterator.OfLong rowIndexes = pageReadStore.getRowIndexes().orElse(null);
if (rowIndexes != null) {
rowIndexIterator =
new Iterator<Long>() {
new LongIterator() {
@Override
public boolean hasNext() {
return rowIndexes.hasNext();
}

@Override
public Long next() {
return rowIndexes.next() + startingRowIdx;
public long next() {
return rowIndexes.nextLong() + startingRowIdx;
}
};
} else {
long numRowsInRowGroup = pageReadStore.getRowCount();
rowIndexIterator =
Stream.iterate(startingRowIdx, i -> i + 1).limit(numRowsInRowGroup).iterator();
LongIterator.fromRange(startingRowIdx, startingRowIdx + numRowsInRowGroup);
}
}

public void populateRowIndex(ColumnarBatch columnarBatch, int numRows) {
long[] rowIndexes = new long[numRows];
for (int i = 0; i < numRows; i++) {
rowIndexes[i] = rowIndexIterator.next();
}
columnarBatch.resetPositions(rowIndexes);
public void populateRowIndex(ColumnarBatch columnarBatch) {
columnarBatch.resetPositions(rowIndexIterator);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ public boolean nextBatch() throws IOException {
}
rowsReturned += num;
columnarBatch.setNumRows(num);
rowIndexGenerator.populateRowIndex(columnarBatch, num);
rowIndexGenerator.populateRowIndex(columnarBatch);
return true;
}

Expand Down

0 comments on commit 3290fcc

Please sign in to comment.