Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[core] Introduce conversion from parquet type to paimon type #4888

Merged
merged 1 commit into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public FormatWriterFactory createWriterFactory(RowType type) {

@Override
public void validateDataFields(RowType rowType) {
ParquetSchemaConverter.convertToParquetMessageType("paimon_schema", rowType);
ParquetSchemaConverter.convertToParquetMessageType(rowType);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Pool;

import org.apache.parquet.ParquetReadOptions;
Expand Down Expand Up @@ -70,11 +71,10 @@
import java.util.List;
import java.util.Set;

import static org.apache.paimon.format.parquet.ParquetSchemaConverter.LIST_ELEMENT_NAME;
import static org.apache.paimon.format.parquet.ParquetSchemaConverter.LIST_NAME;
import static org.apache.paimon.format.parquet.ParquetSchemaConverter.MAP_KEY_NAME;
import static org.apache.paimon.format.parquet.ParquetSchemaConverter.MAP_REPEATED_NAME;
import static org.apache.paimon.format.parquet.ParquetSchemaConverter.MAP_VALUE_NAME;
import static org.apache.paimon.format.parquet.ParquetSchemaConverter.PAIMON_SCHEMA;
import static org.apache.paimon.format.parquet.ParquetSchemaConverter.parquetListElementType;
import static org.apache.paimon.format.parquet.ParquetSchemaConverter.parquetMapKeyValueType;
import static org.apache.paimon.format.parquet.reader.ParquetSplitReaderUtil.buildFieldsList;
import static org.apache.paimon.format.parquet.reader.ParquetSplitReaderUtil.createColumnReader;
import static org.apache.paimon.format.parquet.reader.ParquetSplitReaderUtil.createWritableColumnVector;
Expand All @@ -91,20 +91,15 @@ public class ParquetReaderFactory implements FormatReaderFactory {
private static final String ALLOCATION_SIZE = "parquet.read.allocation.size";

private final Options conf;

private final RowType projectedType;
private final String[] projectedColumnNames;
private final DataField[] projectedFields;
private final DataField[] readFields;
private final int batchSize;
private final FilterCompat.Filter filter;
private final Set<Integer> unknownFieldsIndices = new HashSet<>();

public ParquetReaderFactory(
Options conf, RowType projectedType, int batchSize, FilterCompat.Filter filter) {
Options conf, RowType readType, int batchSize, FilterCompat.Filter filter) {
this.conf = conf;
this.projectedType = projectedType;
this.projectedColumnNames = projectedType.getFieldNames().toArray(new String[0]);
this.projectedFields = projectedType.getFields().toArray(new DataField[0]);
this.readFields = readType.getFields().toArray(new DataField[0]);
this.batchSize = batchSize;
this.filter = filter;
}
Expand All @@ -131,8 +126,7 @@ public FileRecordReader<InternalRow> createReader(FormatReaderFactory.Context co
createPoolOfBatches(context.filePath(), requestedSchema);

MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(requestedSchema);
List<ParquetField> fields =
buildFieldsList(projectedType.getFields(), projectedType.getFieldNames(), columnIO);
List<ParquetField> fields = buildFieldsList(readFields, columnIO);

return new ParquetReader(
reader, requestedSchema, reader.getFilteredRecordCount(), poolOfBatches, fields);
Expand Down Expand Up @@ -160,24 +154,23 @@ private void setReadOptions(ParquetReadOptions.Builder builder) {

/** Clips `parquetSchema` according to `fieldNames`. */
private MessageType clipParquetSchema(GroupType parquetSchema) {
Type[] types = new Type[projectedColumnNames.length];
for (int i = 0; i < projectedColumnNames.length; ++i) {
String fieldName = projectedColumnNames[i];
Type[] types = new Type[readFields.length];
for (int i = 0; i < readFields.length; ++i) {
String fieldName = readFields[i].name();
if (!parquetSchema.containsField(fieldName)) {
LOG.warn(
"{} does not exist in {}, will fill the field with null.",
fieldName,
parquetSchema);
types[i] =
ParquetSchemaConverter.convertToParquetType(fieldName, projectedFields[i]);
types[i] = ParquetSchemaConverter.convertToParquetType(readFields[i]);
unknownFieldsIndices.add(i);
} else {
Type parquetType = parquetSchema.getType(fieldName);
types[i] = clipParquetType(projectedFields[i].type(), parquetType);
types[i] = clipParquetType(readFields[i].type(), parquetType);
}
}

return Types.buildMessage().addFields(types).named("paimon-parquet");
return Types.buildMessage().addFields(types).named(PAIMON_SCHEMA);
}

/** Clips `parquetType` by `readType`. */
Expand All @@ -201,33 +194,29 @@ private Type clipParquetType(DataType readType, Type parquetType) {
case MAP:
MapType mapType = (MapType) readType;
GroupType mapGroup = (GroupType) parquetType;
GroupType keyValue = mapGroup.getType(MAP_REPEATED_NAME).asGroupType();
Pair<Type, Type> keyValueType = parquetMapKeyValueType(mapGroup);
return ConversionPatterns.mapType(
mapGroup.getRepetition(),
mapGroup.getName(),
MAP_REPEATED_NAME,
clipParquetType(mapType.getKeyType(), keyValue.getType(MAP_KEY_NAME)),
keyValue.containsField(MAP_VALUE_NAME)
? clipParquetType(
mapType.getValueType(), keyValue.getType(MAP_VALUE_NAME))
: null);
clipParquetType(mapType.getKeyType(), keyValueType.getLeft()),
clipParquetType(mapType.getValueType(), keyValueType.getRight()));
case ARRAY:
ArrayType arrayType = (ArrayType) readType;
GroupType arrayGroup = (GroupType) parquetType;
GroupType list = arrayGroup.getType(LIST_NAME).asGroupType();
return ConversionPatterns.listOfElements(
arrayGroup.getRepetition(),
arrayGroup.getName(),
clipParquetType(
arrayType.getElementType(), list.getType(LIST_ELEMENT_NAME)));
arrayType.getElementType(), parquetListElementType(arrayGroup)));
default:
return parquetType;
}
}

private void checkSchema(MessageType fileSchema, MessageType requestedSchema)
throws IOException, UnsupportedOperationException {
if (projectedColumnNames.length != requestedSchema.getFieldCount()) {
if (readFields.length != requestedSchema.getFieldCount()) {
throw new RuntimeException(
"The quality of field type is incompatible with the request schema!");
}
Expand Down Expand Up @@ -275,13 +264,13 @@ private ParquetReaderBatch createReaderBatch(
}

private WritableColumnVector[] createWritableVectors(MessageType requestedSchema) {
WritableColumnVector[] columns = new WritableColumnVector[projectedFields.length];
WritableColumnVector[] columns = new WritableColumnVector[readFields.length];
List<Type> types = requestedSchema.getFields();
for (int i = 0; i < projectedFields.length; i++) {
for (int i = 0; i < readFields.length; i++) {
columns[i] =
createWritableColumnVector(
batchSize,
projectedFields[i].type(),
readFields[i].type(),
types.get(i),
requestedSchema.getColumns(),
0);
Expand All @@ -297,7 +286,7 @@ private VectorizedColumnBatch createVectorizedColumnBatch(
WritableColumnVector[] writableVectors) {
ColumnVector[] vectors = new ColumnVector[writableVectors.length];
for (int i = 0; i < writableVectors.length; i++) {
switch (projectedFields[i].type().getTypeRoot()) {
switch (readFields[i].type().getTypeRoot()) {
case DECIMAL:
vectors[i] =
new ParquetDecimalVector(
Expand Down Expand Up @@ -436,7 +425,7 @@ private void readNextRowGroup() throws IOException {
if (!unknownFieldsIndices.contains(i)) {
columnReaders[i] =
createColumnReader(
projectedFields[i].type(),
readFields[i].type(),
types.get(i),
requestedSchema.getColumns(),
rowGroup,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.MultisetType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TimestampType;
import org.apache.paimon.utils.Pair;

import org.apache.parquet.schema.ConversionPatterns;
import org.apache.parquet.schema.GroupType;
Expand All @@ -39,33 +41,40 @@
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;

import java.util.List;
import java.util.stream.Collectors;

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;

/** Schema converter converts Parquet schema to and from Paimon internal types. */
public class ParquetSchemaConverter {

static final String PAIMON_SCHEMA = "paimon_schema";

static final String MAP_REPEATED_NAME = "key_value";
static final String MAP_KEY_NAME = "key";
static final String MAP_VALUE_NAME = "value";
static final String LIST_NAME = "list";
static final String LIST_REPEATED_NAME = "list";
static final String LIST_ELEMENT_NAME = "element";

public static MessageType convertToParquetMessageType(String name, RowType rowType) {
return new MessageType(name, convertToParquetTypes(rowType));
}

public static Type convertToParquetType(String name, DataField field) {
return convertToParquetType(name, field.type(), field.id(), 0);
/** Convert paimon {@link RowType} to parquet {@link MessageType}. */
public static MessageType convertToParquetMessageType(RowType rowType) {
    // All paimon-written parquet schemas share the fixed root name.
    Type[] fields = convertToParquetTypes(rowType);
    return new MessageType(PAIMON_SCHEMA, fields);
}

/** Convert every top-level field of {@code rowType} to its parquet counterpart. */
private static Type[] convertToParquetTypes(RowType rowType) {
    List<DataField> dataFields = rowType.getFields();
    Type[] parquetTypes = new Type[dataFields.size()];
    for (int i = 0; i < parquetTypes.length; i++) {
        parquetTypes[i] = convertToParquetType(dataFields.get(i));
    }
    return parquetTypes;
}

/** Convert paimon {@link DataField} to parquet {@link Type}. */
public static Type convertToParquetType(DataField field) {
    // Delegate to the recursive converter, starting at nesting depth 0.
    DataType fieldType = field.type();
    return convertToParquetType(field.name(), fieldType, field.id(), 0);
}

private static Type convertToParquetType(String name, DataType type, int fieldId, int depth) {
Type.Repetition repetition =
type.isNullable() ? Type.Repetition.OPTIONAL : Type.Repetition.REQUIRED;
Expand Down Expand Up @@ -260,4 +269,144 @@ public static boolean is32BitDecimal(int precision) {
/** Whether a decimal of the given precision is stored as a parquet INT64 (10..18 digits). */
public static boolean is64BitDecimal(int precision) {
    return precision > 9 && precision <= 18;
}

/** Convert parquet {@link MessageType} to paimon {@link RowType}. */
public static RowType convertToPaimonRowType(MessageType messageType) {
    List<Type> parquetFields = messageType.asGroupType().getFields();
    List<DataField> dataFields =
            parquetFields.stream()
                    .map(ParquetSchemaConverter::convertToPaimonField)
                    .collect(Collectors.toList());
    return new RowType(dataFields);
}

/** Convert parquet {@link Type} to paimon {@link DataField}. */
public static DataField convertToPaimonField(Type parquetType) {
    DataType paimonDataType =
            parquetType.isPrimitive()
                    ? convertPrimitiveToPaimonType(parquetType)
                    : convertGroupToPaimonType(parquetType.asGroupType());

    // Parquet REQUIRED repetition maps to a non-nullable paimon type;
    // OPTIONAL keeps the (default) nullable type.
    if (parquetType.getRepetition().equals(Type.Repetition.REQUIRED)) {
        paimonDataType = paimonDataType.notNull();
    }

    // NOTE(review): assumes the parquet field carries an id (paimon always
    // writes one); a foreign file without field ids would NPE here — confirm.
    return new DataField(parquetType.getId().intValue(), parquetType.getName(), paimonDataType);
}

/** Map a parquet primitive type (with its logical annotation) to a nullable paimon type. */
private static DataType convertPrimitiveToPaimonType(Type parquetType) {
    LogicalTypeAnnotation logicalType = parquetType.getLogicalTypeAnnotation();
    switch (parquetType.asPrimitiveType().getPrimitiveTypeName()) {
        case BINARY:
            // String-annotated BINARY is a string; otherwise raw bytes.
            if (logicalType instanceof LogicalTypeAnnotation.StringLogicalTypeAnnotation) {
                return DataTypes.STRING();
            }
            return DataTypes.BYTES();
        case BOOLEAN:
            return DataTypes.BOOLEAN();
        case FLOAT:
            return DataTypes.FLOAT();
        case DOUBLE:
            return DataTypes.DOUBLE();
        case INT32:
            if (logicalType instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
                LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType =
                        (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType;
                return new DecimalType(decimalType.getPrecision(), decimalType.getScale());
            } else if (logicalType instanceof LogicalTypeAnnotation.IntLogicalTypeAnnotation) {
                // Bit width distinguishes TINYINT (8) / SMALLINT (16) / INT (32).
                int bitWidth =
                        ((LogicalTypeAnnotation.IntLogicalTypeAnnotation) logicalType)
                                .getBitWidth();
                if (bitWidth == 8) {
                    return DataTypes.TINYINT();
                } else if (bitWidth == 16) {
                    return DataTypes.SMALLINT();
                }
                return DataTypes.INT();
            } else if (logicalType instanceof LogicalTypeAnnotation.DateLogicalTypeAnnotation) {
                return DataTypes.DATE();
            } else if (logicalType instanceof LogicalTypeAnnotation.TimeLogicalTypeAnnotation) {
                return DataTypes.TIME();
            }
            return DataTypes.INT();
        case INT64:
            if (logicalType instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
                LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType =
                        (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType;
                return new DecimalType(decimalType.getPrecision(), decimalType.getScale());
            } else if (logicalType
                    instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) {
                LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampType =
                        (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalType;
                // MILLIS -> precision 3; MICROS (and NANOS) collapse to 6.
                int precision =
                        timestampType.getUnit().equals(LogicalTypeAnnotation.TimeUnit.MILLIS)
                                ? 3
                                : 6;
                return timestampType.isAdjustedToUTC()
                        ? new LocalZonedTimestampType(precision)
                        : new TimestampType(precision);
            }
            return DataTypes.BIGINT();
        case INT96:
            // Legacy hive/impala timestamp encoding, nanosecond precision.
            return new TimestampType(9);
        case FIXED_LEN_BYTE_ARRAY:
            if (logicalType instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
                LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType =
                        (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType;
                return new DecimalType(decimalType.getPrecision(), decimalType.getScale());
            }
            // Previously an unchecked cast: a non-decimal FIXED_LEN_BYTE_ARRAY
            // (e.g. UUID) failed with ClassCastException/NPE. Fail explicitly.
            throw new UnsupportedOperationException("Unsupported type: " + parquetType);
        default:
            throw new UnsupportedOperationException("Unsupported type: " + parquetType);
    }
}

/** Map a parquet group (LIST / MAP / plain struct) to a nullable paimon type. */
private static DataType convertGroupToPaimonType(GroupType groupType) {
    LogicalTypeAnnotation logicalType = groupType.getLogicalTypeAnnotation();
    if (logicalType instanceof LogicalTypeAnnotation.ListLogicalTypeAnnotation) {
        return new ArrayType(convertToPaimonField(parquetListElementType(groupType)).type());
    } else if (logicalType instanceof LogicalTypeAnnotation.MapLogicalTypeAnnotation) {
        Pair<Type, Type> keyValueType = parquetMapKeyValueType(groupType);
        return new MapType(
                // Since parquet does not support nullable key, when converting
                // back to Paimon, set as nullable by default.
                convertToPaimonField(keyValueType.getLeft()).type().nullable(),
                convertToPaimonField(keyValueType.getRight()).type());
    } else {
        // Plain struct: convert each child field recursively.
        return new RowType(
                groupType.getFields().stream()
                        .map(ParquetSchemaConverter::convertToPaimonField)
                        .collect(Collectors.toList()));
    }
}

/** Extract the element type from a 3-level parquet LIST group ({@code list.element}). */
public static Type parquetListElementType(GroupType listType) {
    GroupType repeatedList = listType.getType(LIST_REPEATED_NAME).asGroupType();
    return repeatedList.getType(LIST_ELEMENT_NAME);
}

/** Extract the (key, value) types from a parquet MAP group ({@code key_value.key/value}). */
public static Pair<Type, Type> parquetMapKeyValueType(GroupType mapType) {
    GroupType repeatedKeyValue = mapType.getType(MAP_REPEATED_NAME).asGroupType();
    Type keyType = repeatedKeyValue.getType(MAP_KEY_NAME);
    Type valueType = repeatedKeyValue.getType(MAP_VALUE_NAME);
    return Pair.of(keyType, valueType);
}
}
Loading
Loading