diff --git a/pom.xml b/pom.xml index d2e6bdc917..937357f6ff 100644 --- a/pom.xml +++ b/pom.xml @@ -91,8 +91,9 @@ 1.3.0 3.2.9 _${scala.binary.version} - - 1.14.4 + 3.2.0-1.18 + 3.0.1-1.17 + 1.18.1 1.8.1 1.0.0 1.14 @@ -138,14 +139,13 @@ 10.0.2 3.3.0 org.apache.streampark.shaded - flink-table-uber_${scala.binary.version} 5.1 1.18.24 5.9.1 3.4.6 1.17.14 3.23.1 - 1.19.3 + 1.19.8 5.3.0 1.1.10.1 3.5.3.1 @@ -318,27 +318,6 @@ ${mybatis-plus.version} - - com.github.oshi - oshi-core - ${oshi-core.version} - - - org.slf4j - slf4j-simple - - - org.junit.jupiter - junit-jupiter-api - - - org.hamcrest - hamcrest - - - - - com.fasterxml.jackson.core jackson-databind @@ -351,12 +330,6 @@ ${jackson.version} - - com.fasterxml.jackson.datatype - jackson-datatype-jsr310 - ${jackson.version} - - com.fasterxml.jackson.module jackson-module-scala_${scala.binary.version} @@ -369,52 +342,6 @@ - - - org.apache.curator - curator-framework - ${curator.version} - - - org.slf4j - slf4j-log4j12 - - - - - org.apache.curator - curator-client - ${curator.version} - - - log4j-1.2-api - org.apache.logging.log4j - - - - - org.apache.curator - curator-recipes - ${curator.version} - - - org.apache.zookeeper - zookeeper - - - - - org.apache.curator - curator-test - ${curator.version} - - - - org.xerial.snappy - snappy-java - ${snappy.version} - - org.apache.commons diff --git a/streampark-common/src/main/java/org/apache/streampark/common/enums/CatalogType.java b/streampark-common/src/main/java/org/apache/streampark/common/enums/CatalogType.java index fb2caab555..d3baa95d4f 100644 --- a/streampark-common/src/main/java/org/apache/streampark/common/enums/CatalogType.java +++ b/streampark-common/src/main/java/org/apache/streampark/common/enums/CatalogType.java @@ -19,6 +19,9 @@ /** catalog type */ public enum CatalogType { + MYSQL, + PGSQL, + ORACLE, JDBC, HIVE, PAIMON, diff --git a/streampark-console/streampark-console-service/pom.xml b/streampark-console/streampark-console-service/pom.xml index b0938bb6fb..2372f65e68 100644 --- a/streampark-console/streampark-console-service/pom.xml +++ b/streampark-console/streampark-console-service/pom.xml @@ -210,6 +210,10 @@ org.pac4j pac4j-core + + org.apache.shiro + shiro-web + @@ -377,12 +381,32 @@ org.apache.streampark streampark-spark-client-api_${scala.binary.version} ${project.version} + + + org.xerial.snappy + snappy-java + + + com.github.luben + zstd-jni + + + com.google.protobuf + protobuf-java + + org.apache.streampark streampark-flink-kubernetes_${scala.binary.version} ${project.version} + + + org.xerial.snappy + snappy-java + + @@ -458,6 +482,36 @@ ${project.version} + + org.apache.flink + flink-table-api-java + ${flink.version} + + + + org.testcontainers + mysql + ${testcontainer.version} + test + + + org.apache.streampark + streampark-flink-connector-plugin + 2.2.0-SNAPSHOT + test + + + org.xerial.snappy + snappy-java + + + + + org.apache.flink + flink-kubernetes + ${flink.version} + compile + @@ -573,6 +627,12 @@ ${project.version} ${project.build.directory}/lib + + org.apache.streampark + streampark-flink-connector-plugin + ${project.version} + ${project.build.directory}/plugins + diff --git a/streampark-console/streampark-console-service/src/main/assembly/assembly.xml b/streampark-console/streampark-console-service/src/main/assembly/assembly.xml index b221f60bf1..5b42644e86 100644 --- a/streampark-console/streampark-console-service/src/main/assembly/assembly.xml +++ b/streampark-console/streampark-console-service/src/main/assembly/assembly.xml @@ -66,6 +66,11 @@ lib 0755 + + ${project.build.directory}/plugins + plugins + 0755 + ${project.build.directory}/../src/main/assembly/conf conf diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql b/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql index a756a60b73..c63ae97713 100644 --- a/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql +++ b/streampark-console/streampark-console-service/src/main/assembly/script/data/mysql-data.sql @@ -181,6 +181,25 @@ insert into `t_menu` values (150502, 150500, 'member add', null, null, 'member:a insert into `t_menu` values (150503, 150500, 'member update', null, null, 'member:update', null, '1', 1, null, now(), now()); insert into `t_menu` values (150504, 150500, 'member delete', null, null, 'member:delete', null, '1', 1, null, now(), now()); +insert into `t_menu` values (150601, 150600, 'catalog view', null, null, 'catalog:view', null, '1', 1, null, now(), now()); +insert into `t_menu` values (150602, 150600, 'catalog create', null, null, 'catalog:create', null, '1', 1, null, now(), now()); +insert into `t_menu` values (150603, 150600, 'catalog update', null, null, 'catalog:update', null, '1', 1, null, now(), now()); +insert into `t_menu` values (150604, 150600, 'catalog delete', null, null, 'catalog:delete', null, '1', 1, null, now(), now()); + +insert into `t_menu` values (150605, 150600, 'database view', null, null, 'database:view', null, '1', 1, null, now(), now()); +insert into `t_menu` values (150606, 150600, 'database create', null, null, 'database:create', null, '1', 1, null, now(), now()); +insert into `t_menu` values (150607, 150600, 'database delete', null, null, 'database:delete', null, '1', 1, null, now(), now()); + +insert into `t_menu` values (150608, 150600, 'table view', null, null, 'table:view', null, '1', 1, null, now(), now()); +insert into `t_menu` values (150609, 150600, 'table create', null, null, 'table:create', null, '1', 1, null, now(), now()); +insert into `t_menu` values (150610, 150600, 'table update', null, null, 'table:update', null, '1', 1, null, now(), now()); +insert into `t_menu` values (150611, 150600, 'table view', null, null, 'table:column:add', null, '1', 1, null, now(), now()); +insert into `t_menu` values (150612, 150600, 'table column list', null, null, 'table:column:list', null, '1', 1, null, now(), now()); +insert into `t_menu` values (150613, 150600, 'table column drop', null, null, 'table:column:drop', null, '1', 1, null, now(), now()); +insert into `t_menu` values (150614, 150600, 'table option add', null, null, 'option:add', null, '1', 1, null, now(), now()); +insert into `t_menu` values (150615, 150600, 'table option remove', null, null, 'option:remove', null, '1', 1, null, now(), now()); + +-- ------- -- ---------------------------- -- Records of t_role -- ---------------------------- @@ -296,11 +315,27 @@ insert into `t_role_menu` values (100102, 100002, 140401); insert into `t_role_menu` values (100103, 100002, 140402); insert into `t_role_menu` values (100104, 100002, 140403); insert into `t_role_menu` values (100105, 100002, 150000); -insert into `t_role_menu` values (100106, 100002, 150500); -insert into `t_role_menu` values (100107, 100002, 150501); -insert into `t_role_menu` values (100108, 100002, 150502); -insert into `t_role_menu` values (100109, 100002, 150503); -insert into `t_role_menu` values (100110, 100002, 150504); +insert into `t_role_menu` values (100107, 100002, 150601); +insert into `t_role_menu` values (100108, 100002, 150602); +insert into `t_role_menu` values (100109, 100002, 150603); +insert into `t_role_menu` values (100110, 100002, 150604); +insert into `t_role_menu` values (100111, 100002, 150601); +insert into `t_role_menu` values (100112, 100002, 150602); +insert into `t_role_menu` values (100113, 100002, 150603); +insert into `t_role_menu` values (100114, 100002, 150604); +insert into `t_role_menu` values (100115, 100002, 150605); +insert into `t_role_menu` values (100116, 100002, 150606); +insert into `t_role_menu` values (100117, 100002, 150607); +insert into `t_role_menu` values (100118, 100002, 150608); +insert into `t_role_menu` values (100119, 100002, 150609); +insert into `t_role_menu` values (100120, 100002, 150610); +insert into `t_role_menu` values (100121, 100002, 150611); +insert into `t_role_menu` values (100122, 100002, 150612); +insert into `t_role_menu` values (100123, 100002, 150613); +insert into `t_role_menu` values (100124, 100002, 150614); +insert into `t_role_menu` values (100125, 100002, 150615); +insert into `t_role_menu` values (100126, 100002, 150600); +insert into `t_role_menu` values (100127, 100001, 150600); -- ---------------------------- -- Records of t_setting diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/data/pgsql-data.sql b/streampark-console/streampark-console-service/src/main/assembly/script/data/pgsql-data.sql index 484b5736a2..813d1a7890 100644 --- a/streampark-console/streampark-console-service/src/main/assembly/script/data/pgsql-data.sql +++ b/streampark-console/streampark-console-service/src/main/assembly/script/data/pgsql-data.sql @@ -153,7 +153,22 @@ insert into "public"."t_menu" values (130504, 130500, 'link delete', null, null, insert into "public"."t_menu" values (130601, 130600, 'add yarn queue', null, null, 'yarnQueue:create', '', '1', '0', null, now(), now()); insert into "public"."t_menu" values (130602, 130600, 'edit yarn queue', null, null, 'yarnQueue:update', '', '1', '0', null, now(), now()); insert into "public"."t_menu" values (130603, 130600, 'delete yarn queue', null, null, 'yarnQueue:delete', '', '1', '0', null, now(), now()); +insert into "public"."t_menu" values (130701, 130700, 'catalog view', null, null, 'catalog:view', '', '1', '0', null, now(), now()); +insert into "public"."t_menu" values (130702, 130700, 'catalog create', null, null, 'catalog:create', '', '1', '0', null, now(), now()); +insert into "public"."t_menu" values (130703, 130700, 'catalog delete', null, null, 'catalog:delete', '', '1', '0', null, now(), now()); +insert into "public"."t_menu" values (130704, 130700, 'catalog update', null, null, 'catalog:update', '', '1', '0', null, now(), now()); +insert into "public"."t_menu" values (150605, 150600, 'database view', null, null, 'database:view', null, '1', 1, null, now(), now()); +insert into "public"."t_menu" values (150606, 150600, 'database create', null, null, 'database:create', null, '1', 1, null, now(), now()); +insert into "public"."t_menu" values (150607, 150600, 'database delete', null, null, 'database:delete', null, '1', 1, null, now(), now()); +insert into "public"."t_menu" values (150608, 150600, 'table view', null, null, 'table:view', null, '1', 1, null, now(), now()); +insert into "public"."t_menu" values (150609, 150600, 'table create', null, null, 'table:create', null, '1', 1, null, now(), now()); +insert into "public"."t_menu" values (150610, 150600, 'table update', null, null, 'table:update', null, '1', 1, null, now(), now()); +insert into "public"."t_menu" values (150611, 150600, 'table view', null, null, 'table:column:add', null, '1', 1, null, now(), now()); +insert into "public"."t_menu" values (150612, 150600, 'table column list', null, null, 'table:column:list', null, '1', 1, null, now(), now()); +insert into "public"."t_menu" values (150613, 150600, 'table column drop', null, null, 'table:column:drop', null, '1', 1, null, now(), now()); +insert into "public"."t_menu" values (150614, 150600, 'table option add', null, null, 'option:add', null, '1', 1, null, now(), now()); +insert into "public"."t_menu" values (150615, 150600, 'table option remove', null, null, 'option:remove', null, '1', 1, null, now(), now()); -- ---------------------------- -- Records of t_role -- ---------------------------- @@ -258,6 +273,23 @@ insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 130600); insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 130601); insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 130602); insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 130603); +insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 130701); +insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 130702); +insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 130703); +insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 130704); +insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 150605); +insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 150606); +insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 150607); +insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 150608); +insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 150609); +insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 150610); +insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 150611); +insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 150612); +insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 150613); +insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 150614); +insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 150615); +insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 150600); +insert into "public"."t_role_menu" (role_id, menu_id) values (100001, 150600); -- ---------------------------- -- Records of t_setting diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql index e054d46c7e..19feaf77d0 100644 --- a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql +++ b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql @@ -666,6 +666,22 @@ create table `t_spark_log` ( primary key (`id`) using btree ) engine=innodb auto_increment=100000 default charset=utf8mb4 collate=utf8mb4_general_ci; +-- ---------------------------- +-- table structure for t_flink_catalog +-- ---------------------------- +drop table if exists `t_flink_catalog`; +CREATE TABLE `t_flink_catalog` ( + `id` BIGINT AUTO_INCREMENT PRIMARY KEY, + `team_id` BIGINT NOT NULL, + `user_id` BIGINT DEFAULT NULL, + `catalog_type` VARCHAR(255) NOT NULL, + `catalog_name` VARCHAR(255) NOT NULL, + `configuration` TEXT, + `create_time` TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + `update_time` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + UNIQUE KEY uniq_catalog_name (`catalog_name`) +) ENGINE=InnoDB auto_increment=100000 default charset=utf8mb4 collate=utf8mb4_general_ci; + -- ---------------------------- -- Table structure for jdbc registry -- ---------------------------- diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql index dc1bf538a1..91eb751881 100644 --- a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql +++ b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql @@ -816,6 +816,26 @@ create table "public"."t_external_link" ( ; alter table "public"."t_external_link" add constraint "t_external_link_pkey" primary key ("id"); +-- ---------------------------- +-- table structure for t_flink_catalog +-- ---------------------------- + +create sequence "public"."streampark_t_flink_catalog_id_seq" + increment 1 start 10000 cache 1 minvalue 10000 maxvalue 9223372036854775807; + +CREATE TABLE "public"."t_flink_catalog" ( + "id" int8 not null default nextval('streampark_t_flink_catalog_id_seq'::regclass), + "team_id" BIGINT NOT NULL, + "user_id" BIGINT DEFAULT NULL, + "catalog_type" VARCHAR(255) NOT NULL, + "catalog_name" VARCHAR(255) NOT NULL, + "configuration" TEXT, + "create_time" TIMESTAMP WITHOUT TIME ZONE DEFAULT CURRENT_TIMESTAMP, + "update_time" TIMESTAMP WITHOUT TIME ZONE DEFAULT CURRENT_TIMESTAMP, + CONSTRAINT uniq_catalog_name UNIQUE (catalog_name) +); +alter table "public"."t_flink_catalog" add constraint "t_flink_catalog_pkey" primary key ("id"); + -- ---------------------------- -- table structure for t_yarn_queue diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql index 997146f813..91b1167633 100644 --- a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql +++ b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql @@ -155,4 +155,58 @@ CREATE TABLE `t_jdbc_registry_data_change_event` ( ) ENGINE = InnoDB DEFAULT CHARSET = utf8; +-- ---------------------------- +-- table structure for t_flink_catalog +-- ---------------------------- +drop table if exists `t_flink_catalog`; +CREATE TABLE `t_flink_catalog` ( + `id` BIGINT AUTO_INCREMENT PRIMARY KEY, + `team_id` BIGINT NOT NULL, + `user_id` BIGINT DEFAULT NULL, + `catalog_type` VARCHAR(255) NOT NULL, + `catalog_name` VARCHAR(255) NOT NULL, + `configuration` TEXT, + `create_time` TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + `update_time` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + UNIQUE KEY uniq_catalog_name (`catalog_name`) +) ENGINE=InnoDB auto_increment=100000 default charset=utf8mb4 collate=utf8mb4_general_ci; + +insert into `t_menu` values (150601, 150600, 'catalog view', null, null, 'catalog:view', null, '1', 1, null, now(), now()); +insert into `t_menu` values (150602, 150600, 'catalog create', null, null, 'catalog:create', null, '1', 1, null, now(), now()); +insert into `t_menu` values (150603, 150600, 'catalog update', null, null, 'catalog:update', null, '1', 1, null, now(), now()); +insert into `t_menu` values (150604, 150600, 'catalog delete', null, null, 'catalog:delete', null, '1', 1, null, now(), now()); + +insert into `t_menu` values (150605, 150600, 'database view', null, null, 'database:view', null, '1', 1, null, now(), now()); +insert into `t_menu` values (150606, 150600, 'database create', null, null, 'database:create', null, '1', 1, null, now(), now()); +insert into `t_menu` values (150607, 150600, 'database delete', null, null, 'database:delete', null, '1', 1, null, now(), now()); + +insert into `t_menu` values (150608, 150600, 'table view', null, null, 'table:view', null, '1', 1, null, now(), now()); +insert into `t_menu` values (150609, 150600, 'table create', null, null, 'table:create', null, '1', 1, null, now(), now()); +insert into `t_menu` values (150610, 150600, 'table update', null, null, 'table:update', null, '1', 1, null, now(), now()); +insert into `t_menu` values (150611, 150600, 'table view', null, null, 'table:column:add', null, '1', 1, null, now(), now()); +insert into `t_menu` values (150612, 150600, 'table column list', null, null, 'table:column:list', null, '1', 1, null, now(), now()); +insert into `t_menu` values (150613, 150600, 'table column drop', null, null, 'table:column:drop', null, '1', 1, null, now(), now()); +insert into `t_menu` values (150614, 150600, 'table option add', null, null, 'option:add', null, '1', 1, null, now(), now()); +insert into `t_menu` values (150615, 150600, 'table option remove', null, null, 'option:remove', null, '1', 1, null, now(), now()); + +-- ------- + +insert into `t_role_menu` values (100107, 100002, 150601); +insert into `t_role_menu` values (100108, 100002, 150602); +insert into `t_role_menu` values (100109, 100002, 150603); +insert into `t_role_menu` values (100110, 100002, 150604); +insert into `t_role_menu` values (100115, 100002, 150605); +insert into `t_role_menu` values (100116, 100002, 150606); +insert into `t_role_menu` values (100117, 100002, 150607); +insert into `t_role_menu` values (100118, 100002, 150608); +insert into `t_role_menu` values (100119, 100002, 150609); +insert into `t_role_menu` values (100120, 100002, 150610); +insert into `t_role_menu` values (100121, 100002, 150611); +insert into `t_role_menu` values (100122, 100002, 150612); +insert into `t_role_menu` values (100123, 100002, 150613); +insert into `t_role_menu` values (100124, 100002, 150614); +insert into `t_role_menu` values (100125, 100002, 150615); +insert into `t_role_menu` values (100126, 100002, 150600); +insert into `t_role_menu` values (100127, 100001, 150600); + set foreign_key_checks = 1; diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql index bb459c99a8..0230c82732 100644 --- a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql +++ b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql @@ -150,3 +150,53 @@ create table t_jdbc_registry_data_change_event create_time timestamp not null default current_timestamp, primary key (id) ); + +-- ---------------------------- +-- table structure for t_flink_catalog +-- ---------------------------- + +create sequence "public"."streampark_t_flink_catalog_id_seq" + increment 1 start 10000 cache 1 minvalue 10000 maxvalue 9223372036854775807; + +CREATE TABLE "public"."t_flink_catalog" ( + "id" int8 not null default nextval('streampark_t_flink_catalog_id_seq'::regclass), + "team_id" BIGINT NOT NULL, + "user_id" BIGINT DEFAULT NULL, + "catalog_type" VARCHAR(255) NOT NULL, + "catalog_name" VARCHAR(255) NOT NULL, + "configuration" TEXT, + "create_time" TIMESTAMP WITHOUT TIME ZONE DEFAULT CURRENT_TIMESTAMP, + "update_time" TIMESTAMP WITHOUT TIME ZONE DEFAULT CURRENT_TIMESTAMP, + CONSTRAINT uniq_catalog_name UNIQUE (catalog_name) +); +alter table "public"."t_flink_catalog" add constraint "t_flink_catalog_pkey" primary key ("id"); + +insert into "public"."t_menu" values (130701, 130700, 'catalog view', null, null, 'catalog:view', '', '1', '0', null, now(), now()); +insert into "public"."t_menu" values (130702, 130700, 'catalog create', null, null, 'catalog:create', '', '1', '0', null, now(), now()); +insert into "public"."t_menu" values (130703, 130700, 'catalog delete', null, null, 'catalog:delete', '', '1', '0', null, now(), now()); +insert into "public"."t_menu" values (130704, 130700, 'catalog update', null, null, 'catalog:update', '', '1', '0', null, now(), now()); +insert into "public"."t_menu" values (150608, 150600, 'table view', null, null, 'table:view', null, '1', 1, null, now(), now()); +insert into "public"."t_menu" values (150609, 150600, 'table create', null, null, 'table:create', null, '1', 1, null, now(), now()); +insert into "public"."t_menu" values (150610, 150600, 'table update', null, null, 'table:update', null, '1', 1, null, now(), now()); +insert into "public"."t_menu" values (150611, 150600, 'table view', null, null, 'table:column:add', null, '1', 1, null, now(), now()); +insert into "public"."t_menu" values (150612, 150600, 'table column list', null, null, 'table:column:list', null, '1', 1, null, now(), now()); +insert into "public"."t_menu" values (150613, 150600, 'table column drop', null, null, 'table:column:drop', null, '1', 1, null, now(), now()); +insert into "public"."t_menu" values (150614, 150600, 'table option add', null, null, 'option:add', null, '1', 1, null, now(), now()); +insert into "public"."t_menu" values (150615, 150600, 'table option remove', null, null, 'option:remove', null, '1', 1, null, now(), now()); + +insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 130701); +insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 130702); +insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 130703); +insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 130704); +insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 150606); +insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 150607); +insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 150608); +insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 150609); +insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 150610); +insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 150611); +insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 150612); +insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 150613); +insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 150614); +insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 150615); +insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 150600); +insert into "public"."t_role_menu" (role_id, menu_id) values (100001, 150600); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/JacksonUtils.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/JacksonUtils.java index ee136c1b19..9ce940ba27 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/JacksonUtils.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/JacksonUtils.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.text.SimpleDateFormat; +import java.util.Map; /** Serialization utils */ public final class JacksonUtils { @@ -67,4 +68,8 @@ public static boolean isValidJson(String jsonStr) { return false; } } + + public static Map toMap(String jsonStr) throws JsonProcessingException { + return (Map) MAPPER.readValue(jsonStr, Map.class); + } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/WebUtils.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/WebUtils.java index 9a2a4a8de7..57d8b53419 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/WebUtils.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/WebUtils.java @@ -87,4 +87,7 @@ public static File getAppClientDir() { return getAppDir(CLIENT); } + public static File getPluginDir() { + return getAppDir(PLUGINS); + } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/DatabaseParam.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/DatabaseParam.java new file mode 100644 index 0000000000..8b7e82dd25 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/DatabaseParam.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.bean; + +import lombok.Data; + +import javax.validation.constraints.NotBlank; + +@Data +public class DatabaseParam { + + @NotBlank(message = "invalid.databaseName") + private String name; + + @NotBlank(message = "invalid.catalogId") + private Long catalogId; + + private String catalogName; + private boolean ignoreIfExits; + private boolean cascade; + private String description; +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/FlinkCatalogParams.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/FlinkCatalogParams.java index 46a8210d80..cb12ad4c5f 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/FlinkCatalogParams.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/FlinkCatalogParams.java @@ -21,10 +21,10 @@ import org.apache.streampark.console.base.util.JacksonUtils; import org.apache.streampark.console.core.entity.FlinkCatalog; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; import lombok.Data; import lombok.extern.slf4j.Slf4j; -import org.codehaus.jackson.annotate.JsonProperty; import org.springframework.beans.BeanUtils; import javax.validation.constraints.NotBlank; @@ -67,6 +67,9 @@ public static FlinkCatalogParams of(FlinkCatalog flinkCatalog) { BeanUtils.copyProperties(flinkCatalog, flinkCatalogParams, "configuration"); try { switch (flinkCatalog.getCatalogType()) { + case MYSQL: + case PGSQL: + case ORACLE: case JDBC: flinkCatalogParams.setFlinkJDBCCatalog( JacksonUtils.read(flinkCatalog.getConfiguration(), FlinkJDBCCatalog.class)); @@ -116,8 +119,6 @@ public static class FlinkHiveCatalog implements Serializable { @NotBlank private String type; - @NotBlank - private String name; @JsonProperty("hive-conf-dir") private String hiveConfDir; @@ -130,6 +131,46 @@ public static class FlinkHiveCatalog implements Serializable { @JsonProperty("hadoop-conf-dir") private String hadoopConfDir; + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public String getHiveConfDir() { + return hiveConfDir; + } + + public void setHiveConfDir(String hiveConfDir) { + this.hiveConfDir = hiveConfDir; + } + + public String getDefaultDatabase() { + return defaultDatabase; + } + + public void setDefaultDatabase(String defaultDatabase) { + this.defaultDatabase = defaultDatabase; + } + + public String getHiveVersion() { + return hiveVersion; + } + + public void setHiveVersion(String hiveVersion) { + this.hiveVersion = hiveVersion; + } + + public String getHadoopConfDir() { + return hadoopConfDir; + } + + public void setHadoopConfDir(String hadoopConfDir) { + this.hadoopConfDir = hadoopConfDir; + } } @Data diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/FlinkDataType.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/FlinkDataType.java new file mode 100644 index 0000000000..084fdfe905 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/FlinkDataType.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.bean; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** flink table datatype */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class FlinkDataType { + + private String type; + + private boolean isNullable; + + private Integer precision; + + private Integer scale; +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/TableColumn.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/TableColumn.java new file mode 100644 index 0000000000..87f9869df6 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/TableColumn.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.bean; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import javax.annotation.Nullable; + +/** TableColumn model. */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class TableColumn { + + private Integer id; + + private String field; + + private FlinkDataType dataType; + + @Nullable + private String comment; + + @Nullable + private boolean isPk; + + @Nullable + private String defaultValue; + + private Integer sort; +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/TableParams.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/TableParams.java new file mode 100644 index 0000000000..dbc518f0a9 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/TableParams.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.bean; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import javax.validation.constraints.NotBlank; +import javax.validation.constraints.NotNull; + +import java.util.List; +import java.util.Map; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class TableParams { + + @NotNull(message = "invalid.catalogId") + private Long catalogId; + + private String catalogName; + + @NotBlank(message = "invalid.databaseName") + private String databaseName; + + @NotBlank(message = "invalid.tableName") + private String name; + + private String description; + + private List tableColumns; + + private List partitionKey; + + private Map tableOptions; +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/DatabaseController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/DatabaseController.java new file mode 100644 index 0000000000..c20a3006cf --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/DatabaseController.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.controller; + +import org.apache.streampark.console.base.domain.RestRequest; +import org.apache.streampark.console.base.domain.RestResponse; +import org.apache.streampark.console.core.bean.DatabaseParam; +import org.apache.streampark.console.core.service.DatabaseService; + +import org.apache.shiro.authz.annotation.RequiresPermissions; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.io.IOException; +import java.util.List; + +@Slf4j +@Validated +@RestController +@RequestMapping("flink/database") +public class DatabaseController { + + @Autowired + private DatabaseService databaseService; + + @PostMapping("create") + @RequiresPermissions("database:create") + public RestResponse create(DatabaseParam databaseParam) throws IOException { + boolean saved = databaseService.createDatabase(databaseParam); + return RestResponse.success(saved); + } + + @PostMapping("list") + @RequiresPermissions("database:view") + public RestResponse list(DatabaseParam databaseParam, RestRequest request) { + List databaseParamList = + databaseService.listDatabases(databaseParam.getCatalogId()); + return RestResponse.success(databaseParamList); + } + + @PostMapping("delete") + @RequiresPermissions("database:delete") + public RestResponse remove(DatabaseParam databaseParam) { + boolean deleted = databaseService.dropDatabase(databaseParam); + return RestResponse.success(deleted); + } +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkCatalogController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkCatalogController.java index 2817c2ecce..c8021935b1 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkCatalogController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkCatalogController.java @@ -21,6 +21,7 @@ import org.apache.streampark.console.base.domain.RestResponse; import org.apache.streampark.console.core.annotation.Permission; import org.apache.streampark.console.core.bean.FlinkCatalogParams; +import org.apache.streampark.console.core.entity.FlinkCatalog; import org.apache.streampark.console.core.service.FlinkCatalogService; import org.apache.streampark.console.core.util.ServiceHelper; @@ -30,6 +31,8 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @@ -62,6 +65,14 @@ public RestResponse list(FlinkCatalogParams catalog, RestRequest request) { return RestResponse.success(catalogList); } + @GetMapping("get/{catalogName}") + @Permission(team = "#teamId") + @RequiresPermissions("catalog:view") + public RestResponse get(@PathVariable String catalogName, Long teamId) { + FlinkCatalog catalog = catalogService.getCatalog(catalogName); + return RestResponse.success(FlinkCatalogParams.of(catalog)); + } + @PostMapping("delete") @Permission(team = "#app.teamId") @RequiresPermissions("catalog:delete") @@ -73,7 +84,7 @@ public RestResponse remove(FlinkCatalogParams catalog, RestRequest request) { @PostMapping("update") @Permission(team = "#app.teamId") @RequiresPermissions("catalog:update") - public RestResponse remove(FlinkCatalogParams catalog) { + public RestResponse update(FlinkCatalogParams catalog) { Long userId = ServiceHelper.getUserId(); boolean updated = catalogService.update(catalog, userId); return RestResponse.success(updated); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/TableController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/TableController.java new file mode 100644 index 0000000000..491cb961f0 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/TableController.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.controller; + +import org.apache.streampark.console.base.domain.RestResponse; +import org.apache.streampark.console.core.bean.TableParams; +import org.apache.streampark.console.core.service.TableService; + +import org.apache.shiro.authz.annotation.RequiresPermissions; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.DeleteMapping; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.TreeMap; +import java.util.stream.Collectors; + +@Slf4j +@Validated +@RestController +@RequestMapping("flink/table") +public class TableController { + + @Autowired + private TableService tableService; + + @PostMapping("create") + @RequiresPermissions("table:create") + public RestResponse createTable(TableParams table) { + boolean saved = tableService.createTable(table); + return RestResponse.success(saved); + } + + @PostMapping("column/add") + @RequiresPermissions("table:column:add") + public RestResponse addColumn(TableParams table) { + boolean saved = tableService.addColumn(table); + return RestResponse.success(saved); + } + + @GetMapping("column/list") + @RequiresPermissions("table:column:list") + public RestResponse listColumns( + @RequestParam String catalogName, + @RequestParam String databaseName, + @RequestParam String tableName) { + TableParams tableParams = tableService.listColumns(catalogName, databaseName, tableName); + return RestResponse.success(tableParams); + } + + @DeleteMapping("column/drop/{catalogName}/{databaseName}/{tableName}/{columnName}") + @RequiresPermissions("table:column:drop") + public RestResponse dropColumns( + @PathVariable String catalogName, + @PathVariable String databaseName, + @PathVariable String tableName, + @PathVariable String columnName) { + boolean dropped = tableService.dropColumn(catalogName, databaseName, tableName, columnName); + return RestResponse.success(dropped); + } + + @PostMapping("option/add") + @RequiresPermissions("option:add") + public RestResponse addOption(TableParams table) { + boolean addedOption = tableService.addOption(table); + return RestResponse.success(addedOption); + } + + @PostMapping("option/remove") + @RequiresPermissions("option:remove") + public RestResponse removeOption( + @RequestParam String catalogName, + @RequestParam String databaseName, + @RequestParam String tableName, + @RequestParam String key) { + boolean removedOption = tableService.removeOption(catalogName, databaseName, tableName, key); + return RestResponse.success(removedOption); + } + + @PostMapping("rename") + @RequiresPermissions("table:update") + public RestResponse renameTable( + @RequestParam String catalogName, + @RequestParam String databaseName, + @RequestParam String fromTableName, + @RequestParam String toTableName) { + boolean renamedOption = + tableService.renameTable(catalogName, databaseName, fromTableName, toTableName); + return RestResponse.success(renamedOption); + } + + @PostMapping("list") + @RequiresPermissions("table:view") + public RestResponse listTable(TableParams table) { + List tableParamsList = tableService.listTables(table); + if (Objects.nonNull(table.getCatalogId()) && Objects.nonNull(table.getDatabaseName())) { + return RestResponse.success(tableParamsList); + } else { + TreeMap>> collect = + tableParamsList.stream() + .collect( + Collectors.groupingBy( + TableParams::getCatalogId, + TreeMap::new, + Collectors.groupingBy(TableParams::getDatabaseName))); + return RestResponse.success(collect); + } + } +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Database.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Database.java new file mode 100644 index 0000000000..fcd507c3dd --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Database.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.entity; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class Database { + + private String name; + + private Integer catalogId; + + private String catalogName; + + private String description; +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCatalog.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCatalog.java index b8de5a2e55..f9e2b6d412 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCatalog.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCatalog.java @@ -72,8 +72,14 @@ public static FlinkCatalog of(FlinkCatalogParams flinkCatalogParams) { "flinkPaimonCatalog", "customCatalogConfig"); + if (null == flinkCatalogParams.getCatalogType()) { + return flinkCatalog; + } try { switch (flinkCatalogParams.getCatalogType()) { + case MYSQL: + case PGSQL: + case ORACLE: case JDBC: flinkCatalog.setConfiguration( JacksonUtils.write(flinkCatalogParams.getFlinkJDBCCatalog())); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/DatabaseMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/DatabaseMapper.java new file mode 100644 index 0000000000..d8d26eb061 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/DatabaseMapper.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.mapper; + +import org.apache.streampark.console.core.entity.Database; + +import org.apache.ibatis.annotations.Mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; + +@Mapper +public interface DatabaseMapper extends BaseMapper { +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkCatalogMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkCatalogMapper.java index 4f5d86092f..1706766c19 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkCatalogMapper.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkCatalogMapper.java @@ -30,5 +30,7 @@ public interface FlinkCatalogMapper extends BaseMapper { boolean existsByCatalogName(@Param("catalogName") String catalogName); + FlinkCatalog selectByCatalogName(@Param("catalogName") String catalogName); + IPage selectPage(Page page, @Param("catalog") FlinkCatalog catalog); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DatabaseService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DatabaseService.java new file mode 100644 index 0000000000..0a8c749c86 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/DatabaseService.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.service; + +import org.apache.streampark.console.core.bean.DatabaseParam; +import org.apache.streampark.console.core.entity.Database; + +import com.baomidou.mybatisplus.extension.service.IService; + +import java.util.List; + +public interface DatabaseService extends IService { + + /** + * Checks if the specified database exists. + * + * @param databaseParam The database to check + * @return true if the database exists, false otherwise + */ + boolean databaseExists(DatabaseParam databaseParam); + + /** + * Creates a new database given {@link Database}. + * + * @param databaseParam The {@link DatabaseParam} object that contains the detail of the created + * database + * @return true if the operation is successful, false otherwise + */ + boolean createDatabase(DatabaseParam databaseParam); + + /** + * Lists databases given catalog id. + * + * @return The list of databases of given catalog + */ + List listDatabases(Long catalogId); + + /** + * Drops database given database name. + * + * @param databaseParam The dropping database + * @return true if the operation is successful, false otherwise + */ + boolean dropDatabase(DatabaseParam databaseParam); +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkCatalogBase.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkCatalogBase.java new file mode 100644 index 0000000000..c1558a0eb1 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkCatalogBase.java @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.service; + +import org.apache.streampark.console.core.util.CatalogServiceUtils; + +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +@Slf4j +@Service +public class FlinkCatalogBase { + + @Value("${table.catalog-store.kind:jdbc}") + private String storeKind; + + @Value("${table.catalog-store.jdbc.url:jdbc://mysql:127.0.0.1:3306/flink-test}") + private String jdbcUrl; + + @Value("${table.catalog-store.jdbc.driver:com.mysql.cj.jdbc.Driver}") + private String jdbcDriver; + + @Value("${table.catalog-store.jdbc.username:flinkuser}") + private String jdbcUserName; + + @Value("${table.catalog-store.jdbc.password:flinkpw}") + private String jdbcPassword; + + @Value("${table.catalog-store.jdbc.max-retry-timeout:600") + private String jdbcMaxRetryTimeout; + + private final Map catalogMap = new ConcurrentHashMap<>(0); + + public List listDatabases(String catalogName, Map options) { + Catalog catalog = getCatalog(catalogName, options); + return catalog.listDatabases(); + } + + public boolean databaseExists(String catalogName, Map options, String database) { + Catalog catalog = getCatalog(catalogName, options); + return catalog.databaseExists(database); + } + + public boolean createDatabase( + String catalogName, + Map options, + String databaseName, + CatalogDatabase catalogDatabase, + boolean ignoreIfExists) { + Catalog catalog = getCatalog(catalogName, options); + try { + catalog.createDatabase(databaseName, catalogDatabase, ignoreIfExists); + return true; + } catch (CatalogException | DatabaseAlreadyExistException e) { + log.error("create database {} failed.", databaseName, e); + throw new CatalogException( + String.format("The database '%s' already exists in the catalog.", databaseName)); + } + } + + public void dropDatabase( + String catalogName, + Map options, + String databaseName, + boolean cascade, + boolean ignoreIfExists) { + Catalog catalog = getCatalog(catalogName, options); + try { + catalog.dropDatabase(databaseName, ignoreIfExists, cascade); + } catch (CatalogException | DatabaseNotEmptyException | DatabaseNotExistException e) { + log.error("Drop database {} failed.", databaseName, e); + throw new CatalogException( + String.format("The database '%s' already exists in the catalog.", databaseName)); + } + } + + public boolean tableExists( + String catalogName, Map options, String databaseName, String tableName) { + Catalog catalog = getCatalog(catalogName, options); + try { + return catalog.tableExists(new ObjectPath(databaseName, tableName)); + } catch (CatalogException e) { + log.error("Table exists {}.{} failed.", databaseName, tableName, e); + throw new CatalogException( + String.format("The table '%s.%s' not exists in the catalog.", databaseName, tableName)); + } + } + + public boolean createTable( + String catalogName, + Map options, + String databaseName, + String tableName, + CatalogTable catalogTable, + boolean ignoreIfExists) { + Catalog catalog = getCatalog(catalogName, options); + try { + catalog.createTable(new ObjectPath(databaseName, tableName), catalogTable, ignoreIfExists); + return true; + } catch (CatalogException e) { + log.error("Table {}.{} create failed.", databaseName, tableName, e); + throw new CatalogException( + String.format("Table '%s.%s' create failed.", databaseName, tableName)); + } catch (TableAlreadyExistException | DatabaseNotExistException e) { + log.error( + "Table {}.{} create failed.because table is exits or database not exist", + databaseName, + tableName, + e); + throw new RuntimeException(e); + } + } + + public boolean alterTable( + String catalogName, + Map options, + String databaseName, + String tableName, + List changes, + boolean ignoreIfExists) { + Catalog catalog = getCatalog(catalogName, options); + try { + CatalogBaseTable originTable = getTable(catalogName, options, databaseName, tableName); + CatalogTable currentTable = (CatalogTable) originTable; + + Schema currentSchema = currentTable.getUnresolvedSchema(); + final Schema schema = applyChangesToSchema(currentSchema, changes); + Map newOptions = new ConcurrentHashMap<>(); + for (TableChange change : changes) { + if (change instanceof TableChange.SetOption) { + newOptions.put( + ((TableChange.SetOption) change).getKey(), + ((TableChange.SetOption) change).getValue()); + } + if (change instanceof TableChange.ResetOption) { + newOptions.remove(((TableChange.ResetOption) change).getKey()); + } + } + + final CatalogTable newTable = + CatalogTable.of( + schema, currentTable.getComment(), currentTable.getPartitionKeys(), newOptions); + catalog.alterTable(new ObjectPath(databaseName, tableName), newTable, ignoreIfExists); + return true; + } catch (TableNotExistException e) { + throw new RuntimeException(e); + } + } + + public boolean dropTable( + String catalogName, + Map options, + String databaseName, + String tableName, + boolean ignoreIfExists) { + Catalog catalog = getCatalog(catalogName, options); + try { + catalog.dropTable(new ObjectPath(databaseName, tableName), ignoreIfExists); + return true; + } catch (TableNotExistException e) { + throw new RuntimeException(e); + } + } + + public boolean renameTable( + String catalogName, + Map options, + String databaseName, + String fromTableName, + String toTableName) { + Catalog catalog = getCatalog(catalogName, options); + try { + catalog.renameTable(new ObjectPath(databaseName, fromTableName), toTableName, true); + return true; + } catch (TableNotExistException | TableAlreadyExistException e) { + throw new RuntimeException(e); + } + } + + public List listTable( + String catalogName, Map options, String databaseName) { + Catalog catalog = getCatalog(catalogName, options); + try { + return catalog.listTables(databaseName); + } catch (DatabaseNotExistException e) { + throw new RuntimeException(e); + } + } + + public CatalogBaseTable getTable( + String catalogName, Map options, String databaseName, + String tableName) { + Catalog catalog = getCatalog(catalogName, options); + try { + return catalog.getTable(new ObjectPath(databaseName, tableName)); + } catch (TableNotExistException e) { + throw new RuntimeException(e); + } + } + + private Schema applyChangesToSchema(Schema currentSchema, List changes) { + // Clone the current schema to avoid modifying the original + Schema.Builder schemaBuilder = Schema.newBuilder().fromSchema(currentSchema); + + // Iterate over each change and apply it to the schema builder + Set columnsToDrop = new HashSet<>(); + for (TableChange change : changes) { + if (change instanceof TableChange.AddColumn) { + TableChange.AddColumn addColumn = (TableChange.AddColumn) change; + schemaBuilder.column(addColumn.getColumn().getName(), addColumn.getColumn().getDataType()); + } else if (change instanceof TableChange.ModifyColumn) { + TableChange.ModifyColumn modifyColumn = (TableChange.ModifyColumn) change; + schemaBuilder.column( + modifyColumn.getNewColumn().getName(), modifyColumn.getNewColumn().getDataType()); + } else if (change instanceof TableChange.DropColumn) { + TableChange.DropColumn removeColumn = (TableChange.DropColumn) change; + columnsToDrop.add(removeColumn.getColumnName()); + } else { + throw new UnsupportedOperationException( + "Unsupported table change type: " + change.getClass().getName()); + } + } + // drop columns + if (!columnsToDrop.isEmpty()) { + for (Schema.UnresolvedColumn column : currentSchema.getColumns()) { + if (column instanceof Schema.UnresolvedPhysicalColumn) { + Schema.UnresolvedPhysicalColumn physicalColumn = (Schema.UnresolvedPhysicalColumn) column; + if (!columnsToDrop.contains(physicalColumn.getName())) { + schemaBuilder.column(physicalColumn.getName(), physicalColumn.getDataType()); + } + } + } + } + // Build the updated schema + return schemaBuilder.build(); + } + + private Catalog getCatalog(String catalogName, Map options) { + if (catalogMap.containsKey(catalogName)) { + return catalogMap.get(catalogName); + } else { + Map configuration = new HashMap<>(); + configuration.put(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND.key(), storeKind); + configuration.put("table.catalog-store.jdbc.url", jdbcUrl); + configuration.put("table.catalog-store.jdbc.driver", jdbcDriver); + configuration.put("table.catalog-store.jdbc.username", jdbcUserName); + configuration.put("table.catalog-store.jdbc.password", jdbcPassword); + configuration.put("table.catalog-store.jdbc.max-retry-timeout", jdbcMaxRetryTimeout); + Catalog catalog = CatalogServiceUtils.getCatalog(catalogName, options, configuration); + catalog.open(); + catalogMap.put(catalogName, catalog); + return catalog; + } + } +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkCatalogService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkCatalogService.java index 160ae952fb..7be26068f2 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkCatalogService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkCatalogService.java @@ -49,10 +49,13 @@ public interface FlinkCatalogService extends IService { */ IPage page(FlinkCatalogParams catalog, RestRequest request); + FlinkCatalog getCatalog(Long catalogId); + + FlinkCatalog getCatalog(String catalogName); /** * update Catalog * * @param catalog The {@link FlinkCatalogParams} object containing the search criteria. */ - boolean update(FlinkCatalogParams catalog, long userId); + boolean update(FlinkCatalogParams catalog, Long userId); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/TableService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/TableService.java new file mode 100644 index 0000000000..2dc9bbcf95 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/TableService.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.service; + +import org.apache.streampark.console.core.bean.TableParams; + +import org.springframework.stereotype.Service; + +import java.util.List; + +@Service +public interface TableService { + + /** + * Checks if the specified table exists. + * + * @param tableParams The TableParams object containing information about the table + * @return true if the table exists, false otherwise + */ + boolean tableExists(TableParams tableParams); + + /** + * Creates a table in the database given ${@link TableParams}. + * + * @param tableParams The TableParams object containing information about the table + * @return true if the operation is successful, false otherwise + */ + boolean createTable(TableParams tableParams); + + /** + * Adds a column to the table. + * + * @param tableParams The TableDTO object containing information about the table + * @return true if the operation is successful, false otherwise + */ + boolean addColumn(TableParams tableParams); + + /** + * Drops a column from a table. + * + * @param catalogName The name of the catalog + * @param databaseName The name of the database + * @param tableName The name of the table + * @param columnName The name of the column to be dropped + * @return true if the operation is successful, false otherwise + */ + boolean dropColumn(String catalogName, String databaseName, String tableName, String columnName); + + /** + * Adds options to a table. + * + * @param tableDTO The TableDTO object containing information about the table + * @return true if the operation is successful, false otherwise + */ + boolean addOption(TableParams tableDTO); + + /** + * Removes an option from a table. + * + * @param catalogName The name of the catalog + * @param databaseName The name of the database + * @param tableName The name of the table + * @param key The key of the option to be removed + * @return true if the operation is successful, false otherwise + */ + boolean removeOption(String catalogName, String databaseName, String tableName, String key); + + /** + * Drops a table from the specified database in the given catalog. + * + * @param catalogName The name of the catalog from which the table will be dropped + * @param databaseName The name of the database from which the table will be dropped + * @param tableName The name of the table to be dropped + * @return true if the operation is successful, false otherwise + */ + boolean dropTable(String catalogName, String databaseName, String tableName); + + /** + * Renames a table in the specified database of the given catalog. + * + * @param catalogName The name of the catalog where the table resides + * @param databaseName The name of the database where the table resides + * @param fromTableName The current name of the table to be renamed + * @param toTableName The new name for the table + * @return true if the operation is successful, false otherwise + */ + boolean renameTable( + String catalogName, String databaseName, String fromTableName, String toTableName); + + /** + * Lists tables given {@link TableParams} condition. + * + * @return Response object containing a list of {@link TableParams} representing the tables + */ + List listTables(TableParams tableDTO); + + /** + * Retrieves the column details of a specific table within the specified catalog and database. + * + * @param catalogName The name of the catalog where the table is located + * @param databaseName The name of the database where the table is located + * @param tableName The name of the table whose columns are to be retrieved + * @return A {@link TableParams} object containing the details of the columns of the specified + * table + */ + TableParams listColumns(String catalogName, String databaseName, String tableName); +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java index cd0344fdb7..bd62eb99f2 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java @@ -92,6 +92,7 @@ import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.RestOptions; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException; import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.YarnApplicationState; @@ -101,7 +102,6 @@ import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.google.common.collect.Sets; -import io.fabric8.kubernetes.client.KubernetesClientException; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; @@ -128,7 +128,9 @@ @Slf4j @Service @Transactional(propagation = Propagation.SUPPORTS, readOnly = true, rollbackFor = Exception.class) -public class FlinkApplicationActionServiceImpl extends ServiceImpl +public class FlinkApplicationActionServiceImpl + extends + ServiceImpl implements FlinkApplicationActionService { @@ -187,9 +189,11 @@ public class FlinkApplicationActionServiceImpl extends ServiceImpl> startFutureMap = new ConcurrentHashMap<>(); + private final Map> startFutureMap = + new ConcurrentHashMap<>(); - private final Map> cancelFutureMap = new ConcurrentHashMap<>(); + private final Map> cancelFutureMap = + new ConcurrentHashMap<>(); @Override public void revoke(Long appId) throws ApplicationException { @@ -320,26 +324,28 @@ public void cancel(FlinkApplication appParam) throws Exception { properties.put(RestOptions.PORT.key(), activeAddress.getPort()); } - Tuple3 clusterIdNamespace = getNamespaceClusterId(application); + Tuple3 clusterIdNamespace = + getNamespaceClusterId(application); String namespace = clusterIdNamespace.t1; String clusterId = clusterIdNamespace.t2; - CancelRequest cancelRequest = new CancelRequest( - application.getId(), - flinkEnv.getFlinkVersion(), - FlinkDeployMode.of(application.getDeployMode()), - properties, - clusterId, - application.getJobId(), - appParam.getRestoreOrTriggerSavepoint(), - appParam.getDrain(), - customSavepoint, - appParam.getNativeFormat(), - namespace); + CancelRequest cancelRequest = + new CancelRequest( + application.getId(), + flinkEnv.getFlinkVersion(), + FlinkDeployMode.of(application.getDeployMode()), + properties, + clusterId, + application.getJobId(), + appParam.getRestoreOrTriggerSavepoint(), + appParam.getDrain(), + customSavepoint, + appParam.getNativeFormat(), + namespace); final Date triggerTime = new Date(); - CompletableFuture cancelFuture = CompletableFuture - .supplyAsync(() -> FlinkClient.cancel(cancelRequest), executorService); + CompletableFuture cancelFuture = + CompletableFuture.supplyAsync(() -> FlinkClient.cancel(cancelRequest), executorService); cancelFutureMap.put(application.getId(), cancelFuture); @@ -464,41 +470,45 @@ public void start(FlinkApplication appParam, boolean auto) throws Exception { } // Get the args after placeholder replacement - String args = StringUtils.isBlank(appParam.getArgs()) ? application.getArgs() : appParam.getArgs(); + String args = + StringUtils.isBlank(appParam.getArgs()) ? application.getArgs() : appParam.getArgs(); String applicationArgs = variableService.replaceVariable(application.getTeamId(), args); - Tuple3 clusterIdNamespace = getNamespaceClusterId(application); + Tuple3 clusterIdNamespace = + getNamespaceClusterId(application); String k8sNamespace = clusterIdNamespace.t1; String k8sClusterId = clusterIdNamespace.t2; FlinkK8sRestExposedType exposedType = clusterIdNamespace.t3; String dynamicProperties = - StringUtils.isBlank(appParam.getDynamicProperties()) ? application.getDynamicProperties() + StringUtils.isBlank(appParam.getDynamicProperties()) + ? application.getDynamicProperties() : appParam.getDynamicProperties(); - SubmitRequest submitRequest = new SubmitRequest( - flinkEnv.getFlinkVersion(), - FlinkDeployMode.of(application.getDeployMode()), - getProperties(application, dynamicProperties), - flinkEnv.getFlinkConf(), - FlinkJobType.of(application.getJobType()), - application.getId(), - new JobID().toHexString(), - application.getJobName(), - appConf, - application.getApplicationType(), - getSavepointPath(appParam), - FlinkRestoreMode.of(appParam.getRestoreMode()), - applicationArgs, - k8sClusterId, - application.getHadoopUser(), - buildResult, - extraParameter, - k8sNamespace, - exposedType); - - CompletableFuture future = CompletableFuture - .supplyAsync(() -> FlinkClient.submit(submitRequest), executorService); + SubmitRequest submitRequest = + new SubmitRequest( + flinkEnv.getFlinkVersion(), + FlinkDeployMode.of(application.getDeployMode()), + getProperties(application, dynamicProperties), + flinkEnv.getFlinkConf(), + FlinkJobType.of(application.getJobType()), + application.getId(), + new JobID().toHexString(), + application.getJobName(), + appConf, + application.getApplicationType(), + getSavepointPath(appParam), + FlinkRestoreMode.of(appParam.getRestoreMode()), + applicationArgs, + k8sClusterId, + application.getHadoopUser(), + buildResult, + extraParameter, + k8sNamespace, + exposedType); + + CompletableFuture future = + CompletableFuture.supplyAsync(() -> FlinkClient.submit(submitRequest), executorService); startFutureMap.put(application.getId(), future); @@ -628,10 +638,11 @@ private void processForException( private boolean checkAppRepeatInYarn(String jobName) { try { YarnClient yarnClient = HadoopUtils.yarnClient(); - Set types = Sets.newHashSet( - ApplicationType.STREAMPARK_FLINK.getName(), ApplicationType.APACHE_FLINK.getName()); - EnumSet states = EnumSet.of(YarnApplicationState.RUNNING, - YarnApplicationState.ACCEPTED); + Set types = + Sets.newHashSet( + ApplicationType.STREAMPARK_FLINK.getName(), ApplicationType.APACHE_FLINK.getName()); + EnumSet states = + EnumSet.of(YarnApplicationState.RUNNING, YarnApplicationState.ACCEPTED); List applications = yarnClient.getApplications(types, states); for (ApplicationReport report : applications) { if (report.getName().equals(jobName)) { @@ -650,7 +661,8 @@ private void starting(FlinkApplication application) { updateById(application); } - private Tuple2 getUserJarAndAppConf(FlinkEnv flinkEnv, FlinkApplication application) { + private Tuple2 getUserJarAndAppConf( + FlinkEnv flinkEnv, FlinkApplication application) { FlinkDeployMode deployModeEnum = application.getDeployModeEnum(); FlinkApplicationConfig applicationConfig = configService.getEffective(application.getId()); @@ -667,9 +679,10 @@ private Tuple2 getUserJarAndAppConf(FlinkEnv flinkEnv, FlinkAppl // 1) dist_userJar String sqlDistJar = ServiceHelper.getFlinkSqlClientJar(flinkEnv); // 2) appConfig - appConf = applicationConfig == null - ? null - : String.format("yaml://%s", applicationConfig.getContent()); + appConf = + applicationConfig == null + ? null + : String.format("yaml://%s", applicationConfig.getContent()); // 3) client if (FlinkDeployMode.YARN_APPLICATION == deployModeEnum) { String clientPath = Workspace.remote().APP_CLIENT(); @@ -678,7 +691,8 @@ private Tuple2 getUserJarAndAppConf(FlinkEnv flinkEnv, FlinkAppl break; case PYFLINK: - Resource resource = resourceService.findByResourceName(application.getTeamId(), application.getJar()); + Resource resource = + resourceService.findByResourceName(application.getTeamId(), application.getJar()); ApiAlertException.throwIfNull( resource, "pyflink file can't be null, start application failed."); @@ -695,25 +709,28 @@ private Tuple2 getUserJarAndAppConf(FlinkEnv flinkEnv, FlinkAppl case CUSTOM_CODE: if (application.isUploadJob()) { - appConf = String.format( - "json://{\"%s\":\"%s\"}", - ConfigKeys.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass()); + appConf = + String.format( + "json://{\"%s\":\"%s\"}", + ConfigKeys.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass()); } else { switch (application.getApplicationType()) { case STREAMPARK_FLINK: ConfigFileTypeEnum fileType = ConfigFileTypeEnum.of(applicationConfig.getFormat()); if (fileType != null && ConfigFileTypeEnum.UNKNOWN != fileType) { - appConf = String.format( - "%s://%s", fileType.getTypeName(), applicationConfig.getContent()); + appConf = + String.format( + "%s://%s", fileType.getTypeName(), applicationConfig.getContent()); } else { throw new IllegalArgumentException( "application' config type error,must be ( yaml| properties| hocon )"); } break; case APACHE_FLINK: - appConf = String.format( - "json://{\"%s\":\"%s\"}", - ConfigKeys.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass()); + appConf = + String.format( + "json://{\"%s\":\"%s\"}", + ConfigKeys.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass()); break; default: throw new IllegalArgumentException( @@ -724,21 +741,23 @@ private Tuple2 getUserJarAndAppConf(FlinkEnv flinkEnv, FlinkAppl if (FlinkDeployMode.YARN_APPLICATION == deployModeEnum) { switch (application.getApplicationType()) { case STREAMPARK_FLINK: - flinkUserJar = String.format( - "%s/%s", - application.getAppLib(), - application.getModule().concat(Constants.JAR_SUFFIX)); + flinkUserJar = + String.format( + "%s/%s", + application.getAppLib(), + application.getModule().concat(Constants.JAR_SUFFIX)); break; case APACHE_FLINK: flinkUserJar = String.format("%s/%s", application.getAppHome(), application.getJar()); if (!FsOperator.hdfs().exists(flinkUserJar)) { - resource = resourceService.findByResourceName( - application.getTeamId(), application.getJar()); + resource = + resourceService.findByResourceName( + application.getTeamId(), application.getJar()); if (resource != null && StringUtils.isNotBlank(resource.getFilePath())) { - flinkUserJar = String.format( - "%s/%s", - application.getAppHome(), - new File(resource.getFilePath()).getName()); + flinkUserJar = + String.format( + "%s/%s", + application.getAppHome(), new File(resource.getFilePath()).getName()); } } break; @@ -752,7 +771,8 @@ private Tuple2 getUserJarAndAppConf(FlinkEnv flinkEnv, FlinkAppl return Tuple2.of(flinkUserJar, appConf); } - private Map getProperties(FlinkApplication application, String runtimeProperties) { + private Map getProperties( + FlinkApplication application, String runtimeProperties) { Map properties = new HashMap<>(application.getOptionMap()); if (FlinkDeployMode.isRemoteMode(application.getDeployModeEnum())) { FlinkCluster cluster = flinkClusterService.getById(application.getFlinkClusterId()); @@ -776,8 +796,10 @@ private Map getProperties(FlinkApplication application, String r application.getFlinkClusterId())); properties.put(ConfigKeys.KEY_YARN_APP_ID(), cluster.getClusterId()); } else { - String yarnQueue = (String) application.getHotParamsMap().get(ConfigKeys.KEY_YARN_APP_QUEUE()); - String yarnLabelExpr = (String) application.getHotParamsMap().get(ConfigKeys.KEY_YARN_APP_NODE_LABEL()); + String yarnQueue = + (String) application.getHotParamsMap().get(ConfigKeys.KEY_YARN_APP_QUEUE()); + String yarnLabelExpr = + (String) application.getHotParamsMap().get(ConfigKeys.KEY_YARN_APP_NODE_LABEL()); Optional.ofNullable(yarnQueue) .ifPresent(yq -> properties.put(ConfigKeys.KEY_YARN_APP_QUEUE(), yq)); Optional.ofNullable(yarnLabelExpr) @@ -795,7 +817,8 @@ private Map getProperties(FlinkApplication application, String r properties.put(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key(), true); } - Map dynamicProperties = PropertiesUtils.extractDynamicPropertiesAsJava(runtimeProperties); + Map dynamicProperties = + PropertiesUtils.extractDynamicPropertiesAsJava(runtimeProperties); properties.putAll(dynamicProperties); ResolveOrder resolveOrder = ResolveOrder.of(application.getResolveOrder()); if (resolveOrder != null) { @@ -823,8 +846,8 @@ private void doAbort(Long id) { // kill application if (FlinkDeployMode.isYarnMode(application.getDeployModeEnum())) { try { - List applications = applicationInfoService - .getYarnAppReport(application.getJobName()); + List applications = + applicationInfoService.getYarnAppReport(application.getJobName()); if (!applications.isEmpty()) { YarnClient yarnClient = HadoopUtils.yarnClient(); yarnClient.killApplication(applications.get(0).getApplicationId()); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DatabaseServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DatabaseServiceImpl.java new file mode 100644 index 0000000000..831d85c248 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/DatabaseServiceImpl.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.service.impl; + +import org.apache.streampark.console.base.exception.AlertException; +import org.apache.streampark.console.core.bean.DatabaseParam; +import org.apache.streampark.console.core.entity.Database; +import org.apache.streampark.console.core.entity.FlinkCatalog; +import org.apache.streampark.console.core.mapper.DatabaseMapper; +import org.apache.streampark.console.core.service.DatabaseService; +import org.apache.streampark.console.core.service.FlinkCatalogBase; +import org.apache.streampark.console.core.service.FlinkCatalogService; + +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogDatabaseImpl; + +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.streampark.console.core.util.CatalogServiceUtils.getOptions; + +@Service +@Slf4j +@Transactional(propagation = Propagation.SUPPORTS, rollbackFor = Exception.class) +public class DatabaseServiceImpl extends ServiceImpl + implements + DatabaseService { + + @Autowired + private FlinkCatalogService catalogService; + @Autowired + private FlinkCatalogBase flinkCatalogBase; + + @Override + public boolean databaseExists(DatabaseParam databaseParam) { + AlertException.throwIfNull(databaseParam.getName(), "Database name can not be null."); + FlinkCatalog flinkCatalog = catalogService.getCatalog(databaseParam.getCatalogId()); + AlertException.throwIfNull(flinkCatalog, "Catalog is not exit in database."); + return flinkCatalogBase.databaseExists( + flinkCatalog.getCatalogName(), + getOptions(flinkCatalog.getConfiguration()), + databaseParam.getName()); + } + + @Override + public boolean createDatabase(DatabaseParam databaseParam) { + AlertException.throwIfNull(databaseParam.getName(), "Database name can not be null."); + FlinkCatalog flinkCatalog = catalogService.getCatalog(databaseParam.getCatalogId()); + AlertException.throwIfNull(flinkCatalog, "Catalog is not exit in database."); + Map dbMap = new ConcurrentHashMap<>(); + dbMap.put("cascade", String.valueOf(databaseParam.isCascade())); + CatalogDatabase catalogDatabase = + new CatalogDatabaseImpl(dbMap, databaseParam.getDescription()); + return flinkCatalogBase.createDatabase( + flinkCatalog.getCatalogName(), + getOptions(flinkCatalog.getConfiguration()), + databaseParam.getName(), + catalogDatabase, + databaseParam.isIgnoreIfExits()); + } + + @Override + public List listDatabases(Long catalogId) { + FlinkCatalog flinkCatalog = catalogService.getCatalog(catalogId); + AlertException.throwIfNull( + flinkCatalog, "The catalog can't be null. get catalog from database failed."); + List databases = + flinkCatalogBase.listDatabases( + flinkCatalog.getCatalogName(), getOptions(flinkCatalog.getConfiguration())); + if (databases == null || databases.isEmpty()) { + return Collections.emptyList(); + } + List databaseList = new ArrayList<>(); + databases.forEach( + dbName -> { + DatabaseParam dbParam = new DatabaseParam(); + dbParam.setCatalogId(catalogId); + dbParam.setCatalogName(flinkCatalog.getCatalogName()); + dbParam.setName(dbName); + databaseList.add(dbParam); + }); + return databaseList; + } + + @Override + public boolean dropDatabase(DatabaseParam databaseParam) { + AlertException.throwIfNull(databaseParam.getName(), "Database name can not be null."); + FlinkCatalog flinkCatalog = catalogService.getCatalog(databaseParam.getCatalogId()); + AlertException.throwIfNull(flinkCatalog, "Catalog is not exit in database."); + flinkCatalogBase.dropDatabase( + flinkCatalog.getCatalogName(), + getOptions(flinkCatalog.getConfiguration()), + databaseParam.getName(), + databaseParam.isCascade(), + databaseParam.isIgnoreIfExits()); + return true; + } +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkCatalogServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkCatalogServiceImpl.java index b173cb276d..f316d54787 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkCatalogServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkCatalogServiceImpl.java @@ -93,7 +93,17 @@ record -> { } @Override - public boolean update(FlinkCatalogParams catalogParam, long userId) { + public FlinkCatalog getCatalog(Long catalogId) { + return this.baseMapper.selectById(catalogId); + } + + @Override + public FlinkCatalog getCatalog(String catalogName) { + return this.baseMapper.selectByCatalogName(catalogName); + } + + @Override + public boolean update(FlinkCatalogParams catalogParam, Long userId) { AlertException.throwIfNull( catalogParam.getTeamId(), "The teamId can't be null. List catalog failed."); FlinkCatalog catalog = getById(catalogParam.getId()); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/TableServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/TableServiceImpl.java new file mode 100644 index 0000000000..1e9a96da11 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/TableServiceImpl.java @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.service.impl; + +import org.apache.streampark.console.base.exception.AlertException; +import org.apache.streampark.console.core.bean.TableColumn; +import org.apache.streampark.console.core.bean.TableParams; +import org.apache.streampark.console.core.entity.FlinkCatalog; +import org.apache.streampark.console.core.service.FlinkCatalogBase; +import org.apache.streampark.console.core.service.FlinkCatalogService; +import org.apache.streampark.console.core.service.TableService; +import org.apache.streampark.console.core.util.DataTypeConverterUtils; + +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.catalog.UniqueConstraint; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.streampark.console.core.util.CatalogServiceUtils.getOptions; + +@Service +@Slf4j +@Transactional(propagation = Propagation.SUPPORTS, rollbackFor = Exception.class) +public class TableServiceImpl implements TableService { + + @Autowired + private FlinkCatalogService catalogService; + @Autowired + private FlinkCatalogBase flinkCatalogBase; + + @Override + public boolean tableExists(TableParams tableParams) { + AlertException.throwIfNull(tableParams.getName(), "Table name can not be null."); + FlinkCatalog flinkCatalog = catalogService.getCatalog(tableParams.getCatalogId()); + AlertException.throwIfNull(flinkCatalog, "Catalog is not exit."); + return flinkCatalogBase.tableExists( + flinkCatalog.getCatalogName(), + getOptions(flinkCatalog.getConfiguration()), + tableParams.getDatabaseName(), + tableParams.getName()); + } + + @Override + public boolean createTable(TableParams tableParams) { + AlertException.throwIfNull(tableParams.getName(), "Table name can not be null."); + FlinkCatalog flinkCatalog = catalogService.getCatalog(tableParams.getCatalogId()); + AlertException.throwIfNull(flinkCatalog, "Catalog is not exit."); + AlertException.throwIfNull(tableParams.getTableColumns(), "Table column can not be null."); + AlertException.throwIfNull(tableParams.getTableOptions(), "Table options can not be null."); + List columns = new ArrayList<>(); + List ukColumns = new ArrayList<>(); + AtomicReference ukName = new AtomicReference<>("uk"); + tableParams + .getTableColumns() + .forEach( + tc -> { + columns.add( + Column.physical( + tc.getField(), DataTypeConverterUtils.convertToDataType(tc.getDataType()))); + if (tc.isPk()) { + ukColumns.add(tc.getField()); + ukName.set(ukName + tc.getField()); + } + }); + final Schema schema = + Schema.newBuilder() + .fromResolvedSchema( + new ResolvedSchema( + columns, + Collections.emptyList(), + UniqueConstraint.primaryKey(ukName.get(), ukColumns))) + .build(); + final CatalogTable originTable = + CatalogTable.of( + schema, + tableParams.getDescription(), + tableParams.getPartitionKey(), + tableParams.getTableOptions()); + return flinkCatalogBase.createTable( + flinkCatalog.getCatalogName(), + getOptions(flinkCatalog.getConfiguration()), + tableParams.getDatabaseName(), + tableParams.getName(), + originTable, + true); + } + + @Override + public boolean addColumn(TableParams tableParams) { + AlertException.throwIfNull(tableParams.getName(), "Table name can not be null."); + FlinkCatalog flinkCatalog = catalogService.getCatalog(tableParams.getCatalogId()); + AlertException.throwIfNull(flinkCatalog, "Catalog is not exit."); + AlertException.throwIfNull(tableParams.getTableColumns(), "Table column can not be null."); + + List tableChanges = new ArrayList<>(); + for (TableColumn tableColumn : tableParams.getTableColumns()) { + Column column = + Column.physical( + tableColumn.getField(), + DataTypeConverterUtils.convertToDataType(tableColumn.getDataType())) + .withComment(tableColumn.getComment()); + TableChange.AddColumn addColumn = TableChange.add(column); + tableChanges.add(addColumn); + } + + return flinkCatalogBase.alterTable( + flinkCatalog.getCatalogName(), + getOptions(flinkCatalog.getConfiguration()), + tableParams.getDatabaseName(), + tableParams.getName(), + tableChanges, + true); + } + + @Override + public boolean dropColumn( + String catalogName, String databaseName, String tableName, String columnName) { + AlertException.throwIfNull(tableName, "Table name can not be null."); + FlinkCatalog flinkCatalog = catalogService.getCatalog(catalogName); + AlertException.throwIfNull(flinkCatalog, "Catalog is not exit."); + AlertException.throwIfNull(columnName, "Table column name can not be null."); + + List tableChanges = new ArrayList<>(); + TableChange.DropColumn dropColumn = TableChange.dropColumn(columnName); + tableChanges.add(dropColumn); + return flinkCatalogBase.alterTable( + flinkCatalog.getCatalogName(), + getOptions(flinkCatalog.getConfiguration()), + databaseName, + tableName, + tableChanges, + true); + } + + @Override + public boolean addOption(TableParams tableParams) { + + AlertException.throwIfNull(tableParams.getName(), "Table name can not be null."); + FlinkCatalog flinkCatalog = catalogService.getCatalog(tableParams.getCatalogId()); + AlertException.throwIfNull(flinkCatalog, "Catalog is not exit."); + AlertException.throwIfNull(tableParams.getTableOptions(), "Table options can not be null."); + List tableChanges = new ArrayList<>(); + tableParams + .getTableOptions() + .forEach( + (key, value) -> { + tableChanges.add(TableChange.set(key, value)); + }); + + return flinkCatalogBase.alterTable( + flinkCatalog.getCatalogName(), + getOptions(flinkCatalog.getConfiguration()), + tableParams.getDatabaseName(), + tableParams.getName(), + tableChanges, + true); + } + + @Override + public boolean removeOption( + String catalogName, String databaseName, String tableName, String key) { + AlertException.throwIfNull(tableName, "Table name can not be null."); + FlinkCatalog flinkCatalog = catalogService.getCatalog(catalogName); + AlertException.throwIfNull(flinkCatalog, "Catalog is not exit."); + AlertException.throwIfNull(key, "Table options key can not be null."); + List tableChanges = new ArrayList<>(); + + tableChanges.add(new TableChange.ResetOption(key)); + + return flinkCatalogBase.alterTable( + flinkCatalog.getCatalogName(), + getOptions(flinkCatalog.getConfiguration()), + databaseName, + tableName, + tableChanges, + true); + } + + @Override + public boolean dropTable(String catalogName, String databaseName, String tableName) { + AlertException.throwIfNull(tableName, "Table name can not be null."); + FlinkCatalog flinkCatalog = catalogService.getCatalog(catalogName); + AlertException.throwIfNull(flinkCatalog, "Catalog is not exit."); + return flinkCatalogBase.dropTable( + catalogName, getOptions(flinkCatalog.getConfiguration()), databaseName, tableName, true); + } + + @Override + public boolean renameTable( + String catalogName, String databaseName, String fromTableName, String toTableName) { + AlertException.throwIfNull(fromTableName, "From table name can not be null."); + AlertException.throwIfNull(toTableName, "To table name can not be null."); + FlinkCatalog flinkCatalog = catalogService.getCatalog(catalogName); + AlertException.throwIfNull(flinkCatalog, "Catalog is not exit."); + return flinkCatalogBase.renameTable( + catalogName, + getOptions(flinkCatalog.getConfiguration()), + databaseName, + fromTableName, + toTableName); + } + + @Override + public List listTables(TableParams tableParams) { + AlertException.throwIfNull(tableParams.getDatabaseName(), "Database name can not be null."); + FlinkCatalog flinkCatalog = catalogService.getCatalog(tableParams.getCatalogId()); + AlertException.throwIfNull(flinkCatalog, "Catalog is not exit."); + List tables = + flinkCatalogBase.listTable( + tableParams.getCatalogName(), + getOptions(flinkCatalog.getConfiguration()), + tableParams.getDatabaseName()); + + if (tables == null || tables.isEmpty()) { + return null; + } + List tableParamsList = new ArrayList<>(); + tables.forEach( + tableName -> { + tableParamsList.add( + covertToTableParams( + flinkCatalogBase.getTable( + flinkCatalog.getCatalogName(), + getOptions(flinkCatalog.getConfiguration()), + tableParams.getDatabaseName(), + tableParams.getName()))); + }); + + return tableParamsList; + } + + @Override + public TableParams listColumns(String catalogName, String databaseName, String tableName) { + AlertException.throwIfNull(databaseName, "Database name can not be null."); + FlinkCatalog flinkCatalog = catalogService.getCatalog(catalogName); + AlertException.throwIfNull(flinkCatalog, "Catalog is not exit."); + + CatalogBaseTable originTable = + flinkCatalogBase.getTable( + catalogName, getOptions(flinkCatalog.getConfiguration()), databaseName, tableName); + TableParams tableParams = covertToTableParams(originTable); + tableParams.setName(tableName); + tableParams.setCatalogName(catalogName); + tableParams.setDatabaseName(catalogName); + tableParams.setCatalogId(flinkCatalog.getId()); + return tableParams; + } + + private TableParams covertToTableParams(CatalogBaseTable catalogBaseTable) { + List tableColumns = new ArrayList<>(); + catalogBaseTable + .getUnresolvedSchema() + .getColumns() + .forEach( + unresolvedColumn -> { + TableColumn tableColumn = new TableColumn(); + tableColumn.setField(unresolvedColumn.getName()); + unresolvedColumn.getComment().ifPresent(tableColumn::setComment); + tableColumns.add(tableColumn); + }); + catalogBaseTable + .getUnresolvedSchema() + .getPrimaryKey() + .ifPresent( + unresolvedPrimaryKey -> { + List primaryKeys = unresolvedPrimaryKey.getColumnNames(); + for (String primary : primaryKeys) { + for (TableColumn column : tableColumns) { + if (column.getField().equals(primary)) { + column.setPk(true); + } + } + } + }); + return TableParams.builder().tableColumns(tableColumns).build(); + } +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/util/CatalogServiceUtils.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/util/CatalogServiceUtils.java new file mode 100644 index 0000000000..fe0ef3b4bf --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/util/CatalogServiceUtils.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.util; + +import org.apache.streampark.console.base.util.JacksonUtils; +import org.apache.streampark.console.base.util.WebUtils; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.factories.FactoryUtil; + +import com.fasterxml.jackson.core.JsonProcessingException; + +import java.io.File; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; + +public class CatalogServiceUtils { + + private static final Pattern PATTERN_FLINK_CONNECTOR_PLUGIN = + Pattern.compile( + "^streampark-flink-connector-plugin-(\\d+)\\.(\\d+)\\.(\\d+)(?:-([^-]+))?\\.jar$", + Pattern.CASE_INSENSITIVE | Pattern.DOTALL); + + public static Catalog getCatalog( + String catalogName, Map options, + Map configurations) { + ClassLoader classLoader = getCatalogClassLoader(Thread.currentThread().getContextClassLoader()); + return FactoryUtil.createCatalog( + catalogName, options, Configuration.fromMap(configurations), classLoader); + } + + /** get catalog classloader, add streampark-flink-connector-plugins */ + protected static ClassLoader getCatalogClassLoader(ClassLoader classLoader) { + File pluginDir = WebUtils.getPluginDir(); + File[] pluginFiles = + pluginDir.listFiles( + pathname -> pathname.getName().matches(PATTERN_FLINK_CONNECTOR_PLUGIN.pattern())); + if (pluginFiles == null) { + return classLoader; + } + List pluginUrls = new ArrayList(); + for (File file : pluginFiles) { + try { + URL pluginUrl = file.toURI().toURL(); + pluginUrls.add(pluginUrl); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + } + return new URLClassLoader(pluginUrls.toArray(new URL[0]), classLoader); + } + + public static Map getOptions(String configuration) { + try { + return JacksonUtils.toMap(configuration); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/util/DataTypeConverterUtils.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/util/DataTypeConverterUtils.java new file mode 100644 index 0000000000..b8b6b677bc --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/util/DataTypeConverterUtils.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.streampark.console.core.util; + +import org.apache.streampark.console.core.bean.FlinkDataType; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.CharType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.VarCharType; + +public class DataTypeConverterUtils { + + public static DataType convertToDataType(FlinkDataType flinkDataType) { + DataType dataType; + + String type = flinkDataType.getType().toLowerCase(); + boolean isNullable = flinkDataType.isNullable(); + Integer precision = flinkDataType.getPrecision(); + Integer scale = flinkDataType.getScale(); + + switch (type) { + // Integer types + case "tinyint": + case "int1": + dataType = DataTypes.TINYINT(); + break; + case "smallint": + case "int2": + dataType = DataTypes.SMALLINT(); + break; + case "int": + case "integer": + case "int4": + dataType = DataTypes.INT(); + break; + case "bigint": + case "int8": + dataType = DataTypes.BIGINT(); + break; + + // Floating-point types + case "float": + case "real": + dataType = DataTypes.FLOAT(); + break; + case "double": + case "float8": + dataType = DataTypes.DOUBLE(); + break; + + // Decimal and Numeric types + case "decimal": + case "numeric": + if (precision != null && scale != null) { + dataType = DataTypes.DECIMAL(precision, scale); + } else { + dataType = DataTypes.DECIMAL(38, 18); // Default precision and scale + } + break; + + // Character types + case "char": + if (precision != null) { + dataType = DataTypes.CHAR(precision); + } else { + dataType = DataTypes.CHAR(1); // Default size + } + break; + case "varchar": + case "string": + case "text": + dataType = DataTypes.STRING(); + break; + + // Binary data types + case "binary": + case "varbinary": + case "blob": + dataType = DataTypes.BYTES(); + break; + + // Date and time types + case "date": + dataType = DataTypes.DATE(); + break; + case "timestamp": + if (precision != null) { + dataType = DataTypes.TIMESTAMP(precision); + } else { + dataType = DataTypes.TIMESTAMP(3); // Default precision + } + break; + case "time": + dataType = DataTypes.TIME(); + break; + + // Boolean type + case "boolean": + case "bool": + dataType = DataTypes.BOOLEAN(); + break; + + // JSON and other types + case "json": + dataType = DataTypes.STRING(); // JSON as STRING in Flink + break; + case "uuid": + dataType = DataTypes.STRING(); // UUID as STRING + break; + + // Default case for unsupported types + default: + throw new IllegalArgumentException("Unsupported type: " + type); + } + + // Apply nullability + return isNullable ? dataType.nullable() : dataType.notNull(); + } + + public static FlinkDataType convertorToFlinkDataType(DataType dataType) { + LogicalType logicalType = dataType.getLogicalType(); + boolean isNullable = dataType.getLogicalType().isNullable(); + String typeName = logicalType.getTypeRoot().name().toLowerCase(); + Integer precision = null; + Integer scale = null; + + switch (logicalType.getTypeRoot()) { + case CHAR: + case VARCHAR: + if (logicalType instanceof CharType) { + precision = ((CharType) logicalType).getLength(); + } else if (logicalType instanceof VarCharType) { + precision = ((VarCharType) logicalType).getLength(); + } + break; + case DECIMAL: + if (logicalType instanceof DecimalType) { + precision = ((DecimalType) logicalType).getPrecision(); + scale = ((DecimalType) logicalType).getScale(); + } + break; + case TINYINT: + case SMALLINT: + case INTEGER: + case BIGINT: + case FLOAT: + case DOUBLE: + case DATE: + case TIME_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + case BOOLEAN: + case BINARY: + case VARBINARY: + case ARRAY: + case MULTISET: + case MAP: + case ROW: + case RAW: + case NULL: + case SYMBOL: + case UNRESOLVED: + case INTERVAL_YEAR_MONTH: + case INTERVAL_DAY_TIME: + case TIMESTAMP_WITH_TIME_ZONE: + case DISTINCT_TYPE: + case STRUCTURED_TYPE: + // case JSON: + // case UUID: + // These types do not have precision or scale + break; + default: + throw new IllegalArgumentException("Unsupported type: " + logicalType); + } + + return new FlinkDataType(typeName, isNullable, precision, scale); + } +} diff --git a/streampark-console/streampark-console-service/src/main/resources/config.yaml b/streampark-console/streampark-console-service/src/main/resources/config.yaml index 3279c71007..7acf0e77aa 100644 --- a/streampark-console/streampark-console-service/src/main/resources/config.yaml +++ b/streampark-console/streampark-console-service/src/main/resources/config.yaml @@ -100,6 +100,25 @@ sso: # Optional, change by authentication client # Please replace and fill in your client config below when enabled SSO +## flink catalog store config +table: + catalog-store: + kind: jdbc + jdbc: + url: jdbc://mysql:127.0.0.1:3306/flink-test + # The JDBC database url. + table-name: t_flink_catalog + ## catalog store table + driver: com.mysql.cj.jdbc.Driver + # The class name of the JDBC driver to use to connect to this URL, if not set, it will automatically be derived from the URL. + username: flinkuser + # The JDBC user name. 'username' and 'password' must both be specified if any of them is specified. + password: flinkpw + # The JDBC password. + max-retry-timeout: 600 + + + registry: type: jdbc heartbeat-refresh-interval: 1s diff --git a/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql b/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql index 3c082209ad..5e5bb79c0d 100644 --- a/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql +++ b/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql @@ -187,6 +187,24 @@ insert into `t_menu` values (150502, 150500, 'member add', null, null, 'member:a insert into `t_menu` values (150503, 150500, 'member update', null, null, 'member:update', null, '1', 1, null, now(), now()); insert into `t_menu` values (150504, 150500, 'member delete', null, null, 'member:delete', null, '1', 1, null, now(), now()); +insert into `t_menu` values (150601, 150600, 'catalog view', null, null, 'catalog:view', null, '1', 1, null, now(), now()); +insert into `t_menu` values (150602, 150600, 'catalog create', null, null, 'catalog:create', null, '1', 1, null, now(), now()); +insert into `t_menu` values (150603, 150600, 'catalog update', null, null, 'catalog:update', null, '1', 1, null, now(), now()); +insert into `t_menu` values (150604, 150600, 'catalog delete', null, null, 'catalog:delete', null, '1', 1, null, now(), now()); + +insert into `t_menu` values (150605, 150600, 'database view', null, null, 'database:view', null, '1', 1, null, now(), now()); +insert into `t_menu` values (150606, 150600, 'database create', null, null, 'database:create', null, '1', 1, null, now(), now()); +insert into `t_menu` values (150607, 150600, 'database delete', null, null, 'database:delete', null, '1', 1, null, now(), now()); + +insert into `t_menu` values (150608, 150600, 'table view', null, null, 'table:view', null, '1', 1, null, now(), now()); +insert into `t_menu` values (150609, 150600, 'table create', null, null, 'table:create', null, '1', 1, null, now(), now()); +insert into `t_menu` values (150610, 150600, 'table update', null, null, 'table:update', null, '1', 1, null, now(), now()); +insert into `t_menu` values (150611, 150600, 'table view', null, null, 'table:column:add', null, '1', 1, null, now(), now()); +insert into `t_menu` values (150612, 150600, 'table column list', null, null, 'table:column:list', null, '1', 1, null, now(), now()); +insert into `t_menu` values (150613, 150600, 'table column drop', null, null, 'table:column:drop', null, '1', 1, null, now(), now()); +insert into `t_menu` values (150614, 150600, 'table option add', null, null, 'option:add', null, '1', 1, null, now(), now()); +insert into `t_menu` values (150615, 150600, 'table option remove', null, null, 'option:remove', null, '1', 1, null, now(), now()); + -- ---------------------------- -- Records of t_role -- ---------------------------- @@ -307,6 +325,24 @@ insert into `t_role_menu` values (100107, 100002, 150501); insert into `t_role_menu` values (100108, 100002, 150502); insert into `t_role_menu` values (100109, 100002, 150503); insert into `t_role_menu` values (100110, 100002, 150504); +insert into `t_role_menu` values (100111, 100002, 150601); +insert into `t_role_menu` values (100112, 100002, 150602); +insert into `t_role_menu` values (100113, 100002, 150603); +insert into `t_role_menu` values (100114, 100002, 150604); +insert into `t_role_menu` values (100115, 100002, 150605); +insert into `t_role_menu` values (100116, 100002, 150606); +insert into `t_role_menu` values (100117, 100002, 150607); +insert into `t_role_menu` values (100118, 100002, 150608); +insert into `t_role_menu` values (100119, 100002, 150609); +insert into `t_role_menu` values (100120, 100002, 150610); +insert into `t_role_menu` values (100121, 100002, 150611); +insert into `t_role_menu` values (100122, 100002, 150612); +insert into `t_role_menu` values (100123, 100002, 150613); +insert into `t_role_menu` values (100124, 100002, 150614); +insert into `t_role_menu` values (100125, 100002, 150615); +insert into `t_role_menu` values (100126, 100002, 150600); +insert into `t_role_menu` values (100127, 100001, 150600); + -- ---------------------------- -- Records of t_setting diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkCatalogMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkCatalogMapper.xml index e0d291ce34..d5de6876ff 100644 --- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkCatalogMapper.xml +++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkCatalogMapper.xml @@ -27,6 +27,14 @@ limit 1 + +