diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java index f08b50a0c42f38..7bb4b61344a91b 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java @@ -78,6 +78,10 @@ public void remove(K key) { ttlMap.remove(key); } + public int size() { + return map.size(); + } + public void shutdown() { scheduler.shutdown(); try { diff --git a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java index c6aa2ece0576be..3f3860ad53dce0 100644 --- a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java +++ b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java @@ -72,7 +72,9 @@ public UdafExecutor(byte[] thriftParams) throws Exception { */ @Override public void close() { - super.close(); + if (!isStaticLoad) { + super.close(); + } stateObjMap = null; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java index 4a38d06ffe28ff..a0c74c8bd3f4a5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java @@ -360,6 +360,8 @@ private void analyzeTableFunction() throws AnalysisException { location, symbol, null, null); function.setChecksum(checksum); function.setNullableMode(returnNullMode); + function.setStaticLoad(isStaticLoad); + function.setExpirationTime(expirationTime); function.setUDTFunction(true); // Todo: maybe in create tables function, need register two function, one is // normal and one is outer as those have different result when result is NULL. @@ -426,6 +428,8 @@ private void analyzeUda() throws AnalysisException { function.setBinaryType(binaryType); function.setChecksum(checksum); function.setNullableMode(returnNullMode); + function.setStaticLoad(isStaticLoad); + function.setExpirationTime(expirationTime); } private void analyzeUdf() throws AnalysisException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdaf.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdaf.java index 27cf22b73b28dc..85143d2aca65bb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdaf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdaf.java @@ -62,6 +62,8 @@ public class JavaUdaf extends AggregateFunction implements ExplicitlyCastableSig private final String getValueFn; private final String removeFn; private final String checkSum; + private final boolean isStaticLoad; + private final long expirationTime; /** * Constructor of UDAF @@ -72,7 +74,7 @@ public JavaUdaf(String name, long functionId, String dbName, TFunctionBinaryType String objectFile, String symbol, String initFn, String updateFn, String mergeFn, String serializeFn, String finalizeFn, String getValueFn, String removeFn, - boolean isDistinct, String checkSum, Expression... args) { + boolean isDistinct, String checkSum, boolean isStaticLoad, long expirationTime, Expression... args) { super(name, isDistinct, args); this.dbName = dbName; this.functionId = functionId; @@ -90,6 +92,8 @@ public JavaUdaf(String name, long functionId, String dbName, TFunctionBinaryType this.getValueFn = getValueFn; this.removeFn = removeFn; this.checkSum = checkSum; + this.isStaticLoad = isStaticLoad; + this.expirationTime = expirationTime; } @Override @@ -120,7 +124,7 @@ public JavaUdaf withDistinctAndChildren(boolean isDistinct, List chi Preconditions.checkArgument(children.size() == this.children.size()); return new JavaUdaf(getName(), functionId, dbName, binaryType, signature, intermediateType, nullableMode, objectFile, symbol, initFn, updateFn, mergeFn, serializeFn, finalizeFn, getValueFn, removeFn, - isDistinct, checkSum, children.toArray(new Expression[0])); + isDistinct, checkSum, isStaticLoad, expirationTime, children.toArray(new Expression[0])); } /** @@ -162,6 +166,8 @@ public static void translateToNereidsFunction(String dbName, org.apache.doris.ca aggregate.getRemoveFnSymbol(), false, aggregate.getChecksum(), + aggregate.isStaticLoad(), + aggregate.getExpirationTime(), virtualSlots); JavaUdafBuilder builder = new JavaUdafBuilder(udaf); @@ -196,6 +202,8 @@ public Function getCatalogFunction() { expr.setNullableMode(nullableMode); expr.setChecksum(checkSum); expr.setId(functionId); + expr.setStaticLoad(isStaticLoad); + expr.setExpirationTime(expirationTime); return expr; } catch (Exception e) { throw new AnalysisException(e.getMessage(), e.getCause()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdtf.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdtf.java index 96ea02bf2b7215..c90a8c343a3c75 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdtf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdtf.java @@ -56,6 +56,8 @@ public class JavaUdtf extends TableGeneratingFunction implements ExplicitlyCasta private final String prepareFn; private final String closeFn; private final String checkSum; + private final boolean isStaticLoad; + private final long expirationTime; /** * Constructor of UDTF @@ -63,7 +65,7 @@ public class JavaUdtf extends TableGeneratingFunction implements ExplicitlyCasta public JavaUdtf(String name, long functionId, String dbName, TFunctionBinaryType binaryType, FunctionSignature signature, NullableMode nullableMode, String objectFile, String symbol, String prepareFn, String closeFn, - String checkSum, Expression... args) { + String checkSum, boolean isStaticLoad, long expirationTime, Expression... args) { super(name, args); this.dbName = dbName; this.functionId = functionId; @@ -75,6 +77,8 @@ public JavaUdtf(String name, long functionId, String dbName, TFunctionBinaryType this.prepareFn = prepareFn; this.closeFn = closeFn; this.checkSum = checkSum; + this.isStaticLoad = isStaticLoad; + this.expirationTime = expirationTime; } /** @@ -84,7 +88,8 @@ public JavaUdtf(String name, long functionId, String dbName, TFunctionBinaryType public JavaUdtf withChildren(List children) { Preconditions.checkArgument(children.size() == this.children.size()); return new JavaUdtf(getName(), functionId, dbName, binaryType, signature, nullableMode, - objectFile, symbol, prepareFn, closeFn, checkSum, children.toArray(new Expression[0])); + objectFile, symbol, prepareFn, closeFn, checkSum, isStaticLoad, expirationTime, + children.toArray(new Expression[0])); } @Override @@ -119,6 +124,8 @@ public Function getCatalogFunction() { expr.setNullableMode(nullableMode); expr.setChecksum(checkSum); expr.setId(functionId); + expr.setStaticLoad(isStaticLoad); + expr.setExpirationTime(expirationTime); expr.setUDTFunction(true); return expr; } catch (Exception e) { @@ -153,6 +160,8 @@ public static void translateToNereidsFunction(String dbName, org.apache.doris.ca scalar.getPrepareFnSymbol(), scalar.getCloseFnSymbol(), scalar.getChecksum(), + scalar.isStaticLoad(), + scalar.getExpirationTime(), virtualSlots); JavaUdtfBuilder builder = new JavaUdtfBuilder(udf); diff --git a/regression-test/data/javaudf_p0/test_javaudf_static_load_test.out b/regression-test/data/javaudf_p0/test_javaudf_static_load_test.out index 9f57f52d091d3e..097f84fd7a07e4 100644 --- a/regression-test/data/javaudf_p0/test_javaudf_static_load_test.out +++ b/regression-test/data/javaudf_p0/test_javaudf_static_load_test.out @@ -25,3 +25,43 @@ -- !select5 -- 5 +-- !select6 -- +1 + +-- !select7 -- +2 + +-- !select8 -- +3 + +-- !select9 -- +4 + +-- !select10 -- +5 + +-- !select11 -- +1 1 + +-- !select12 -- +1 2 +1 2 + +-- !select13 -- +1 3 +1 3 +1 3 + +-- !select14 -- +1 4 +1 4 +1 4 +1 4 + +-- !select15 -- +1 5 +1 5 +1 5 +1 5 +1 5 + diff --git a/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/StaticIntTestUDAF.java b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/StaticIntTestUDAF.java new file mode 100644 index 00000000000000..dde6c1336d1c78 --- /dev/null +++ b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/StaticIntTestUDAF.java @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.udf; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +public class StaticIntTestUDAF { + static { + System.out.println("static load should only print once StaticIntTestUDAF"); + } + private static int value = 0; + public static class State { + public long counter = 0; + } + + public State create() { + return new State(); + } + + public void destroy(State state) { + } + + public void reset(State state) { + state.counter = 0; + } + + public void add(State state, Integer val) { + if (val == null) return; + state.counter += val; + } + + public void serialize(State state, DataOutputStream out) throws IOException { + out.writeLong(state.counter); + } + + public void deserialize(State state, DataInputStream in) throws IOException { + state.counter = in.readLong(); + } + + public void merge(State state, State rhs) { + state.counter += rhs.counter; + } + + public long getValue(State state) { + value = value + 1; + System.out.println("getValue StaticIntTestUDAF " + value + " " + state.counter); + return state.counter + value; + } +} diff --git a/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/StaticIntTestUDTF.java b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/StaticIntTestUDTF.java new file mode 100644 index 00000000000000..460a4ac809d4d2 --- /dev/null +++ b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/StaticIntTestUDTF.java @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.udf; + +import java.util.ArrayList; + +public class StaticIntTestUDTF { + static { + System.out.println("static load should only print once StaticIntTestUDTF"); + } + private static int value = 0; + public ArrayList evaluate() { + ArrayList result = new ArrayList<>(); + value = value + 1; + for (int i = 0; i < value; i++) { + result.add(value); + } + return result; + } +} diff --git a/regression-test/suites/javaudf_p0/test_javaudf_static_load_test.groovy b/regression-test/suites/javaudf_p0/test_javaudf_static_load_test.groovy index c816ec90292a79..1a98767fb51cdf 100644 --- a/regression-test/suites/javaudf_p0/test_javaudf_static_load_test.groovy +++ b/regression-test/suites/javaudf_p0/test_javaudf_static_load_test.groovy @@ -66,6 +66,7 @@ suite("test_javaudf_static_load_test") { "type"="JAVA_UDF" ); """ + // the result of the following queries should be the accumulation sql """set parallel_pipeline_task_num = 1; """ qt_select1 """ SELECT static_load_test(); """ qt_select2 """ SELECT static_load_test(); """ @@ -73,8 +74,47 @@ suite("test_javaudf_static_load_test") { qt_select4 """ SELECT static_load_test(); """ qt_select5 """ SELECT static_load_test(); """ + + sql """ CREATE AGGREGATE FUNCTION static_load_test_udaf(int) RETURNS BigInt PROPERTIES ( + "file"="file://${jarPath}", + "symbol"="org.apache.doris.udf.StaticIntTestUDAF", + "always_nullable"="true", + "static_load"="true", + "expiration_time"="10", + "type"="JAVA_UDF" + ); """ + + // the result of the following queries should be the accumulation + // maybe we need drop funtion and test again, the result should be the same + // but the regression test will copy the jar to be custom_lib, and loaded by BE when it's started + // so it's can't be unloaded + sql """set parallel_pipeline_task_num = 1; """ + qt_select6 """ SELECT static_load_test_udaf(0); """ + qt_select7 """ SELECT static_load_test_udaf(0); """ + qt_select8 """ SELECT static_load_test_udaf(0); """ + qt_select9 """ SELECT static_load_test_udaf(0); """ + qt_select10 """ SELECT static_load_test_udaf(0); """ + + sql """ CREATE TABLES FUNCTION static_load_test_udtf() RETURNS array PROPERTIES ( + "file"="file://${jarPath}", + "symbol"="org.apache.doris.udf.StaticIntTestUDTF", + "always_nullable"="true", + "static_load"="true", + "expiration_time"="10", + "type"="JAVA_UDF" + ); """ + + sql """set parallel_pipeline_task_num = 1; """ + qt_select11 """ select k1, e1 from (select 1 k1) as t lateral view static_load_test_udtf() tmp1 as e1; """ + qt_select12 """ select k1, e1 from (select 1 k1) as t lateral view static_load_test_udtf() tmp1 as e1; """ + qt_select13 """ select k1, e1 from (select 1 k1) as t lateral view static_load_test_udtf() tmp1 as e1; """ + qt_select14 """ select k1, e1 from (select 1 k1) as t lateral view static_load_test_udtf() tmp1 as e1; """ + qt_select15 """ select k1, e1 from (select 1 k1) as t lateral view static_load_test_udtf() tmp1 as e1; """ + } finally { try_sql("DROP FUNCTION IF EXISTS static_load_test();") + try_sql("DROP FUNCTION IF EXISTS static_load_test_udaf(int);") + try_sql("DROP FUNCTION IF EXISTS static_load_test_udtf();") try_sql("DROP TABLE IF EXISTS ${tableName}") } }