Skip to content

Commit

Permalink
[core] Introduce conversion from parquet type to paimon type (#4888)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy authored Jan 13, 2025
1 parent b35bb3a commit 97441c3
Show file tree
Hide file tree
Showing 7 changed files with 298 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public FormatWriterFactory createWriterFactory(RowType type) {

@Override
public void validateDataFields(RowType rowType) {
ParquetSchemaConverter.convertToParquetMessageType("paimon_schema", rowType);
ParquetSchemaConverter.convertToParquetMessageType(rowType);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Pool;

import org.apache.parquet.ParquetReadOptions;
Expand Down Expand Up @@ -70,11 +71,10 @@
import java.util.List;
import java.util.Set;

import static org.apache.paimon.format.parquet.ParquetSchemaConverter.LIST_ELEMENT_NAME;
import static org.apache.paimon.format.parquet.ParquetSchemaConverter.LIST_NAME;
import static org.apache.paimon.format.parquet.ParquetSchemaConverter.MAP_KEY_NAME;
import static org.apache.paimon.format.parquet.ParquetSchemaConverter.MAP_REPEATED_NAME;
import static org.apache.paimon.format.parquet.ParquetSchemaConverter.MAP_VALUE_NAME;
import static org.apache.paimon.format.parquet.ParquetSchemaConverter.PAIMON_SCHEMA;
import static org.apache.paimon.format.parquet.ParquetSchemaConverter.parquetListElementType;
import static org.apache.paimon.format.parquet.ParquetSchemaConverter.parquetMapKeyValueType;
import static org.apache.paimon.format.parquet.reader.ParquetSplitReaderUtil.buildFieldsList;
import static org.apache.paimon.format.parquet.reader.ParquetSplitReaderUtil.createColumnReader;
import static org.apache.paimon.format.parquet.reader.ParquetSplitReaderUtil.createWritableColumnVector;
Expand All @@ -91,20 +91,15 @@ public class ParquetReaderFactory implements FormatReaderFactory {
private static final String ALLOCATION_SIZE = "parquet.read.allocation.size";

private final Options conf;

private final RowType projectedType;
private final String[] projectedColumnNames;
private final DataField[] projectedFields;
private final DataField[] readFields;
private final int batchSize;
private final FilterCompat.Filter filter;
private final Set<Integer> unknownFieldsIndices = new HashSet<>();

public ParquetReaderFactory(
Options conf, RowType projectedType, int batchSize, FilterCompat.Filter filter) {
Options conf, RowType readType, int batchSize, FilterCompat.Filter filter) {
this.conf = conf;
this.projectedType = projectedType;
this.projectedColumnNames = projectedType.getFieldNames().toArray(new String[0]);
this.projectedFields = projectedType.getFields().toArray(new DataField[0]);
this.readFields = readType.getFields().toArray(new DataField[0]);
this.batchSize = batchSize;
this.filter = filter;
}
Expand All @@ -131,8 +126,7 @@ public FileRecordReader<InternalRow> createReader(FormatReaderFactory.Context co
createPoolOfBatches(context.filePath(), requestedSchema);

MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(requestedSchema);
List<ParquetField> fields =
buildFieldsList(projectedType.getFields(), projectedType.getFieldNames(), columnIO);
List<ParquetField> fields = buildFieldsList(readFields, columnIO);

return new ParquetReader(
reader, requestedSchema, reader.getFilteredRecordCount(), poolOfBatches, fields);
Expand Down Expand Up @@ -160,24 +154,23 @@ private void setReadOptions(ParquetReadOptions.Builder builder) {

/** Clips `parquetSchema` according to `fieldNames`. */
private MessageType clipParquetSchema(GroupType parquetSchema) {
Type[] types = new Type[projectedColumnNames.length];
for (int i = 0; i < projectedColumnNames.length; ++i) {
String fieldName = projectedColumnNames[i];
Type[] types = new Type[readFields.length];
for (int i = 0; i < readFields.length; ++i) {
String fieldName = readFields[i].name();
if (!parquetSchema.containsField(fieldName)) {
LOG.warn(
"{} does not exist in {}, will fill the field with null.",
fieldName,
parquetSchema);
types[i] =
ParquetSchemaConverter.convertToParquetType(fieldName, projectedFields[i]);
types[i] = ParquetSchemaConverter.convertToParquetType(readFields[i]);
unknownFieldsIndices.add(i);
} else {
Type parquetType = parquetSchema.getType(fieldName);
types[i] = clipParquetType(projectedFields[i].type(), parquetType);
types[i] = clipParquetType(readFields[i].type(), parquetType);
}
}

return Types.buildMessage().addFields(types).named("paimon-parquet");
return Types.buildMessage().addFields(types).named(PAIMON_SCHEMA);
}

/** Clips `parquetType` by `readType`. */
Expand All @@ -201,33 +194,29 @@ private Type clipParquetType(DataType readType, Type parquetType) {
case MAP:
MapType mapType = (MapType) readType;
GroupType mapGroup = (GroupType) parquetType;
GroupType keyValue = mapGroup.getType(MAP_REPEATED_NAME).asGroupType();
Pair<Type, Type> keyValueType = parquetMapKeyValueType(mapGroup);
return ConversionPatterns.mapType(
mapGroup.getRepetition(),
mapGroup.getName(),
MAP_REPEATED_NAME,
clipParquetType(mapType.getKeyType(), keyValue.getType(MAP_KEY_NAME)),
keyValue.containsField(MAP_VALUE_NAME)
? clipParquetType(
mapType.getValueType(), keyValue.getType(MAP_VALUE_NAME))
: null);
clipParquetType(mapType.getKeyType(), keyValueType.getLeft()),
clipParquetType(mapType.getValueType(), keyValueType.getRight()));
case ARRAY:
ArrayType arrayType = (ArrayType) readType;
GroupType arrayGroup = (GroupType) parquetType;
GroupType list = arrayGroup.getType(LIST_NAME).asGroupType();
return ConversionPatterns.listOfElements(
arrayGroup.getRepetition(),
arrayGroup.getName(),
clipParquetType(
arrayType.getElementType(), list.getType(LIST_ELEMENT_NAME)));
arrayType.getElementType(), parquetListElementType(arrayGroup)));
default:
return parquetType;
}
}

private void checkSchema(MessageType fileSchema, MessageType requestedSchema)
throws IOException, UnsupportedOperationException {
if (projectedColumnNames.length != requestedSchema.getFieldCount()) {
if (readFields.length != requestedSchema.getFieldCount()) {
throw new RuntimeException(
"The quality of field type is incompatible with the request schema!");
}
Expand Down Expand Up @@ -275,13 +264,13 @@ private ParquetReaderBatch createReaderBatch(
}

private WritableColumnVector[] createWritableVectors(MessageType requestedSchema) {
WritableColumnVector[] columns = new WritableColumnVector[projectedFields.length];
WritableColumnVector[] columns = new WritableColumnVector[readFields.length];
List<Type> types = requestedSchema.getFields();
for (int i = 0; i < projectedFields.length; i++) {
for (int i = 0; i < readFields.length; i++) {
columns[i] =
createWritableColumnVector(
batchSize,
projectedFields[i].type(),
readFields[i].type(),
types.get(i),
requestedSchema.getColumns(),
0);
Expand All @@ -297,7 +286,7 @@ private VectorizedColumnBatch createVectorizedColumnBatch(
WritableColumnVector[] writableVectors) {
ColumnVector[] vectors = new ColumnVector[writableVectors.length];
for (int i = 0; i < writableVectors.length; i++) {
switch (projectedFields[i].type().getTypeRoot()) {
switch (readFields[i].type().getTypeRoot()) {
case DECIMAL:
vectors[i] =
new ParquetDecimalVector(
Expand Down Expand Up @@ -436,7 +425,7 @@ private void readNextRowGroup() throws IOException {
if (!unknownFieldsIndices.contains(i)) {
columnReaders[i] =
createColumnReader(
projectedFields[i].type(),
readFields[i].type(),
types.get(i),
requestedSchema.getColumns(),
rowGroup,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.MultisetType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TimestampType;
import org.apache.paimon.utils.Pair;

import org.apache.parquet.schema.ConversionPatterns;
import org.apache.parquet.schema.GroupType;
Expand All @@ -39,33 +41,40 @@
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;

import java.util.List;
import java.util.stream.Collectors;

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;

/** Schema converter converts Parquet schema to and from Paimon internal types. */
public class ParquetSchemaConverter {

static final String PAIMON_SCHEMA = "paimon_schema";

static final String MAP_REPEATED_NAME = "key_value";
static final String MAP_KEY_NAME = "key";
static final String MAP_VALUE_NAME = "value";
static final String LIST_NAME = "list";
static final String LIST_REPEATED_NAME = "list";
static final String LIST_ELEMENT_NAME = "element";

public static MessageType convertToParquetMessageType(String name, RowType rowType) {
return new MessageType(name, convertToParquetTypes(rowType));
}

public static Type convertToParquetType(String name, DataField field) {
return convertToParquetType(name, field.type(), field.id(), 0);
/** Convert paimon {@link RowType} to parquet {@link MessageType}. */
public static MessageType convertToParquetMessageType(RowType rowType) {
    // Root message is always named PAIMON_SCHEMA; fields come from the row's data fields.
    Type[] fieldTypes = convertToParquetTypes(rowType);
    return new MessageType(PAIMON_SCHEMA, fieldTypes);
}

private static Type[] convertToParquetTypes(RowType rowType) {
return rowType.getFields().stream()
.map(f -> convertToParquetType(f.name(), f.type(), f.id(), 0))
.map(ParquetSchemaConverter::convertToParquetType)
.toArray(Type[]::new);
}

/** Convert paimon {@link DataField} to parquet {@link Type}. */
public static Type convertToParquetType(DataField field) {
    // Delegate to the recursive converter, starting at nesting depth 0.
    String fieldName = field.name();
    DataType fieldType = field.type();
    int fieldId = field.id();
    return convertToParquetType(fieldName, fieldType, fieldId, 0);
}

private static Type convertToParquetType(String name, DataType type, int fieldId, int depth) {
Type.Repetition repetition =
type.isNullable() ? Type.Repetition.OPTIONAL : Type.Repetition.REQUIRED;
Expand Down Expand Up @@ -260,4 +269,144 @@ public static boolean is32BitDecimal(int precision) {
/**
 * Whether a decimal of the given precision needs a 64-bit integer encoding, i.e. it is too
 * wide for 32 bits (precision > 9) but still fits in 18 digits.
 */
public static boolean is64BitDecimal(int precision) {
    return precision > 9 && precision <= 18;
}

/** Convert parquet {@link MessageType} to paimon {@link RowType}. */
public static RowType convertToPaimonRowType(MessageType messageType) {
    // Each top-level parquet field converts to one paimon data field.
    GroupType root = messageType.asGroupType();
    List<DataField> fields =
            root.getFields().stream()
                    .map(f -> convertToPaimonField(f))
                    .collect(Collectors.toList());
    return new RowType(fields);
}

/** Convert parquet {@link Type} to paimon {@link DataField}. */
public static DataField convertToPaimonField(Type parquetType) {
    LogicalTypeAnnotation logicalType = parquetType.getLogicalTypeAnnotation();
    DataType paimonDataType;

    if (parquetType.isPrimitive()) {
        switch (parquetType.asPrimitiveType().getPrimitiveTypeName()) {
            case BINARY:
                // BINARY with a string annotation is STRING; otherwise raw BYTES.
                if (logicalType instanceof LogicalTypeAnnotation.StringLogicalTypeAnnotation) {
                    paimonDataType = DataTypes.STRING();
                } else {
                    paimonDataType = DataTypes.BYTES();
                }
                break;
            case BOOLEAN:
                paimonDataType = DataTypes.BOOLEAN();
                break;
            case FLOAT:
                paimonDataType = DataTypes.FLOAT();
                break;
            case DOUBLE:
                paimonDataType = DataTypes.DOUBLE();
                break;
            case INT32:
                if (logicalType instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
                    LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType =
                            (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType;
                    paimonDataType =
                            new DecimalType(decimalType.getPrecision(), decimalType.getScale());
                } else if (logicalType
                        instanceof LogicalTypeAnnotation.IntLogicalTypeAnnotation) {
                    // Int annotations narrower than 32 bits map to TINYINT/SMALLINT;
                    // everything else stays INT.
                    LogicalTypeAnnotation.IntLogicalTypeAnnotation intType =
                            (LogicalTypeAnnotation.IntLogicalTypeAnnotation) logicalType;
                    int bitWidth = intType.getBitWidth();
                    if (bitWidth == 8) {
                        paimonDataType = DataTypes.TINYINT();
                    } else if (bitWidth == 16) {
                        paimonDataType = DataTypes.SMALLINT();
                    } else {
                        paimonDataType = DataTypes.INT();
                    }
                } else if (logicalType
                        instanceof LogicalTypeAnnotation.DateLogicalTypeAnnotation) {
                    paimonDataType = DataTypes.DATE();
                } else if (logicalType
                        instanceof LogicalTypeAnnotation.TimeLogicalTypeAnnotation) {
                    paimonDataType = DataTypes.TIME();
                } else {
                    paimonDataType = DataTypes.INT();
                }
                break;
            case INT64:
                if (logicalType instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
                    LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType =
                            (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType;
                    paimonDataType =
                            new DecimalType(decimalType.getPrecision(), decimalType.getScale());
                } else if (logicalType
                        instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) {
                    LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampType =
                            (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalType;
                    // MILLIS -> precision 3; MICROS/NANOS -> precision 6.
                    int precision =
                            timestampType
                                            .getUnit()
                                            .equals(LogicalTypeAnnotation.TimeUnit.MILLIS)
                                    ? 3
                                    : 6;
                    // UTC-adjusted timestamps become local-zoned timestamps in paimon.
                    paimonDataType =
                            timestampType.isAdjustedToUTC()
                                    ? new LocalZonedTimestampType(precision)
                                    : new TimestampType(precision);
                } else {
                    paimonDataType = DataTypes.BIGINT();
                }
                break;
            case INT96:
                // Legacy INT96 timestamp encoding carries nanosecond precision.
                paimonDataType = new TimestampType(9);
                break;
            case FIXED_LEN_BYTE_ARRAY:
                // NOTE(review): assumes the annotation is a decimal one; a
                // FIXED_LEN_BYTE_ARRAY with a missing or non-decimal annotation fails
                // with NPE/CCE here — confirm whether that is intended.
                LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType =
                        (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType;
                paimonDataType =
                        new DecimalType(decimalType.getPrecision(), decimalType.getScale());
                break;
            default:
                throw new UnsupportedOperationException("Unsupported type: " + parquetType);
        }
    } else {
        GroupType groupType = parquetType.asGroupType();
        if (logicalType instanceof LogicalTypeAnnotation.ListLogicalTypeAnnotation) {
            paimonDataType =
                    new ArrayType(
                            convertToPaimonField(parquetListElementType(groupType)).type());
        } else if (logicalType instanceof LogicalTypeAnnotation.MapLogicalTypeAnnotation) {
            Pair<Type, Type> keyValueType = parquetMapKeyValueType(groupType);
            paimonDataType =
                    new MapType(
                            // Since parquet does not support nullable key, when converting
                            // back to Paimon, set as nullable by default.
                            convertToPaimonField(keyValueType.getLeft()).type().nullable(),
                            convertToPaimonField(keyValueType.getRight()).type());
        } else {
            // A plain group without a list/map annotation maps to a nested ROW.
            paimonDataType =
                    new RowType(
                            groupType.getFields().stream()
                                    .map(ParquetSchemaConverter::convertToPaimonField)
                                    .collect(Collectors.toList()));
        }
    }

    // Shared for both primitive and group types (previously duplicated in the
    // primitive branch): REQUIRED fields are non-nullable in paimon.
    if (parquetType.getRepetition().equals(Type.Repetition.REQUIRED)) {
        paimonDataType = paimonDataType.notNull();
    }

    return new DataField(parquetType.getId().intValue(), parquetType.getName(), paimonDataType);
}

/** Extracts the element type from a parquet 3-level LIST group. */
public static Type parquetListElementType(GroupType listType) {
    // LIST group wraps a repeated group which in turn holds the element field.
    GroupType repeatedGroup = listType.getType(LIST_REPEATED_NAME).asGroupType();
    return repeatedGroup.getType(LIST_ELEMENT_NAME);
}

/** Extracts the (key, value) types from a parquet MAP group's repeated key_value group. */
public static Pair<Type, Type> parquetMapKeyValueType(GroupType mapType) {
    GroupType repeatedKeyValue = mapType.getType(MAP_REPEATED_NAME).asGroupType();
    Type keyType = repeatedKeyValue.getType(MAP_KEY_NAME);
    Type valueType = repeatedKeyValue.getType(MAP_VALUE_NAME);
    return Pair.of(keyType, valueType);
}
}
Loading

0 comments on commit 97441c3

Please sign in to comment.