Skip to content

Commit

Permalink
[feature](paimon)support native reader (#29339)
Browse files Browse the repository at this point in the history
Support native reader for Paimon.

Upgrade paimon 0.5 to 0.6 : apache/doris-shade#32
  • Loading branch information
wuwenchi authored Jan 4, 2024
1 parent d8a08da commit bfe6556
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 11 deletions.
17 changes: 14 additions & 3 deletions be/src/vec/exec/scan/vfile_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -736,11 +736,22 @@ Status VFileScanner::_get_next_reader() {
// JNI reader can only push down column value range
bool push_down_predicates =
!_is_load && _params->format_type != TFileFormatType::FORMAT_JNI;
if (format_type == TFileFormatType::FORMAT_JNI && range.__isset.table_format_params &&
range.table_format_params.table_format_type == "hudi") {
if (range.table_format_params.hudi_params.delta_logs.empty()) {
if (format_type == TFileFormatType::FORMAT_JNI && range.__isset.table_format_params) {
if (range.table_format_params.table_format_type == "hudi" &&
range.table_format_params.hudi_params.delta_logs.empty()) {
// fall back to native reader if there is no log file
format_type = TFileFormatType::FORMAT_PARQUET;
} else if (range.table_format_params.table_format_type == "paimon" &&
!range.table_format_params.paimon_params.__isset.paimon_split) {
// use native reader
auto format = range.table_format_params.paimon_params.file_format;
if (format == "orc") {
format_type = TFileFormatType::FORMAT_ORC;
} else if (format == "parquet") {
format_type = TFileFormatType::FORMAT_PARQUET;
} else {
return Status::InternalError("Not supported paimon file format: {}", format);
}
}
}
bool need_to_get_parsed_schema = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

public class PaimonProperties {
public static final String WAREHOUSE = "warehouse";
public static final String FILE_FORMAT = "file.format";
public static final String PAIMON_PREFIX = "paimon";
public static final String PAIMON_CATALOG_TYPE = "metastore";
public static final String HIVE_METASTORE_URIS = "uri";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.planner.PlanNodeId;
Expand All @@ -41,15 +42,20 @@
import org.apache.doris.thrift.TTableFormatFileDesc;

import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.Path;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.AbstractFileStoreTable;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.RawFile;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.utils.InstantiationUtil;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -102,7 +108,11 @@ public void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit) {
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(paimonSplit.getTableFormatType().value());
TPaimonFileDesc fileDesc = new TPaimonFileDesc();
fileDesc.setPaimonSplit(encodeObjectToString(paimonSplit.getSplit()));
org.apache.paimon.table.source.Split split = paimonSplit.getSplit();
if (split != null) {
fileDesc.setPaimonSplit(encodeObjectToString(split));
}
fileDesc.setFileFormat(source.getFileFormat());
fileDesc.setPaimonPredicate(encodeObjectToString(predicates));
fileDesc.setPaimonColumnNames(source.getDesc().getSlots().stream().map(slot -> slot.getColumn().getName())
.collect(Collectors.joining(",")));
Expand All @@ -127,13 +137,52 @@ public List<Split> getSplits() throws UserException {
List<org.apache.paimon.table.source.Split> paimonSplits = readBuilder.withFilter(predicates)
.withProjection(projected)
.newScan().plan().splits();
boolean supportNative = supportNativeReader();
for (org.apache.paimon.table.source.Split split : paimonSplits) {
PaimonSplit paimonSplit = new PaimonSplit(split);
splits.add(paimonSplit);
if (supportNative && split instanceof DataSplit) {
DataSplit dataSplit = (DataSplit) split;
Optional<List<RawFile>> optRowFiles = dataSplit.convertToRawFiles();
if (optRowFiles.isPresent()) {
List<RawFile> rawFiles = optRowFiles.get();
for (RawFile file : rawFiles) {
LocationPath locationPath = new LocationPath(file.path(), source.getCatalog().getProperties());
Path finalDataFilePath = locationPath.toScanRangeLocation();
try {
splits.addAll(
splitFile(
finalDataFilePath,
0,
null,
file.length(),
-1,
true,
null,
PaimonSplit.PaimonSplitCreator.DEFAULT));
} catch (IOException e) {
throw new UserException("Paimon error to split file: " + e.getMessage(), e);
}
}
} else {
splits.add(new PaimonSplit(split));
}
} else {
splits.add(new PaimonSplit(split));
}
}
return splits;
}

/**
 * Tells whether the table's data file format can be scanned by the native
 * (non-JNI) readers. Only "orc" and "parquet" qualify.
 *
 * The comparison is case-insensitive and does not depend on the JVM default
 * locale. A missing (null) format is reported as unsupported instead of
 * raising a NullPointerException.
 *
 * @return true when the format is "orc" or "parquet" (any letter case), false otherwise
 */
private boolean supportNativeReader() {
    String fileFormat = source.getFileFormat();
    // String#equalsIgnoreCase is locale-independent and returns false for a null argument.
    return "orc".equalsIgnoreCase(fileFormat) || "parquet".equalsIgnoreCase(fileFormat);
}

//When calling 'setPaimonParams' and 'getSplits', the column trimming has not been performed yet,
// Therefore, paimon_column_names is temporarily reset here
@Override
Expand All @@ -157,8 +206,8 @@ public TFileType getLocationType() throws DdlException, MetaNotFoundException {

/**
 * Resolves the storage type of a concrete file location.
 *
 * The former hard-coded {@code TFileType.FILE_S3} return is removed: it preceded
 * the real lookup and made it unreachable.
 *
 * @param location the file location to classify
 * @return the file type reported by {@code LocationPath.getTFileType} for the location
 * @throws DdlException when {@code LocationPath.getTFileType} yields null for the location
 */
@Override
public TFileType getLocationType(String location) throws DdlException, MetaNotFoundException {
    TFileType fileType = LocationPath.getTFileType(location);
    if (fileType == null) {
        throw new DdlException("Unknown file location " + location + " for paimon table ");
    }
    return fileType;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.doris.catalog.external.PaimonExternalTable;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.property.constants.PaimonProperties;
import org.apache.doris.planner.ColumnRange;
import org.apache.doris.thrift.TFileAttributes;

Expand Down Expand Up @@ -61,4 +62,8 @@ public TFileAttributes getFileAttributes() throws UserException {
public ExternalCatalog getCatalog() {
    // The catalog is not stored here; it is obtained from the wrapped external table.
    ExternalCatalog owningCatalog = paimonExtTable.getCatalog();
    return owningCatalog;
}

/**
 * Returns the data file format of the table, read from the table option
 * {@code PaimonProperties.FILE_FORMAT}; "orc" is used when the option is absent.
 *
 * The value is normalized to lower case with a fixed locale. Consumers of this
 * value compare it against the exact strings "orc" and "parquet", so an option
 * such as "ORC" would otherwise be treated as an unsupported format.
 *
 * @return the lower-cased file format, or null if the option map holds a null value
 */
public String getFileFormat() {
    String configured = originTable.options().getOrDefault(PaimonProperties.FILE_FORMAT, "orc");
    if (configured == null) {
        // Preserve the previous result for an explicit null entry instead of failing here.
        return null;
    }
    return configured.toLowerCase(java.util.Locale.ROOT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,30 @@
package org.apache.doris.planner.external.paimon;

import org.apache.doris.planner.external.FileSplit;
import org.apache.doris.planner.external.SplitCreator;
import org.apache.doris.planner.external.TableFormatType;

import org.apache.hadoop.fs.Path;
import org.apache.paimon.table.source.Split;

import java.util.List;

public class PaimonSplit extends FileSplit {
private Split split;
private TableFormatType tableFormatType;

/**
 * Creates a split that wraps a Paimon {@code Split} object. The file-related
 * fields of the parent are filled with placeholders (dummy path, zero offsets
 * and lengths, no hosts, no partition values).
 *
 * The stale duplicate {@code super(new Path("dummyPath"), ...)} call is removed;
 * a constructor may invoke the superclass constructor only once.
 *
 * @param split the Paimon split carried by this object
 */
public PaimonSplit(Split split) {
    // NOTE(review): the "hdfs://" scheme presumably lets location-type resolution
    // succeed for this placeholder path - confirm against the callers.
    super(new Path("hdfs://dummyPath"), 0, 0, 0, null, null);
    this.split = split;
    this.tableFormatType = TableFormatType.PAIMON;
}

/**
 * Creates a split that describes a byte range of a concrete data file.
 * This constructor does not assign the Paimon {@code Split} field.
 *
 * @param file          path of the data file
 * @param start         start offset, forwarded to the parent constructor
 * @param length        range length, forwarded to the parent constructor
 * @param fileLength    total file length, forwarded to the parent constructor
 * @param hosts         hosts, forwarded to the parent constructor
 * @param partitionList partition values, forwarded to the parent constructor
 */
public PaimonSplit(
        Path file,
        long start,
        long length,
        long fileLength,
        String[] hosts,
        List<String> partitionList) {
    super(file, start, length, fileLength, hosts, partitionList);
    // Every split built by this class is tagged as a Paimon split.
    this.tableFormatType = TableFormatType.PAIMON;
}

/**
 * Returns the wrapped Paimon split.
 *
 * @return the Paimon split; null when this object was built through the
 *         file-range constructor, which does not assign it
 */
public Split getSplit() {
    return this.split;
}
Expand All @@ -49,4 +58,19 @@ public void setTableFormatType(TableFormatType tableFormatType) {
this.tableFormatType = tableFormatType;
}

/**
 * {@link SplitCreator} implementation that materializes file ranges as
 * {@link PaimonSplit} instances.
 */
public static class PaimonSplitCreator implements SplitCreator {

    /** Shared instance of this creator. */
    static final PaimonSplitCreator DEFAULT = new PaimonSplitCreator();

    /**
     * Builds a {@link PaimonSplit} for the given file range.
     * The {@code modificationTime} argument is not passed on to the split.
     */
    @Override
    public org.apache.doris.spi.Split create(Path path, long start, long length, long fileLength,
            long modificationTime, String[] hosts, List<String> partitionValues) {
        PaimonSplit fileRangeSplit = new PaimonSplit(path, start, length, fileLength, hosts, partitionValues);
        return fileRangeSplit;
    }
}
}
4 changes: 2 additions & 2 deletions fe/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ under the License.
<doris.home>${fe.dir}/../</doris.home>
<revision>1.2-SNAPSHOT</revision>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<doris.hive.catalog.shade.version>1.0.2</doris.hive.catalog.shade.version>
<doris.hive.catalog.shade.version>1.0.3</doris.hive.catalog.shade.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<!--plugin parameters-->
Expand Down Expand Up @@ -343,7 +343,7 @@ under the License.
<!--todo waiting release-->
<quartz.version>2.3.2</quartz.version>
<!-- paimon -->
<paimon.version>0.5.0-incubating</paimon.version>
<paimon.version>0.6.0-incubating</paimon.version>
<disruptor.version>3.4.4</disruptor.version>
<trino.parser.version>395</trino.parser.version>
<!-- arrow flight sql -->
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/PlanNodes.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ struct TPaimonFileDesc {
8: optional i64 db_id
9: optional i64 tbl_id
10: optional i64 last_update_time
11: optional string file_format
}

struct TMaxComputeFileDesc {
Expand Down

0 comments on commit bfe6556

Please sign in to comment.