Skip to content

Commit

Permalink
[spark] Fix rename table with catalog name (#5027)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy authored Feb 7, 2025
1 parent 50d2316 commit f448fd1
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType;
import static org.apache.paimon.spark.util.OptionUtils.copyWithSQLConf;
import static org.apache.paimon.spark.utils.CatalogUtils.checkNamespace;
import static org.apache.paimon.spark.utils.CatalogUtils.removeCatalogName;
import static org.apache.paimon.spark.utils.CatalogUtils.toIdentifier;

/** Spark {@link TableCatalog} for paimon. */
Expand Down Expand Up @@ -427,7 +428,10 @@ private void validateAlterProperty(String alterKey) {
public void renameTable(Identifier oldIdent, Identifier newIdent)
throws NoSuchTableException, TableAlreadyExistsException {
try {
catalog.renameTable(toIdentifier(oldIdent), toIdentifier(newIdent), false);
catalog.renameTable(
toIdentifier(oldIdent),
toIdentifier(removeCatalogName(newIdent, catalogName)),
false);
} catch (Catalog.TableNotExistException e) {
throw new NoSuchTableException(oldIdent);
} catch (Catalog.TableAlreadyExistException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,18 @@ public static org.apache.paimon.catalog.Identifier toIdentifier(Identifier ident
checkNamespace(ident.namespace());
return new org.apache.paimon.catalog.Identifier(ident.namespace()[0], ident.name());
}

public static Identifier removeCatalogName(Identifier ident, String catalogName) {
String[] namespace = ident.namespace();
if (namespace.length > 1) {
checkArgument(
namespace[0].equals(catalogName),
"Only supports operations within the same catalog, target catalog name: %s, current catalog name: %s",
namespace[0],
catalogName);
return Identifier.of(Arrays.copyOfRange(namespace, 1, namespace.length), ident.name());
} else {
return ident;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -572,4 +572,18 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
}
}
}

test("Paimon DDL: rename table with catalog name") {
sql("USE default")
withTable("t1", "t2") {
sql("CREATE TABLE t1 (id INT) USING paimon")
sql("INSERT INTO t1 VALUES 1")
sql("ALTER TABLE paimon.default.t1 RENAME TO paimon.default.t2")
checkAnswer(sql("SELECT * FROM t2"), Row(1))

assert(intercept[Exception] {
sql("ALTER TABLE paimon.default.t2 RENAME TO spark_catalog.default.t2")
}.getMessage.contains("Only supports operations within the same catalog"))
}
}
}

0 comments on commit f448fd1

Please sign in to comment.