From aae1b665156de30608f7ed23857fd83da3142872 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Thu, 19 Sep 2024 16:32:38 +0800 Subject: [PATCH] fix comment --- .../paimon/codegen/SimpleProjection.java | 243 ++++++++++++++++++ .../paimon/table/system/FilesTable.java | 95 ++++--- 2 files changed, 287 insertions(+), 51 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/codegen/SimpleProjection.java diff --git a/paimon-core/src/main/java/org/apache/paimon/codegen/SimpleProjection.java b/paimon-core/src/main/java/org/apache/paimon/codegen/SimpleProjection.java new file mode 100644 index 000000000000..b0f48d5dda67 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/codegen/SimpleProjection.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.codegen; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.BinaryRowWriter; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.serializer.InternalArraySerializer; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.BinaryType; +import org.apache.paimon.types.BooleanType; +import org.apache.paimon.types.CharType; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypeVisitor; +import org.apache.paimon.types.DateType; +import org.apache.paimon.types.DecimalType; +import org.apache.paimon.types.DoubleType; +import org.apache.paimon.types.FloatType; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.LocalZonedTimestampType; +import org.apache.paimon.types.MapType; +import org.apache.paimon.types.MultisetType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.types.SmallIntType; +import org.apache.paimon.types.TimeType; +import org.apache.paimon.types.TimestampType; +import org.apache.paimon.types.TinyIntType; +import org.apache.paimon.types.VarBinaryType; +import org.apache.paimon.types.VarCharType; + +import java.util.ArrayList; +import java.util.List; + +/** Projection without codegen. */ +public class SimpleProjection implements Projection { + + private static final FieldWriterVisitor FIELD_WRITER_VISITOR = new FieldWriterVisitor(); + + private final BinaryRow binaryRow; + private final BinaryRowWriter binaryRowWriter; + private final Writer[] writers; + + public SimpleProjection(RowType all, List projectedFields) { + + this.binaryRow = new BinaryRow(projectedFields.size()); + this.binaryRowWriter = new BinaryRowWriter(binaryRow); + + List writers = new ArrayList<>(); + List fields = all.getFields(); + + for (int i = 0; i < all.getFieldCount(); i++) { + int index = projectedFields.indexOf(fields.get(i).name()); + if (index != -1) { + writers.add( + new Writer( + i, + index, + binaryRowWriter, + fields.get(i).type().accept(FIELD_WRITER_VISITOR))); + } + } + + this.writers = writers.toArray(new Writer[0]); + } + + @Override + public BinaryRow apply(InternalRow internalRow) { + + binaryRowWriter.reset(); + for (Writer writer : writers) { + writer.write(internalRow); + } + binaryRowWriter.complete(); + return binaryRow.copy(); + } + + /** Writer binary row. */ + public static class Writer { + + private final int i; + private final int j; + private final BinaryRowWriter binaryRowWriter; + private final FieldWriter fieldWriter; + + public Writer(int i, int j, BinaryRowWriter binaryRowWriter, FieldWriter fieldWriter) { + this.i = i; + this.j = j; + + this.binaryRowWriter = binaryRowWriter; + this.fieldWriter = fieldWriter; + } + + public void write(InternalRow row) { + if (row.isNullAt(i)) { + binaryRowWriter.setNullAt(i); + } else { + fieldWriter.write(row, i, binaryRowWriter, j); + } + } + } + + /** Write fields. */ + public interface FieldWriter { + void write(InternalRow internalRow, int i, BinaryRowWriter binaryRowWriter, int j); + } + + /** Visitor. */ + public static class FieldWriterVisitor implements DataTypeVisitor { + + @Override + public FieldWriter visit(CharType charType) { + return (row, i, writer, j) -> writer.writeString(j, row.getString(i)); + } + + @Override + public FieldWriter visit(VarCharType varCharType) { + return (row, i, writer, j) -> writer.writeString(j, row.getString(i)); + } + + @Override + public FieldWriter visit(BooleanType booleanType) { + return (row, i, writer, j) -> writer.writeBoolean(j, row.getBoolean(i)); + } + + @Override + public FieldWriter visit(BinaryType binaryType) { + return (row, i, writer, j) -> writer.writeBinary(j, row.getBinary(i)); + } + + @Override + public FieldWriter visit(VarBinaryType varBinaryType) { + return (row, i, writer, j) -> writer.writeBinary(j, row.getBinary(i)); + } + + @Override + public FieldWriter visit(DecimalType decimalType) { + return (row, i, writer, j) -> + writer.writeDecimal( + j, + row.getDecimal(i, decimalType.getPrecision(), decimalType.getScale()), + decimalType.getPrecision()); + } + + @Override + public FieldWriter visit(TinyIntType tinyIntType) { + return (row, i, writer, j) -> writer.writeByte(j, row.getByte(i)); + } + + @Override + public FieldWriter visit(SmallIntType smallIntType) { + return (row, i, writer, j) -> writer.writeShort(j, row.getShort(i)); + } + + @Override + public FieldWriter visit(IntType intType) { + return (row, i, writer, j) -> writer.writeInt(j, row.getInt(i)); + } + + @Override + public FieldWriter visit(BigIntType bigIntType) { + return (row, i, writer, j) -> writer.writeLong(j, row.getLong(i)); + } + + @Override + public FieldWriter visit(FloatType floatType) { + return (row, i, writer, j) -> writer.writeFloat(j, row.getFloat(i)); + } + + @Override + public FieldWriter visit(DoubleType doubleType) { + return (row, i, writer, j) -> writer.writeDouble(j, row.getDouble(i)); + } + + @Override + public FieldWriter visit(DateType dateType) { + return (row, i, writer, j) -> writer.writeInt(j, row.getInt(i)); + } + + @Override + public FieldWriter visit(TimeType timeType) { + return (row, i, writer, j) -> writer.writeInt(j, row.getInt(i)); + } + + @Override + public FieldWriter visit(TimestampType timestampType) { + return (row, i, writer, j) -> + writer.writeTimestamp( + j, + row.getTimestamp(i, timestampType.getPrecision()), + timestampType.getPrecision()); + } + + @Override + public FieldWriter visit(LocalZonedTimestampType localZonedTimestampType) { + return (row, i, writer, j) -> + writer.writeTimestamp( + j, + row.getTimestamp(i, localZonedTimestampType.getPrecision()), + localZonedTimestampType.getPrecision()); + } + + @Override + public FieldWriter visit(ArrayType arrayType) { + return (row, i, writer, j) -> + writer.writeArray( + j, + row.getArray(i), + new InternalArraySerializer(arrayType.getElementType())); + } + + @Override + public FieldWriter visit(MultisetType multisetType) { + throw new UnsupportedOperationException(); + } + + @Override + public FieldWriter visit(MapType mapType) { + throw new UnsupportedOperationException(); + } + + @Override + public FieldWriter visit(RowType rowType) { + throw new UnsupportedOperationException(); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java index 7d61f5d5641f..b44926ff9a22 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java @@ -18,6 +18,7 @@ package org.apache.paimon.table.system; +import org.apache.paimon.codegen.SimpleProjection; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; @@ -59,6 +60,7 @@ import org.apache.paimon.utils.ProjectedRow; import org.apache.paimon.utils.RowDataToObjectArrayConverter; import org.apache.paimon.utils.SerializationUtils; +import org.apache.paimon.utils.TypeUtils; import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators; @@ -69,7 +71,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.Iterator; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -136,7 +137,7 @@ public List primaryKeys() { @Override public InnerTableScan newScan() { - return new FilesScan(storeTable.newScan().listPartitions()); + return new FilesScan(storeTable); } @Override @@ -156,10 +157,10 @@ private static class FilesScan extends ReadOnceTableScan { @Nullable private LeafPredicate bucketPredicate; @Nullable private LeafPredicate levelPredicate; - private final List partitions; + private final FileStoreTable fileStoreTable; - public FilesScan(List partitions) { - this.partitions = partitions; + public FilesScan(FileStoreTable fileStoreTable) { + this.fileStoreTable = fileStoreTable; } @Override @@ -178,43 +179,56 @@ public InnerTableScan withFilter(Predicate pushdown) { @Override public Plan innerPlan() { + List partitions = new ArrayList<>(); + if (partitionPredicate != null && partitionPredicate.function() instanceof Equal) { + GenericRow partitionRow = new GenericRow(fileStoreTable.partitionKeys().size()); + RowType partitionRowType = fileStoreTable.schema().logicalPartitionType(); + String partitionStr = partitionPredicate.literals().get(0).toString(); + if (partitionStr.startsWith("[")) { + partitionStr = partitionStr.substring(1); + } + if (partitionStr.endsWith("]")) { + partitionStr = partitionStr.substring(0, partitionStr.length() - 1); + } + String[] partFields = partitionStr.split(", "); + List partitionKeys = fileStoreTable.partitionKeys(); + if (partitionKeys.size() != partFields.length) { + return Collections::emptyList; + } + for (int i = 0; i < partitionKeys.size(); i++) { + partitionRow.setField( + i, + TypeUtils.castFromString(partFields[i], partitionRowType.getTypeAt(i))); + } - if (partitionPredicate != null) { - return () -> - Collections.singletonList( - new FilesSplit( - partitionPredicate, bucketPredicate, levelPredicate, null)); + partitions.add( + new SimpleProjection(partitionRowType, fileStoreTable.partitionKeys()) + .apply(partitionRow)); + // TODO support range? } else { - return () -> - partitions.stream() - .map( - p -> - new FilesSplit( - partitionPredicate, - bucketPredicate, - levelPredicate, - p)) - .collect(Collectors.toList()); + partitions.addAll(fileStoreTable.newScan().listPartitions()); } + + return () -> + partitions.stream() + .map(p -> new FilesSplit(p, bucketPredicate, levelPredicate)) + .collect(Collectors.toList()); } } private static class FilesSplit extends SingletonSplit { - @Nullable private final LeafPredicate partitionPredicate; + @Nullable private final BinaryRow partition; @Nullable private final LeafPredicate bucketPredicate; @Nullable private final LeafPredicate levelPredicate; - @Nullable private final BinaryRow partition; private FilesSplit( - @Nullable LeafPredicate partitionPredicate, + @Nullable BinaryRow partition, @Nullable LeafPredicate bucketPredicate, - @Nullable LeafPredicate levelPredicate, - @Nullable BinaryRow partition) { - this.partitionPredicate = partitionPredicate; + @Nullable LeafPredicate levelPredicate) { + this.partition = partition; this.bucketPredicate = bucketPredicate; this.levelPredicate = levelPredicate; - this.partition = partition; } @Override @@ -226,14 +240,14 @@ public boolean equals(Object o) { return false; } FilesSplit that = (FilesSplit) o; - return Objects.equals(partitionPredicate, that.partitionPredicate) + return Objects.equals(partition, that.partition) && Objects.equals(bucketPredicate, that.bucketPredicate) && Objects.equals(this.levelPredicate, that.levelPredicate); } @Override public int hashCode() { - return Objects.hash(partitionPredicate, bucketPredicate, levelPredicate); + return Objects.hash(partition, bucketPredicate, levelPredicate); } public List splits(FileStoreTable storeTable) { @@ -242,28 +256,7 @@ public List splits(FileStoreTable storeTable) { private TableScan.Plan tablePlan(FileStoreTable storeTable) { InnerTableScan scan = storeTable.newScan(); - if (partitionPredicate != null) { - if (partitionPredicate.function() instanceof Equal) { - String partitionStr = partitionPredicate.literals().get(0).toString(); - if (partitionStr.startsWith("[")) { - partitionStr = partitionStr.substring(1); - } - if (partitionStr.endsWith("]")) { - partitionStr = partitionStr.substring(0, partitionStr.length() - 1); - } - String[] partFields = partitionStr.split(", "); - LinkedHashMap partSpec = new LinkedHashMap<>(); - List partitionKeys = storeTable.partitionKeys(); - if (partitionKeys.size() != partFields.length) { - return Collections::emptyList; - } - for (int i = 0; i < partitionKeys.size(); i++) { - partSpec.put(partitionKeys.get(i), partFields[i]); - } - scan.withPartitionFilter(partSpec); - } - // TODO support range? - } else if (partition != null) { + if (partition != null) { scan.withPartitionFilter(Collections.singletonList(partition)); } if (bucketPredicate != null) {