Skip to content

Commit

Permalink
[core] Support add partition for paimon (apache#4323)
Browse files Browse the repository at this point in the history
  • Loading branch information
xuzifu666 authored Oct 16, 2024
1 parent d3d6616 commit cc3ed7f
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.lineage.LineageMetaFactory;
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.Options;
Expand All @@ -40,12 +41,16 @@
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.utils.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -67,6 +72,9 @@ public abstract class AbstractCatalog implements Catalog {
protected final FileIO fileIO;
protected final Map<String, String> tableDefaultOptions;
protected final Options catalogOptions;
public MetastoreClient metastoreClient;

private static final Logger LOG = LoggerFactory.getLogger(AbstractCatalog.class);

@Nullable protected final LineageMetaFactory lineageMetaFactory;

Expand Down Expand Up @@ -154,6 +162,26 @@ public Map<String, String> loadDatabaseProperties(String name)
protected abstract Map<String, String> loadDatabasePropertiesImpl(String name)
throws DatabaseNotExistException;

public void createPartition(Identifier identifier, Map<String, String> partitionSpec)
throws TableNotExistException {
TableSchema tableSchema = getDataTableSchema(identifier);
if (!tableSchema.partitionKeys().isEmpty()
&& new CoreOptions(tableSchema.options()).partitionedTableInMetastore()) {
try {
// Do not close client, it is for HiveCatalog
if (metastoreClient == null) {
throw new UnsupportedOperationException(
"Only Support HiveCatalog in create partition!");
}
metastoreClient.addPartition(new LinkedHashMap<>(partitionSpec));
} catch (Exception e) {
throw new RuntimeException(e);
}
} else {
throw new RuntimeException("the table is not partitioned table in metastore!");
}
}

@Override
public void dropPartition(Identifier identifier, Map<String, String> partitionSpec)
throws TableNotExistException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,16 @@ public Path getTableLocation(Identifier identifier) {
return wrapped.getTableLocation(identifier);
}

public void createPartition(Identifier identifier, Map<String, String> partitions)
throws TableNotExistException {
if (wrapped instanceof AbstractCatalog) {
((AbstractCatalog) wrapped).createPartition(identifier, partitions);
} else {
throw new UnsupportedOperationException(
"Only Support HiveCatalog in create partition!");
}
}

@Override
public void dropPartition(Identifier identifier, Map<String, String> partitions)
throws TableNotExistException, PartitionNotExistException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.ProcedureNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
Expand All @@ -100,6 +101,7 @@

import javax.annotation.Nullable;

import java.lang.reflect.Method;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -1145,8 +1147,21 @@ public final void createPartition(
CatalogPartitionSpec partitionSpec,
CatalogPartition partition,
boolean ignoreIfExists)
throws CatalogException {
throw new UnsupportedOperationException();
throws CatalogException, PartitionAlreadyExistsException {
if (partitionExists(tablePath, partitionSpec)) {
if (!ignoreIfExists) {
throw new PartitionAlreadyExistsException(getName(), tablePath, partitionSpec);
}
}

try {
Identifier identifier = toIdentifier(tablePath);
Method func =
catalog.getClass().getMethod("createPartition", Identifier.class, Map.class);
func.invoke(catalog, identifier, partitionSpec.getPartitionSpec());
} catch (Exception e) {
throw new CatalogException(e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,23 @@ private Map<String, String> convertToProperties(Database database) {
return properties;
}

@Override
public void createPartition(Identifier identifier, Map<String, String> partitionSpec)
throws TableNotExistException {
try {
TableSchema tableSchema = getDataTableSchema(identifier);
metastoreClient =
new HiveMetastoreClient(
new Identifier(identifier.getDatabaseName(), identifier.getTableName()),
tableSchema,
clients);
} catch (Exception e) {
throw new RuntimeException(e);
}

super.createPartition(identifier, partitionSpec);
}

@Override
public void dropPartition(Identifier identifier, Map<String, String> partitionSpec)
throws TableNotExistException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1348,6 +1348,26 @@ public void testDropPartitionsToMetastore() throws Exception {
"ptb=2a/pta=2", "ptb=2b/pta=2", "ptb=3a/pta=3", "ptb=3b/pta=3");
}

@Test
public void testCreatePartitionsToMetastore() throws Exception {
prepareTestAddPartitionsToMetastore();

// add partition
tEnv.executeSql(
"ALTER TABLE t ADD PARTITION (ptb = '1c', pta = 1) PARTITION (ptb = '1d', pta = 6)")
.await();
assertThat(hiveShell.executeQuery("show partitions t"))
.containsExactlyInAnyOrder(
"ptb=1a/pta=1",
"ptb=1b/pta=1",
"ptb=1c/pta=1",
"ptb=1d/pta=6",
"ptb=2a/pta=2",
"ptb=2b/pta=2",
"ptb=3a/pta=3",
"ptb=3b/pta=3");
}

@Test
public void testAddPartitionsForTag() throws Exception {
tEnv.executeSql(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.paimon.spark

import org.apache.paimon.catalog.Identifier
import org.apache.paimon.metastore.MetastoreClient
import org.apache.paimon.operation.FileStoreCommit
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.sink.BatchWriteBuilder
Expand All @@ -30,7 +32,7 @@ import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog.SupportsAtomicPartitionManagement
import org.apache.spark.sql.types.StructType

import java.util.{Map => JMap, Objects, UUID}
import java.util.{LinkedHashMap, Map => JMap, Objects, UUID}

import scala.collection.JavaConverters._

Expand Down Expand Up @@ -114,6 +116,33 @@ trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement {
override def createPartitions(
internalRows: Array[InternalRow],
maps: Array[JMap[String, String]]): Unit = {
throw new UnsupportedOperationException("Create partition is not supported")
table match {
case fileStoreTable: FileStoreTable =>
val rowConverter = CatalystTypeConverters
.createToScalaConverter(CharVarcharUtils.replaceCharVarcharWithString(partitionSchema))
val rowDataPartitionComputer = new InternalRowPartitionComputer(
fileStoreTable.coreOptions().partitionDefaultName(),
partitionRowType,
table.partitionKeys().asScala.toArray)
val partitions = internalRows.map {
r =>
rowDataPartitionComputer
.generatePartValues(new SparkRow(partitionRowType, rowConverter(r).asInstanceOf[Row]))
.asInstanceOf[JMap[String, String]]
}
val metastoreClient: MetastoreClient =
fileStoreTable.catalogEnvironment().metastoreClientFactory().create
partitions.foreach {
partition =>
metastoreClient.addPartition(partition.asInstanceOf[LinkedHashMap[String, String]])
}
case _ =>
throw new UnsupportedOperationException("Only FileStoreTable supports create partitions.")
}
}

def getIdentifierFromTableName(tableName: String): Identifier = {
val name: Array[String] = tableName.split("\\.")
new Identifier(name.apply(0), name.apply(1))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,38 @@ abstract class DDLWithHiveCatalogTestBase extends PaimonHiveTestBase {
}
}

test("Paimon DDL with hive catalog: create partition for paimon table sparkCatalogName") {
Seq(paimonHiveCatalogName).foreach {
catalogName =>
spark.sql(s"USE $catalogName")
withTempDir {
dBLocation =>
withDatabase("paimon_db") {
val comment = "this is a test comment"
spark.sql(
s"CREATE DATABASE paimon_db LOCATION '${dBLocation.getCanonicalPath}' COMMENT '$comment'")
Assertions.assertEquals(getDatabaseLocation("paimon_db"), dBLocation.getCanonicalPath)
Assertions.assertEquals(getDatabaseComment("paimon_db"), comment)

withTable("paimon_db.paimon_tbl") {
spark.sql(s"""
|CREATE TABLE paimon_db.paimon_tbl (id STRING, name STRING, pt STRING)
|USING PAIMON
|PARTITIONED BY (name, pt)
|TBLPROPERTIES('metastore.partitioned-table' = 'true')
|""".stripMargin)
Assertions.assertEquals(
getTableLocation("paimon_db.paimon_tbl"),
s"${dBLocation.getCanonicalPath}/paimon_tbl")
spark.sql("insert into paimon_db.paimon_tbl select '1', 'n', 'cc'")

spark.sql("alter table paimon_db.paimon_tbl add partition(name='cc', `pt`='aa') ")
}
}
}
}
}

test("Paimon DDL with hive catalog: create database with props") {
Seq(sparkCatalogName, paimonHiveCatalogName).foreach {
catalogName =>
Expand Down

0 comments on commit cc3ed7f

Please sign in to comment.