Skip to content

Commit

Permalink
[parquet] Parquet ColumnarBatch should return ColumnarRowIterator for…
Browse files Browse the repository at this point in the history
… nested schema
  • Loading branch information
JingsongLi committed Jan 24, 2025
1 parent a779c95 commit 8e00805
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,40 @@

package org.apache.paimon.format.parquet.newreader;

import org.apache.paimon.data.columnar.ArrayColumnVector;
import org.apache.paimon.data.columnar.ColumnVector;
import org.apache.paimon.data.columnar.ColumnarRow;
import org.apache.paimon.data.columnar.ColumnarRowIterator;
import org.apache.paimon.data.columnar.MapColumnVector;
import org.apache.paimon.data.columnar.RowColumnVector;
import org.apache.paimon.data.columnar.VectorizedColumnBatch;
import org.apache.paimon.data.columnar.VectorizedRowIterator;
import org.apache.paimon.fs.Path;

import java.util.Arrays;

/** A batch of rows in columnar format. */
public class ColumnarBatch {
protected final ColumnVector[] columns;

protected final VectorizedColumnBatch vectorizedColumnBatch;
protected final VectorizedRowIterator vectorizedRowIterator;
protected final ColumnarRowIterator vectorizedRowIterator;

public ColumnarBatch(Path filePath, ColumnVector[] columns) {
this.columns = columns;
this.vectorizedColumnBatch = new VectorizedColumnBatch(columns);
boolean containsNestedColumn =
Arrays.stream(columns)
.anyMatch(
vector ->
vector instanceof MapColumnVector
|| vector instanceof RowColumnVector
|| vector instanceof ArrayColumnVector);
ColumnarRow row = new ColumnarRow(vectorizedColumnBatch);
this.vectorizedRowIterator =
new VectorizedRowIterator(filePath, new ColumnarRow(vectorizedColumnBatch), null);
containsNestedColumn
? new ColumnarRowIterator(filePath, row, null)
: new VectorizedRowIterator(filePath, row, null);
}

/** Reset next record position and return self. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.columnar.BytesColumnVector;
import org.apache.paimon.data.columnar.ColumnVector;
import org.apache.paimon.data.columnar.ColumnarRowIterator;
import org.apache.paimon.format.FormatReaderContext;
Expand Down Expand Up @@ -54,7 +53,6 @@
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand All @@ -67,11 +65,6 @@ public class ParquetColumnVectorTest {
private @TempDir java.nio.file.Path tempDir;

private static final Random RND = ThreadLocalRandom.current();
private static final BiFunction<ColumnVector, Integer, String> BYTES_COLUMN_VECTOR_STRING_FUNC =
(cv, i) ->
cv.isNullAt(i)
? "null"
: new String(((BytesColumnVector) cv).getBytes(i).getBytes());

@Test
public void testNormalStrings() throws IOException {
Expand Down

0 comments on commit 8e00805

Please sign in to comment.