From 7a5d51629aa9834f79d32df8a99453901a340030 Mon Sep 17 00:00:00 2001 From: zhangstar333 <2561612514@qq.com> Date: Thu, 31 Oct 2024 21:20:34 +0800 Subject: [PATCH 1/5] [improve](udf) support load data with udf functions --- .../java/org/apache/doris/common/Config.java | 7 + .../doris/analysis/FunctionCallExpr.java | 7 +- .../doris/load/loadv2/LoadingTaskPlanner.java | 2 +- .../doris/planner/StreamLoadPlanner.java | 2 +- .../test_routine_load_with_udf.out | 6 + .../stream_load/test_stream_load_udf.csv | 3 + .../stream_load/test_stream_load_with_udf.out | 6 + .../org/apache/doris/udf/IntLoadTest.java | 24 ++++ .../org/apache/doris/udf/StringLoadTest.java | 27 ++++ .../test_routine_load_with_udf.groovy | 126 ++++++++++++++++++ .../test_stream_load_with_udf.groovy | 65 +++++++++ 11 files changed, 272 insertions(+), 3 deletions(-) create mode 100644 regression-test/data/load_p0/routine_load/test_routine_load_with_udf.out create mode 100644 regression-test/data/load_p0/stream_load/test_stream_load_udf.csv create mode 100644 regression-test/data/load_p0/stream_load/test_stream_load_with_udf.out create mode 100644 regression-test/java-udf-src/src/main/java/org/apache/doris/udf/IntLoadTest.java create mode 100644 regression-test/java-udf-src/src/main/java/org/apache/doris/udf/StringLoadTest.java create mode 100644 regression-test/suites/load_p0/routine_load/test_routine_load_with_udf.groovy create mode 100644 regression-test/suites/load_p0/stream_load/test_stream_load_with_udf.groovy diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index dd0aca5923e74a..be9ca1d19cab9b 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2622,6 +2622,13 @@ public class Config extends ConfigBase { }) public static boolean enable_java_udf = true; + @ConfField(mutable = true, masterOnly = true, description = { + "开启后,可以在导入时,利用创建的全局java_udf函数处理数据, 默认为false。", + "When enabled, data can be processed using the globally created java_udf function during import." + + " The default setting is false." + }) + public static boolean enable_udf_in_load = false; + @ConfField(description = { "是否忽略 Image 文件中未知的模块。如果为 true,不在 PersistMetaModules.MODULE_NAMES 中的元数据模块将被忽略并跳过。" + "默认为 false,如果 Image 文件中包含未知的模块,Doris 将会抛出异常。" diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java index 2a8ae667fdefe0..53bb2ba95acb25 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/FunctionCallExpr.java @@ -37,6 +37,7 @@ import org.apache.doris.catalog.StructType; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.datasource.InternalCatalog; @@ -2456,7 +2457,11 @@ public Function findUdf(FunctionName fnName, Analyzer analyzer) throws AnalysisE } Function fn = null; - String dbName = fnName.analyzeDb(analyzer); + String dbName = null; + // when enable_udf_in_load == true, and db is null, maybe it's load, should find global function + if (!(Config.enable_udf_in_load && fnName.getDb() == null)) { + dbName = fnName.analyzeDb(analyzer); + } if (!Strings.isNullOrEmpty(dbName)) { // check operation privilege if (!analyzer.isReplay() && !Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java index 3544aeda2b867a..ef429a1d564208 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java @@ -113,7 +113,7 @@ public LoadingTaskPlanner(Long loadJobId, long txnId, long dbId, OlapTable table PrivPredicate.SELECT)) { this.analyzer.setUDFAllowed(true); } else { - this.analyzer.setUDFAllowed(false); + this.analyzer.setUDFAllowed(Config.enable_udf_in_load); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index e1a95531989258..3bb1f0018fbdd6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -101,7 +101,7 @@ private void resetAnalyzer() { analyzer = new Analyzer(Env.getCurrentEnv(), null); // TODO(cmy): currently we do not support UDF in stream load command. // Because there is no way to check the privilege of accessing UDF.. - analyzer.setUDFAllowed(false); + analyzer.setUDFAllowed(Config.enable_udf_in_load); descTable = analyzer.getDescTbl(); } diff --git a/regression-test/data/load_p0/routine_load/test_routine_load_with_udf.out b/regression-test/data/load_p0/routine_load/test_routine_load_with_udf.out new file mode 100644 index 00000000000000..ec68fa52ff42d0 --- /dev/null +++ b/regression-test/data/load_p0/routine_load/test_routine_load_with_udf.out @@ -0,0 +1,6 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql_topic_udf -- +1 eab 2023-07-15 def 2023-07-20T05:48:31 defdoris udf load +1 eab 2023-07-15 def 2023-07-20T05:48:31 defdoris udf load +1 eab 2023-07-15 def 2023-07-20T05:48:31 defdoris udf load + diff --git a/regression-test/data/load_p0/stream_load/test_stream_load_udf.csv b/regression-test/data/load_p0/stream_load/test_stream_load_udf.csv new file mode 100644 index 00000000000000..619df81596dddb --- /dev/null +++ b/regression-test/data/load_p0/stream_load/test_stream_load_udf.csv @@ -0,0 +1,3 @@ +1,asd +2,xxx +3,\N diff --git a/regression-test/data/load_p0/stream_load/test_stream_load_with_udf.out b/regression-test/data/load_p0/stream_load/test_stream_load_with_udf.out new file mode 100644 index 00000000000000..898c2cb4a9e4ce --- /dev/null +++ b/regression-test/data/load_p0/stream_load/test_stream_load_with_udf.out @@ -0,0 +1,6 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1001 asd +1002 xxx +1003 \N + diff --git a/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/IntLoadTest.java b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/IntLoadTest.java new file mode 100644 index 00000000000000..7e48719e6d3a23 --- /dev/null +++ b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/IntLoadTest.java @@ -0,0 +1,24 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.udf; + +public class IntLoadTest { + public Integer evaluate(Integer value) { + return value == null? null: value + 1000; + } +} diff --git a/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/StringLoadTest.java b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/StringLoadTest.java new file mode 100644 index 00000000000000..008acb720a7898 --- /dev/null +++ b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/StringLoadTest.java @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.udf; + +public class StringLoadTest { + public String evaluate(String str) { + if (str == null) { + return null; + } + return str + "doris udf load"; + } +} diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_with_udf.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_with_udf.groovy new file mode 100644 index 00000000000000..5d577ee3d0d73d --- /dev/null +++ b/regression-test/suites/load_p0/routine_load/test_routine_load_with_udf.groovy @@ -0,0 +1,126 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.kafka.clients.admin.AdminClient +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.clients.producer.ProducerConfig + +suite("test_routine_load_with_udf","p0") { + def kafkaCsvTpoics = [ + "test_show_routine_load", + ] + String enabled = context.config.otherConfigs.get("enableKafkaTest") + String kafka_port = context.config.otherConfigs.get("kafka_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + def kafka_broker = "${externalEnvIp}:${kafka_port}" + if (enabled != null && enabled.equalsIgnoreCase("true")) { + // define kafka + def props = new Properties() + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString()) + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") + // Create kafka producer + def producer = new KafkaProducer<>(props) + + for (String kafkaCsvTopic in kafkaCsvTpoics) { + def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text + def lines = txt.readLines() + lines.each { line -> + logger.info("=====${line}========") + def record = new ProducerRecord<>(kafkaCsvTopic, null, line) + producer.send(record) + } + } + } + + if (enabled != null && enabled.equalsIgnoreCase("true")) { + def tableName = "test_routine_load_with_udf" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k1` int(20) NULL, + `k2` string NULL, + `v1` date NULL, + `v2` string NULL, + `v3` datetime NULL, + `v4` string NULL + ) ENGINE=OLAP + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 3 + PROPERTIES ("replication_allocation" = "tag.location.default: 1"); + """ + def jarPath = """${context.file.parent}/../../javaudf_p0/jars/java-udf-case-jar-with-dependencies.jar""" + scp_udf_file_to_all_be(jarPath) + log.info("Jar path: ${jarPath}".toString()) + + sql """ ADMIN SET FRONTEND CONFIG ("enable_udf_in_load" = "true"); """ + try_sql("DROP GLOBAL FUNCTION IF EXISTS java_udf_string_load_global(string);") + sql """ CREATE GLOBAL FUNCTION java_udf_string_load_global(string) RETURNS string PROPERTIES ( + "file"="file://${jarPath}", + "symbol"="org.apache.doris.udf.StringLoadTest", + "type"="JAVA_UDF" + ); """ + + try { + sql """ + CREATE ROUTINE LOAD test_udf_load ON ${tableName} + COLUMNS TERMINATED BY ",", + COLUMNS(k1, k2, v1, v2, v3, tmp, v4=java_udf_string_load_global(v2)) + PROPERTIES + ( + "max_batch_interval" = "5", + "max_batch_rows" = "300000", + "max_batch_size" = "209715200" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${externalEnvIp}:${kafka_port}", + "kafka_topic" = "${kafkaCsvTpoics[0]}", + "property.kafka_default_offsets" = "OFFSET_BEGINNING" + ); + """ + sql "sync" + + String db = context.config.getDbNameByFile(context.file) + log.info("reason of state changed: ${db}".toString()) + + def count = 0 + while (true) { + res = sql "select count(*) from ${tableName}" + def state = sql "show routine load for testShow1" + log.info("routine load state: ${state[0][8].toString()}".toString()) + log.info("routine load statistic: ${state[0][14].toString()}".toString()) + log.info("reason of state changed: ${state[0][17].toString()}".toString()) + if (res[0][0] > 0) { + break + } + if (count >= 120) { + log.error("routine load can not visible for long time") + assertEquals(20, res[0][0]) + break + } + sleep(5000) + count++ + } + qt_sql_topic_udf "select * from ${tableName} order by k1" + } finally { + sql "stop routine load for test_udf_load" + } + } +} diff --git a/regression-test/suites/load_p0/stream_load/test_stream_load_with_udf.groovy b/regression-test/suites/load_p0/stream_load/test_stream_load_with_udf.groovy new file mode 100644 index 00000000000000..657a298991742c --- /dev/null +++ b/regression-test/suites/load_p0/stream_load/test_stream_load_with_udf.groovy @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_stream_load_with_udf", "p0") { + def tableName = "test_stream_load_with_udf" + sql """ set enable_fallback_to_original_planner=false;""" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ CREATE TABLE ${tableName} ( + id int, + v1 string + ) ENGINE=OLAP + duplicate key (`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ); + """ + def jarPath = """${context.file.parent}/../../javaudf_p0/jars/java-udf-case-jar-with-dependencies.jar""" + scp_udf_file_to_all_be(jarPath) + log.info("Jar path: ${jarPath}".toString()) + + sql """ ADMIN SET FRONTEND CONFIG ("enable_udf_in_load" = "true"); """ + try_sql("DROP GLOBAL FUNCTION IF EXISTS java_udf_int_load_global(int);") + sql """ CREATE GLOBAL FUNCTION java_udf_int_load_global(int) RETURNS int PROPERTIES ( + "file"="file://${jarPath}", + "symbol"="org.apache.doris.udf.IntLoadTest", + "type"="JAVA_UDF" + ); """ + + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', """ tmp,v1, id=java_udf_int_load_global(tmp) """ + file 'test_stream_load_udf.csv' + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(3, json.NumberLoadedRows) + } + } + + sql """sync""" + + qt_sql """select * from ${tableName} order by id;""" + +} From 8f057eacd50617f5939098fb14f90ac3519dc9d1 Mon Sep 17 00:00:00 2001 From: zhangstar333 <2561612514@qq.com> Date: Fri, 1 Nov 2024 10:10:35 +0800 Subject: [PATCH 2/5] update --- .../load_p0/routine_load/test_routine_load_with_udf.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_with_udf.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_with_udf.groovy index 5d577ee3d0d73d..739f45b52f4b51 100644 --- a/regression-test/suites/load_p0/routine_load/test_routine_load_with_udf.groovy +++ b/regression-test/suites/load_p0/routine_load/test_routine_load_with_udf.groovy @@ -103,7 +103,7 @@ suite("test_routine_load_with_udf","p0") { def count = 0 while (true) { res = sql "select count(*) from ${tableName}" - def state = sql "show routine load for testShow1" + def state = sql "show routine load for test_routine_load_with_udf" log.info("routine load state: ${state[0][8].toString()}".toString()) log.info("routine load statistic: ${state[0][14].toString()}".toString()) log.info("reason of state changed: ${state[0][17].toString()}".toString()) From fe1975257623490998a08f400fa19943f26b4d3f Mon Sep 17 00:00:00 2001 From: zhangstar333 <2561612514@qq.com> Date: Fri, 1 Nov 2024 17:28:14 +0800 Subject: [PATCH 3/5] update name --- .../load_p0/routine_load/test_routine_load_with_udf.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_with_udf.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_with_udf.groovy index 739f45b52f4b51..70b00bd8126811 100644 --- a/regression-test/suites/load_p0/routine_load/test_routine_load_with_udf.groovy +++ b/regression-test/suites/load_p0/routine_load/test_routine_load_with_udf.groovy @@ -103,7 +103,7 @@ suite("test_routine_load_with_udf","p0") { def count = 0 while (true) { res = sql "select count(*) from ${tableName}" - def state = sql "show routine load for test_routine_load_with_udf" + def state = sql "show routine load for test_udf_load" log.info("routine load state: ${state[0][8].toString()}".toString()) log.info("routine load statistic: ${state[0][14].toString()}".toString()) log.info("reason of state changed: ${state[0][17].toString()}".toString()) From 0d6bda7d028c2a0d7d64e0c05b9a41acee67af3a Mon Sep 17 00:00:00 2001 From: zhangstar333 <2561612514@qq.com> Date: Mon, 4 Nov 2024 11:33:06 +0800 Subject: [PATCH 4/5] update2 --- .../suites/load_p0/routine_load/data/test_routine_load_udf.csv | 1 + .../load_p0/routine_load/test_routine_load_with_udf.groovy | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) create mode 100644 regression-test/suites/load_p0/routine_load/data/test_routine_load_udf.csv diff --git a/regression-test/suites/load_p0/routine_load/data/test_routine_load_udf.csv b/regression-test/suites/load_p0/routine_load/data/test_routine_load_udf.csv new file mode 100644 index 00000000000000..b226b99ee4e0e0 --- /dev/null +++ b/regression-test/suites/load_p0/routine_load/data/test_routine_load_udf.csv @@ -0,0 +1 @@ +1,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi" \ No newline at end of file diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_with_udf.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_with_udf.groovy index 70b00bd8126811..11a562ba9d4660 100644 --- a/regression-test/suites/load_p0/routine_load/test_routine_load_with_udf.groovy +++ b/regression-test/suites/load_p0/routine_load/test_routine_load_with_udf.groovy @@ -22,7 +22,7 @@ import org.apache.kafka.clients.producer.ProducerConfig suite("test_routine_load_with_udf","p0") { def kafkaCsvTpoics = [ - "test_show_routine_load", + "test_routine_load_udf", ] String enabled = context.config.otherConfigs.get("enableKafkaTest") String kafka_port = context.config.otherConfigs.get("kafka_port") From d7ffac4eabab49bbb6c14827aec1d646dcd6e0b8 Mon Sep 17 00:00:00 2001 From: zhangstar333 <2561612514@qq.com> Date: Mon, 4 Nov 2024 21:52:34 +0800 Subject: [PATCH 5/5] update --- .../data/load_p0/routine_load/test_routine_load_with_udf.out | 2 -- 1 file changed, 2 deletions(-) diff --git a/regression-test/data/load_p0/routine_load/test_routine_load_with_udf.out b/regression-test/data/load_p0/routine_load/test_routine_load_with_udf.out index ec68fa52ff42d0..027890b8fe8df0 100644 --- a/regression-test/data/load_p0/routine_load/test_routine_load_with_udf.out +++ b/regression-test/data/load_p0/routine_load/test_routine_load_with_udf.out @@ -1,6 +1,4 @@ -- This file is automatically generated. You should know what you did if you want to edit this -- !sql_topic_udf -- 1 eab 2023-07-15 def 2023-07-20T05:48:31 defdoris udf load -1 eab 2023-07-15 def 2023-07-20T05:48:31 defdoris udf load -1 eab 2023-07-15 def 2023-07-20T05:48:31 defdoris udf load