From c7df60c781778fee10747e575de68fc437e005a9 Mon Sep 17 00:00:00 2001 From: zhangstar333 Date: Sat, 8 Feb 2025 10:43:39 +0800 Subject: [PATCH 1/5] [improve](udaf)support class cache for java-udaf --- .../doris/common/jni/utils/UdfClassCache.java | 14 +- .../org/apache/doris/udf/BaseExecutor.java | 106 ++++++++-- .../org/apache/doris/udf/UdafExecutor.java | 189 ++++++++---------- .../org/apache/doris/udf/UdfExecutor.java | 137 +++---------- 4 files changed, 202 insertions(+), 244 deletions(-) diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/UdfClassCache.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/UdfClassCache.java index 696ef4ed0bb182..1062aa055826d9 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/UdfClassCache.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/UdfClassCache.java @@ -20,6 +20,7 @@ import com.esotericsoftware.reflectasm.MethodAccess; import java.lang.reflect.Method; +import java.util.HashMap; /** * This class is used for caching the class of UDF. @@ -28,16 +29,17 @@ public class UdfClassCache { public Class udfClass; // the index of evaluate() method in the class public MethodAccess methodAccess; - public int evaluateIndex; - // the method of evaluate() in udf - public Method method; - // the method of prepare() in udf - public Method prepareMethod; // the argument and return's JavaUdfDataType of evaluate() method. public JavaUdfDataType[] argTypes; - public JavaUdfDataType retType; // the class type of the arguments in evaluate() method public Class[] argClass; // The return type class of evaluate() method + public JavaUdfDataType retType; public Class retClass; + + // all methods in the class for java-udf/ java-udaf + public HashMap allMethods; + // for java-udf index is evaluate method index + // for java-udaf index is add method index + public int methodIndex; } diff --git a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java index f97cbb602f1028..0d2160642e7a21 100644 --- a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java +++ b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java @@ -19,9 +19,12 @@ import org.apache.doris.catalog.ArrayType; import org.apache.doris.catalog.Type; +import org.apache.doris.common.classloader.ScannerLoader; import org.apache.doris.common.exception.InternalException; import org.apache.doris.common.exception.UdfRuntimeException; import org.apache.doris.common.jni.utils.JavaUdfDataType; +import org.apache.doris.common.jni.utils.UdfClassCache; +import org.apache.doris.common.jni.utils.UdfUtils; import org.apache.doris.common.jni.vec.ColumnValueConverter; import org.apache.doris.common.jni.vec.VectorTable; import org.apache.doris.thrift.TFunction; @@ -29,12 +32,17 @@ import org.apache.doris.thrift.TPrimitiveType; import com.esotericsoftware.reflectasm.MethodAccess; +import com.google.common.base.Strings; + import org.apache.log4j.Logger; import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; +import java.io.FileNotFoundException; import java.io.IOException; +import java.lang.reflect.Constructor; +import java.net.MalformedURLException; import java.net.URLClassLoader; import java.time.LocalDate; import java.time.LocalDateTime; @@ -44,26 +52,17 @@ import java.util.Map.Entry; public abstract class BaseExecutor { - private static final Logger LOG = Logger.getLogger(BaseExecutor.class); - - // Object to deserialize ctor params from BE. protected static final TBinaryProtocol.Factory PROTOCOL_FACTORY = new TBinaryProtocol.Factory(); - + private static final Logger LOG = Logger.getLogger(BaseExecutor.class); protected Object udf; // setup by init() and cleared by close() protected URLClassLoader classLoader; - - // Return and argument types of the function inferred from the udf method - // signature. - // The JavaUdfDataType enum maps it to corresponding primitive type. - protected JavaUdfDataType[] argTypes; - protected JavaUdfDataType retType; - protected Class[] argClass; - protected MethodAccess methodAccess; - protected VectorTable outputTable = null; + protected UdfClassCache objCache; protected TFunction fn; - protected Class retClass; + protected boolean isStaticLoad = false; + protected VectorTable outputTable = null; + String className; /** * Create a UdfExecutor, using parameters from a serialized thrift object. Used @@ -94,17 +93,79 @@ public BaseExecutor(byte[] thriftParams) throws Exception { public String debugString() { StringBuilder res = new StringBuilder(); - for (JavaUdfDataType type : argTypes) { + for (JavaUdfDataType type : objCache.argTypes) { res.append(type.toString()); } - res.append(" return type: ").append(retType.toString()); - res.append(" methodAccess: ").append(methodAccess.toString()); + res.append(" return type: ").append(objCache.retType.toString()); + res.append(" methodAccess: ").append(objCache.methodAccess.toString()); res.append(" fn.toString(): ").append(fn.toString()); return res.toString(); } - protected abstract void init(TJavaUdfExecutorCtorParams request, String jarPath, - Type funcRetType, Type... parameterTypes) throws UdfRuntimeException; + protected void init(TJavaUdfExecutorCtorParams request, String jarPath, + Type funcRetType, Type... parameterTypes) throws UdfRuntimeException { + try { + isStaticLoad = request.getFn().isSetIsStaticLoad() && request.getFn().is_static_load; + long expirationTime = 360L; // default is 6 hours + if (request.getFn().isSetExpirationTime()) { + expirationTime = request.getFn().getExpirationTime(); + } + objCache = getClassCache(jarPath, request.getFn().getSignature(), expirationTime, + funcRetType, parameterTypes); + Constructor ctor = objCache.udfClass.getConstructor(); + udf = ctor.newInstance(); + } catch (MalformedURLException e) { + throw new UdfRuntimeException("Unable to load jar.", e); + } catch (SecurityException e) { + throw new UdfRuntimeException("Unable to load function.", e); + } catch (ClassNotFoundException e) { + throw new UdfRuntimeException("Unable to find class.", e); + } catch (NoSuchMethodException e) { + throw new UdfRuntimeException( + "Unable to find constructor with no arguments.", e); + } catch (IllegalArgumentException e) { + throw new UdfRuntimeException( + "Unable to call UDF constructor with no arguments.", e); + } catch (Exception e) { + throw new UdfRuntimeException("Unable to call create UDF instance.", e); + } + } + + + public UdfClassCache getClassCache(String jarPath, String signature, long expirationTime, + Type funcRetType, Type... parameterTypes) + throws MalformedURLException, FileNotFoundException, ClassNotFoundException, InternalException, + UdfRuntimeException { + UdfClassCache cache = null; + if (isStaticLoad) { + cache = ScannerLoader.getUdfClassLoader(signature); + } + if (cache == null) { + ClassLoader loader; + if (Strings.isNullOrEmpty(jarPath)) { + // if jarPath is empty, which means the UDF jar is located in custom_lib + // and already be loaded when BE start. + // so here we use system class loader to load UDF class. + loader = ClassLoader.getSystemClassLoader(); + } else { + ClassLoader parent = getClass().getClassLoader(); + classLoader = UdfUtils.getClassLoader(jarPath, parent); + loader = classLoader; + } + cache = new UdfClassCache(); + cache.allMethods = new HashMap<>(); + cache.udfClass = Class.forName(className, true, loader); + cache.methodAccess = MethodAccess.get(cache.udfClass); + checkAndCacheUdfClass(cache, funcRetType, parameterTypes); + if (isStaticLoad) { + ScannerLoader.cacheClassLoader(signature, cache, expirationTime); + } + } + return cache; + } + + protected abstract void checkAndCacheUdfClass(UdfClassCache cache, Type funcRetType, Type... parameterTypes) + throws InternalException, UdfRuntimeException; /** * Close the class loader we may have created. @@ -127,7 +188,7 @@ public void close() { // We are now un-usable (because the class loader has been // closed), so null out method_ and classLoader_. classLoader = null; - methodAccess = null; + objCache.methodAccess = null; } protected ColumnValueConverter getInputConverter(TPrimitiveType primitiveType, Class clz) { @@ -311,7 +372,8 @@ protected Map getInputConverters(int numColumns, for (int j = 0; j < numColumns; ++j) { // For UDAF, we need to offset by 1 since first arg is state int argIndex = isUdaf ? j + 1 : j; - ColumnValueConverter converter = getInputConverter(argTypes[j].getPrimitiveType(), argClass[argIndex]); + ColumnValueConverter converter = getInputConverter(objCache.argTypes[j].getPrimitiveType(), + objCache.argClass[argIndex]); if (converter != null) { converters.put(j, converter); } @@ -320,6 +382,6 @@ protected Map getInputConverters(int numColumns, } protected ColumnValueConverter getOutputConverter() { - return getOutputConverter(retType, retClass); + return getOutputConverter(objCache.retType, objCache.retClass); } } diff --git a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java index 53c214303ce1c3..53b3a060897bc2 100644 --- a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java +++ b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java @@ -19,14 +19,15 @@ import org.apache.doris.catalog.Type; import org.apache.doris.common.Pair; +import org.apache.doris.common.exception.InternalException; import org.apache.doris.common.exception.UdfRuntimeException; import org.apache.doris.common.jni.utils.JavaUdfDataType; import org.apache.doris.common.jni.utils.OffHeap; +import org.apache.doris.common.jni.utils.UdfClassCache; import org.apache.doris.common.jni.utils.UdfUtils; import org.apache.doris.common.jni.vec.VectorTable; import org.apache.doris.thrift.TJavaUdfExecutorCtorParams; -import com.esotericsoftware.reflectasm.MethodAccess; import com.google.common.base.Joiner; import com.google.common.collect.Lists; import org.apache.log4j.Logger; @@ -36,10 +37,7 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.lang.reflect.Array; -import java.lang.reflect.Constructor; -import java.lang.reflect.Method; import java.net.MalformedURLException; -import java.util.ArrayList; import java.util.HashMap; import java.util.Map; @@ -59,15 +57,15 @@ public class UdafExecutor extends BaseExecutor { private static final String UDAF_MERGE_FUNCTION = "merge"; private static final String UDAF_RESULT_FUNCTION = "getValue"; - private HashMap allMethods; + // private HashMap allMethods; private HashMap stateObjMap; - private int addIndex; /** * Constructor to create an object. */ public UdafExecutor(byte[] thriftParams) throws Exception { super(thriftParams); + className = fn.aggregate_fn.symbol; } /** @@ -76,7 +74,6 @@ public UdafExecutor(byte[] thriftParams) throws Exception { @Override public void close() { super.close(); - allMethods = null; stateObjMap = null; } @@ -99,7 +96,7 @@ public void addBatch(boolean isSinglePlace, int rowStart, int rowEnd, long place public void addBatchSingle(int rowStart, int rowEnd, long placeAddr, Object[][] inputs) throws UdfRuntimeException { Long curPlace = placeAddr; - Object[] inputArgs = new Object[argTypes.length + 1]; + Object[] inputArgs = new Object[objCache.argTypes.length + 1]; Object state = stateObjMap.get(curPlace); if (state != null) { inputArgs[0] = state; @@ -114,7 +111,7 @@ public void addBatchSingle(int rowStart, int rowEnd, long placeAddr, Object[][] for (int j = 0; j < numColumns; ++j) { inputArgs[j + 1] = inputs[j][i]; } - methodAccess.invoke(udf, addIndex, inputArgs); + objCache.methodAccess.invoke(udf, objCache.methodIndex, inputArgs); } } @@ -134,15 +131,15 @@ public void addBatchPlaces(int rowStart, int rowEnd, long placeAddr, int offset, placeState[row - rowStart] = newState; } } - //spilt into two for loop + // spilt into two for loop - Object[] inputArgs = new Object[argTypes.length + 1]; + Object[] inputArgs = new Object[objCache.argTypes.length + 1]; for (int row = 0; row < numRows; ++row) { inputArgs[0] = placeState[row]; for (int j = 0; j < numColumns; ++j) { inputArgs[j + 1] = inputs[j][row]; } - methodAccess.invoke(udf, addIndex, inputArgs); + objCache.methodAccess.invoke(udf, objCache.methodIndex, inputArgs); } } @@ -151,7 +148,7 @@ public void addBatchPlaces(int rowStart, int rowEnd, long placeAddr, int offset, */ public Object createAggState() throws UdfRuntimeException { try { - return allMethods.get(UDAF_CREATE_FUNCTION).invoke(udf, null); + return objCache.allMethods.get(UDAF_CREATE_FUNCTION).invoke(udf, null); } catch (Exception e) { LOG.warn("invoke createAggState function meet some error: ", e); throw new UdfRuntimeException("UDAF failed to create: ", e); @@ -164,7 +161,7 @@ public Object createAggState() throws UdfRuntimeException { public void destroy() throws UdfRuntimeException { try { for (Object obj : stateObjMap.values()) { - allMethods.get(UDAF_DESTROY_FUNCTION).invoke(udf, obj); + objCache.allMethods.get(UDAF_DESTROY_FUNCTION).invoke(udf, obj); } stateObjMap.clear(); } catch (Exception e) { @@ -182,7 +179,7 @@ public byte[] serialize(long place) throws UdfRuntimeException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); args[0] = stateObjMap.get(place); args[1] = new DataOutputStream(baos); - allMethods.get(UDAF_SERIALIZE_FUNCTION).invoke(udf, args); + objCache.allMethods.get(UDAF_SERIALIZE_FUNCTION).invoke(udf, args); return baos.toByteArray(); } catch (Exception e) { LOG.info("evaluate exception debug: " + debugString()); @@ -201,7 +198,7 @@ public void reset(long place) throws UdfRuntimeException { if (args[0] == null) { return; } - allMethods.get(UDAF_RESET_FUNCTION).invoke(udf, args); + objCache.allMethods.get(UDAF_RESET_FUNCTION).invoke(udf, args); } catch (Exception e) { LOG.info("evaluate exception debug: " + debugString()); LOG.warn("invoke reset function meet some error: ", e); @@ -219,7 +216,7 @@ public void merge(long place, byte[] data) throws UdfRuntimeException { ByteArrayInputStream bins = new ByteArrayInputStream(data); args[0] = createAggState(); args[1] = new DataInputStream(bins); - allMethods.get(UDAF_DESERIALIZE_FUNCTION).invoke(udf, args); + objCache.allMethods.get(UDAF_DESERIALIZE_FUNCTION).invoke(udf, args); args[1] = args[0]; Long curPlace = place; Object state = stateObjMap.get(curPlace); @@ -230,7 +227,7 @@ public void merge(long place, byte[] data) throws UdfRuntimeException { stateObjMap.put(curPlace, newState); args[0] = newState; } - allMethods.get(UDAF_MERGE_FUNCTION).invoke(udf, args); + objCache.allMethods.get(UDAF_MERGE_FUNCTION).invoke(udf, args); } catch (Exception e) { LOG.info("evaluate exception debug: " + debugString()); LOG.warn("invoke merge function meet some error: ", e); @@ -250,12 +247,12 @@ public long getValue(long place, Map outputParams) throws UdfRun if (stateObjMap.get(place) == null) { stateObjMap.put(place, createAggState()); } - Object value = allMethods.get(UDAF_RESULT_FUNCTION).invoke(udf, stateObjMap.get(place)); + Object value = objCache.allMethods.get(UDAF_RESULT_FUNCTION).invoke(udf, stateObjMap.get(place)); // If the return type is primitive, we can't cast the array of primitive type as array of Object, // so we have to new its wrapped Object. Object[] result = outputTable.getColumnType(0).isPrimitive() ? outputTable.getColumn(0).newObjectContainerArray(1) - : (Object[]) Array.newInstance(retClass, 1); + : (Object[]) Array.newInstance(objCache.retClass, 1); result[0] = value; boolean isNullable = Boolean.parseBoolean(outputParams.getOrDefault("is_nullable", "true")); outputTable.appendData(0, result, getOutputConverter(), isNullable); @@ -267,109 +264,85 @@ public long getValue(long place, Map outputParams) throws UdfRun } } - @Override protected void init(TJavaUdfExecutorCtorParams request, String jarPath, Type funcRetType, Type... parameterTypes) throws UdfRuntimeException { - String className = request.fn.aggregate_fn.symbol; - allMethods = new HashMap<>(); + super.init(request, jarPath, funcRetType, parameterTypes); stateObjMap = new HashMap<>(); + } + + @Override + protected void checkAndCacheUdfClass(UdfClassCache cache, Type funcRetType, + Type... parameterTypes) throws InternalException, UdfRuntimeException { ArrayList signatures = Lists.newArrayList(); - try { - ClassLoader loader; - if (jarPath != null) { - ClassLoader parent = getClass().getClassLoader(); - classLoader = UdfUtils.getClassLoader(jarPath, parent); - loader = classLoader; - } else { - // for test - loader = ClassLoader.getSystemClassLoader(); - } - Class c = Class.forName(className, true, loader); - methodAccess = MethodAccess.get(c); - Constructor ctor = c.getConstructor(); - udf = ctor.newInstance(); - Method[] methods = c.getDeclaredMethods(); - int idx = 0; - for (idx = 0; idx < methods.length; ++idx) { - signatures.add(methods[idx].toGenericString()); - switch (methods[idx].getName()) { - case UDAF_DESTROY_FUNCTION: - case UDAF_CREATE_FUNCTION: - case UDAF_MERGE_FUNCTION: - case UDAF_SERIALIZE_FUNCTION: - case UDAF_RESET_FUNCTION: - case UDAF_DESERIALIZE_FUNCTION: { - allMethods.put(methods[idx].getName(), methods[idx]); - break; + Class c = cache.udfClass; + Method[] methods = c.getMethods(); + int idx = 0; + for (idx = 0; idx < methods.length; ++idx) { + signatures.add(methods[idx].toGenericString()); + switch (methods[idx].getName()) { + case UDAF_DESTROY_FUNCTION: + case UDAF_CREATE_FUNCTION: + case UDAF_MERGE_FUNCTION: + case UDAF_SERIALIZE_FUNCTION: + case UDAF_RESET_FUNCTION: + case UDAF_DESERIALIZE_FUNCTION: { + cache.allMethods.put(methods[idx].getName(), methods[idx]); + break; + } + case UDAF_RESULT_FUNCTION: { + cache.allMethods.put(methods[idx].getName(), methods[idx]); + Pair returnType = UdfUtils.setReturnType(funcRetType, + methods[idx].getReturnType()); + if (!returnType.first) { + if (LOG.isDebugEnabled()) { + LOG.debug("result function set return parameterTypes has error"); + } + } else { + cache.retType = returnType.second; + cache.retClass = methods[idx].getReturnType(); } - case UDAF_RESULT_FUNCTION: { - allMethods.put(methods[idx].getName(), methods[idx]); - Pair returnType = UdfUtils.setReturnType(funcRetType, - methods[idx].getReturnType()); - if (!returnType.first) { - if (LOG.isDebugEnabled()) { - LOG.debug("result function set return parameterTypes has error"); - } - } else { - retType = returnType.second; - retClass = methods[idx].getReturnType(); + break; + } + case UDAF_ADD_FUNCTION: { + cache.allMethods.put(methods[idx].getName(), methods[idx]); + cache.methodIndex = cache.methodAccess.getIndex(UDAF_ADD_FUNCTION); + cache.argClass = methods[idx].getParameterTypes(); + if (cache.argClass.length != parameterTypes.length + 1) { + if (LOG.isDebugEnabled()) { + LOG.debug("add function parameterTypes length not equal " + cache.argClass.length + " " + + parameterTypes.length + " " + methods[idx].getName()); } - break; } - case UDAF_ADD_FUNCTION: { - allMethods.put(methods[idx].getName(), methods[idx]); - addIndex = methodAccess.getIndex(UDAF_ADD_FUNCTION); - argClass = methods[idx].getParameterTypes(); - if (argClass.length != parameterTypes.length + 1) { + if (!(parameterTypes.length == 0)) { + Pair inputType = UdfUtils.setArgTypes(parameterTypes, + cache.argClass, true); + if (!inputType.first) { if (LOG.isDebugEnabled()) { - LOG.debug("add function parameterTypes length not equal " + argClass.length + " " - + parameterTypes.length + " " + methods[idx].getName()); - } - } - if (!(parameterTypes.length == 0)) { - Pair inputType = UdfUtils.setArgTypes(parameterTypes, - argClass, true); - if (!inputType.first) { - if (LOG.isDebugEnabled()) { - LOG.debug("add function set arg parameterTypes has error"); - } - } else { - argTypes = inputType.second; + LOG.debug("add function set arg parameterTypes has error"); } } else { - // Special case where the UDF doesn't take any input args - argTypes = new JavaUdfDataType[0]; + cache.argTypes = inputType.second; } - break; + } else { + // Special case where the UDF doesn't take any input args + cache.argTypes = new JavaUdfDataType[0]; } - default: - break; + break; } + default: + break; } - if (idx == methods.length) { - return; - } - StringBuilder sb = new StringBuilder(); - sb.append("Unable to find evaluate function with the correct signature: ") - .append(className) - .append(".evaluate(") - .append(Joiner.on(", ").join(parameterTypes)).append(")\n").append("UDF contains: \n ") - .append(Joiner.on("\n ").join(signatures)); - throw new UdfRuntimeException(sb.toString()); - - } catch (MalformedURLException e) { - throw new UdfRuntimeException("Unable to load jar.", e); - } catch (SecurityException e) { - throw new UdfRuntimeException("Unable to load function.", e); - } catch (ClassNotFoundException e) { - throw new UdfRuntimeException("Unable to find class.", e); - } catch (NoSuchMethodException e) { - throw new UdfRuntimeException("Unable to find constructor with no arguments.", e); - } catch (IllegalArgumentException e) { - throw new UdfRuntimeException("Unable to call UDAF constructor with no arguments.", e); - } catch (Exception e) { - throw new UdfRuntimeException("Unable to call create UDAF instance.", e); } + if (idx == methods.length) { + return; + } + StringBuilder sb = new StringBuilder(); + sb.append("Unable to find evaluate function with the correct signature: ") + .append(className) + .append(".evaluate(") + .append(Joiner.on(", ").join(parameterTypes)).append(")\n").append("UDF contains: \n ") + .append(Joiner.on("\n ").join(signatures)); + throw new UdfRuntimeException(sb.toString()); } } diff --git a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java index e24fc719ff553d..0e9d181e606166 100644 --- a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java +++ b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java @@ -19,7 +19,6 @@ import org.apache.doris.catalog.Type; import org.apache.doris.common.Pair; -import org.apache.doris.common.classloader.ScannerLoader; import org.apache.doris.common.exception.InternalException; import org.apache.doris.common.exception.UdfRuntimeException; import org.apache.doris.common.jni.utils.JavaUdfDataType; @@ -28,17 +27,13 @@ import org.apache.doris.common.jni.vec.VectorTable; import org.apache.doris.thrift.TJavaUdfExecutorCtorParams; -import com.esotericsoftware.reflectasm.MethodAccess; import com.google.common.base.Joiner; -import com.google.common.base.Strings; import com.google.common.collect.Lists; import org.apache.log4j.Logger; -import java.io.FileNotFoundException; import java.lang.reflect.Array; -import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.net.MalformedURLException; import java.util.ArrayList; import java.util.Map; @@ -47,19 +42,13 @@ public class UdfExecutor extends BaseExecutor { private static final String UDF_PREPARE_FUNCTION_NAME = "prepare"; private static final String UDF_FUNCTION_NAME = "evaluate"; - // setup by init() and cleared by close() - private Method method; - - private int evaluateIndex; - - private boolean isStaticLoad = false; - /** * Create a UdfExecutor, using parameters from a serialized thrift object. Used by * the backend. */ public UdfExecutor(byte[] thriftParams) throws Exception { super(thriftParams); + className = fn.scalar_fn.symbol; } /** @@ -69,7 +58,7 @@ public UdfExecutor(byte[] thriftParams) throws Exception { public void close() { // We are now un-usable (because the class loader has been // closed), so null out method_ and classLoader_. - method = null; + // method = null; if (!isStaticLoad) { super.close(); } else if (outputTable != null) { @@ -91,7 +80,7 @@ public long evaluate(Map inputParams, Map output // so we have to new its wrapped Object. Object[] result = outputTable.getColumnType(0).isPrimitive() ? outputTable.getColumn(0).newObjectContainerArray(numRows) - : (Object[]) Array.newInstance(method.getReturnType(), numRows); + : (Object[]) Array.newInstance(objCache.retClass, numRows); Object[][] inputs = inputTable.getMaterializedData(getInputConverters(numColumns, false)); Object[] parameters = new Object[numColumns]; for (int i = 0; i < numRows; ++i) { @@ -99,7 +88,7 @@ public long evaluate(Map inputParams, Map output int row = inputTable.isConstColumn(j) ? 0 : i; parameters[j] = inputs[j][row]; } - result[i] = methodAccess.invoke(udf, evaluateIndex, parameters); + result[i] = objCache.methodAccess.invoke(udf, objCache.methodIndex, parameters); } boolean isNullable = Boolean.parseBoolean(outputParams.getOrDefault("is_nullable", "true")); outputTable.appendData(0, result, getOutputConverter(), isNullable); @@ -110,10 +99,6 @@ public long evaluate(Map inputParams, Map output } } - public Method getMethod() { - return method; - } - private Method findPrepareMethod(Method[] methods) { for (Method method : methods) { if (method.getName().equals(UDF_PREPARE_FUNCTION_NAME) && method.getReturnType().equals(void.class) @@ -124,45 +109,31 @@ private Method findPrepareMethod(Method[] methods) { return null; // Method not found } - public UdfClassCache getClassCache(String className, String jarPath, String signature, long expirationTime, - Type funcRetType, Type... parameterTypes) - throws MalformedURLException, FileNotFoundException, ClassNotFoundException, InternalException, - UdfRuntimeException { - UdfClassCache cache = null; - if (isStaticLoad) { - cache = ScannerLoader.getUdfClassLoader(signature); - } - if (cache == null) { - ClassLoader loader; - if (Strings.isNullOrEmpty(jarPath)) { - // if jarPath is empty, which means the UDF jar is located in custom_lib - // and already be loaded when BE start. - // so here we use system class loader to load UDF class. - loader = ClassLoader.getSystemClassLoader(); - } else { - ClassLoader parent = getClass().getClassLoader(); - classLoader = UdfUtils.getClassLoader(jarPath, parent); - loader = classLoader; - } - cache = new UdfClassCache(); - cache.udfClass = Class.forName(className, true, loader); - cache.methodAccess = MethodAccess.get(cache.udfClass); - checkAndCacheUdfClass(className, cache, funcRetType, parameterTypes); - if (isStaticLoad) { - ScannerLoader.cacheClassLoader(signature, cache, expirationTime); + // Preallocate the input objects that will be passed to the underlying UDF. + // These objects are allocated once and reused across calls to evaluate() + @Override + protected void init(TJavaUdfExecutorCtorParams request, String jarPath, Type funcRetType, + Type... parameterTypes) throws UdfRuntimeException { + super.init(request, jarPath, funcRetType, parameterTypes); + Method prepareMethod = objCache.allMethods.get(UDF_PREPARE_FUNCTION_NAME); + if (prepareMethod != null) { + try { + prepareMethod.invoke(udf); + } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { + throw new UdfRuntimeException("Unable to call UDF prepare function.", e); } } - return cache; } - private void checkAndCacheUdfClass(String className, UdfClassCache cache, Type funcRetType, Type... parameterTypes) + @Override + protected void checkAndCacheUdfClass(UdfClassCache cache, Type funcRetType, Type... parameterTypes) throws InternalException, UdfRuntimeException { ArrayList signatures = Lists.newArrayList(); Class c = cache.udfClass; Method[] methods = c.getMethods(); Method prepareMethod = findPrepareMethod(methods); if (prepareMethod != null) { - cache.prepareMethod = prepareMethod; + cache.allMethods.put(UDF_PREPARE_FUNCTION_NAME, prepareMethod); } for (Method m : methods) { // By convention, the udf must contain the function "evaluate" @@ -176,8 +147,8 @@ private void checkAndCacheUdfClass(String className, UdfClassCache cache, Type f if (cache.argClass.length != parameterTypes.length) { continue; } - cache.method = m; - cache.evaluateIndex = cache.methodAccess.getIndex(UDF_FUNCTION_NAME, cache.argClass); + cache.allMethods.put(UDF_FUNCTION_NAME, m); + cache.methodIndex = cache.methodAccess.getIndex(UDF_FUNCTION_NAME, cache.argClass); Pair returnType; if (cache.argClass.length == 0 && parameterTypes.length == 0) { // Special case where the UDF doesn't take any input args @@ -202,69 +173,19 @@ private void checkAndCacheUdfClass(String className, UdfClassCache cache, Type f } else { cache.argTypes = inputType.second; } - if (cache.method != null) { - cache.retClass = cache.method.getReturnType(); - } + cache.retClass = m.getReturnType(); return; } StringBuilder sb = new StringBuilder(); sb.append("Unable to find evaluate function with the correct signature: ") - .append(className) - .append(".evaluate(") - .append(Joiner.on(", ").join(parameterTypes)) - .append(")\n") - .append("UDF contains: \n ") - .append(Joiner.on("\n ").join(signatures)); + .append(className) + .append(".evaluate(") + .append(Joiner.on(", ").join(parameterTypes)) + .append(")\n") + .append("UDF contains: \n ") + .append(Joiner.on("\n ").join(signatures)); throw new UdfRuntimeException(sb.toString()); } - - // Preallocate the input objects that will be passed to the underlying UDF. - // These objects are allocated once and reused across calls to evaluate() - @Override - protected void init(TJavaUdfExecutorCtorParams request, String jarPath, Type funcRetType, - Type... parameterTypes) throws UdfRuntimeException { - String className = request.fn.scalar_fn.symbol; - try { - if (LOG.isDebugEnabled()) { - LOG.debug("Loading UDF '" + className + "' from " + jarPath); - } - isStaticLoad = request.getFn().isSetIsStaticLoad() && request.getFn().is_static_load; - long expirationTime = 360L; // default is 6 hours - if (request.getFn().isSetExpirationTime()) { - expirationTime = request.getFn().getExpirationTime(); - } - UdfClassCache cache = getClassCache(className, jarPath, request.getFn().getSignature(), expirationTime, - funcRetType, parameterTypes); - methodAccess = cache.methodAccess; - Constructor ctor = cache.udfClass.getConstructor(); - udf = ctor.newInstance(); - Method prepareMethod = cache.prepareMethod; - if (prepareMethod != null) { - prepareMethod.invoke(udf); - } - - argClass = cache.argClass; - method = cache.method; - evaluateIndex = cache.evaluateIndex; - retType = cache.retType; - argTypes = cache.argTypes; - retClass = cache.retClass; - } catch (MalformedURLException e) { - throw new UdfRuntimeException("Unable to load jar.", e); - } catch (SecurityException e) { - throw new UdfRuntimeException("Unable to load function.", e); - } catch (ClassNotFoundException e) { - throw new UdfRuntimeException("Unable to find class.", e); - } catch (NoSuchMethodException e) { - throw new UdfRuntimeException( - "Unable to find constructor with no arguments.", e); - } catch (IllegalArgumentException e) { - throw new UdfRuntimeException( - "Unable to call UDF constructor with no arguments.", e); - } catch (Exception e) { - throw new UdfRuntimeException("Unable to call create UDF instance.", e); - } - } } From 7770b3b4cc8fc59e391b4f6cae86f25e2e6c0317 Mon Sep 17 00:00:00 2001 From: zhangstar333 Date: Sat, 8 Feb 2025 15:44:28 +0800 Subject: [PATCH 2/5] update --- .../src/main/java/org/apache/doris/udf/BaseExecutor.java | 1 - .../src/main/java/org/apache/doris/udf/UdafExecutor.java | 7 +++---- .../src/main/java/org/apache/doris/udf/UdfExecutor.java | 3 +-- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java index 0d2160642e7a21..3bc138b523c4c4 100644 --- a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java +++ b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java @@ -33,7 +33,6 @@ import com.esotericsoftware.reflectasm.MethodAccess; import com.google.common.base.Strings; - import org.apache.log4j.Logger; import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; diff --git a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java index 53b3a060897bc2..c6aa2ece0576be 100644 --- a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java +++ b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java @@ -37,7 +37,8 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.lang.reflect.Array; -import java.net.MalformedURLException; +import java.lang.reflect.Method; +import java.util.ArrayList; import java.util.HashMap; import java.util.Map; @@ -57,7 +58,6 @@ public class UdafExecutor extends BaseExecutor { private static final String UDAF_MERGE_FUNCTION = "merge"; private static final String UDAF_RESULT_FUNCTION = "getValue"; - // private HashMap allMethods; private HashMap stateObjMap; /** @@ -65,7 +65,6 @@ public class UdafExecutor extends BaseExecutor { */ public UdafExecutor(byte[] thriftParams) throws Exception { super(thriftParams); - className = fn.aggregate_fn.symbol; } /** @@ -266,9 +265,9 @@ public long getValue(long place, Map outputParams) throws UdfRun protected void init(TJavaUdfExecutorCtorParams request, String jarPath, Type funcRetType, Type... parameterTypes) throws UdfRuntimeException { + className = fn.aggregate_fn.symbol; super.init(request, jarPath, funcRetType, parameterTypes); stateObjMap = new HashMap<>(); - } @Override diff --git a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java index 0e9d181e606166..32bbad68a27fc5 100644 --- a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java +++ b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java @@ -48,7 +48,6 @@ public class UdfExecutor extends BaseExecutor { */ public UdfExecutor(byte[] thriftParams) throws Exception { super(thriftParams); - className = fn.scalar_fn.symbol; } /** @@ -58,7 +57,6 @@ public UdfExecutor(byte[] thriftParams) throws Exception { public void close() { // We are now un-usable (because the class loader has been // closed), so null out method_ and classLoader_. - // method = null; if (!isStaticLoad) { super.close(); } else if (outputTable != null) { @@ -114,6 +112,7 @@ private Method findPrepareMethod(Method[] methods) { @Override protected void init(TJavaUdfExecutorCtorParams request, String jarPath, Type funcRetType, Type... parameterTypes) throws UdfRuntimeException { + className = fn.scalar_fn.symbol; super.init(request, jarPath, funcRetType, parameterTypes); Method prepareMethod = objCache.allMethods.get(UDF_PREPARE_FUNCTION_NAME); if (prepareMethod != null) { From 57845036c3ccb9124941c35671329916e23c3859 Mon Sep 17 00:00:00 2001 From: zhangstar333 Date: Mon, 10 Feb 2025 15:05:09 +0800 Subject: [PATCH 3/5] update --- .../src/main/java/org/apache/doris/udf/UdfExecutor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java index 32bbad68a27fc5..a7f8f505967587 100644 --- a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java +++ b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java @@ -149,6 +149,7 @@ protected void checkAndCacheUdfClass(UdfClassCache cache, Type funcRetType, Type cache.allMethods.put(UDF_FUNCTION_NAME, m); cache.methodIndex = cache.methodAccess.getIndex(UDF_FUNCTION_NAME, cache.argClass); Pair returnType; + cache.retClass = m.getReturnType(); if (cache.argClass.length == 0 && parameterTypes.length == 0) { // Special case where the UDF doesn't take any input args returnType = UdfUtils.setReturnType(funcRetType, m.getReturnType()); @@ -172,7 +173,6 @@ protected void checkAndCacheUdfClass(UdfClassCache cache, Type funcRetType, Type } else { cache.argTypes = inputType.second; } - cache.retClass = m.getReturnType(); return; } StringBuilder sb = new StringBuilder(); From 7a2cc5fe35d3a4041f6f9268cd3b30b8e1b07fdb Mon Sep 17 00:00:00 2001 From: zhangstar333 Date: Mon, 10 Feb 2025 15:12:03 +0800 Subject: [PATCH 4/5] update --- regression-test/suites/javaudf_p0/test_javaudtf_decimal.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/regression-test/suites/javaudf_p0/test_javaudtf_decimal.groovy b/regression-test/suites/javaudf_p0/test_javaudtf_decimal.groovy index a8fa7e347af36e..0f16e10bef1ce0 100644 --- a/regression-test/suites/javaudf_p0/test_javaudtf_decimal.groovy +++ b/regression-test/suites/javaudf_p0/test_javaudtf_decimal.groovy @@ -62,7 +62,7 @@ suite("test_javaudtf_decimal") { qt_select2 """ SELECT user_id, cost_2, e1 FROM ${tableName} lateral view udtf_decimal(cost_2) temp as e1 order by user_id; """ } finally { - try_sql("DROP FUNCTION IF EXISTS udtf_decimal(decimal);") + try_sql("DROP FUNCTION IF EXISTS udtf_decimal(decimal(27,9));") try_sql("DROP TABLE IF EXISTS ${tableName}") } } From 88fb1d5d998b8f428ee699d30087151ae9df158c Mon Sep 17 00:00:00 2001 From: zhangstar333 Date: Tue, 11 Feb 2025 11:42:08 +0800 Subject: [PATCH 5/5] add test case --- .../doris/common/jni/utils/ExpiringMap.java | 4 ++ .../org/apache/doris/udf/UdafExecutor.java | 4 +- .../doris/analysis/CreateFunctionStmt.java | 4 ++ .../expressions/functions/udf/JavaUdaf.java | 12 +++- .../expressions/functions/udf/JavaUdtf.java | 13 +++- .../test_javaudf_static_load_test.out | 40 ++++++++++++ .../apache/doris/udf/StaticIntTestUDAF.java | 65 +++++++++++++++++++ .../apache/doris/udf/StaticIntTestUDTF.java | 35 ++++++++++ .../test_javaudf_static_load_test.groovy | 40 ++++++++++++ 9 files changed, 212 insertions(+), 5 deletions(-) create mode 100644 regression-test/java-udf-src/src/main/java/org/apache/doris/udf/StaticIntTestUDAF.java create mode 100644 regression-test/java-udf-src/src/main/java/org/apache/doris/udf/StaticIntTestUDTF.java diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java index f08b50a0c42f38..7bb4b61344a91b 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java @@ -78,6 +78,10 @@ public void remove(K key) { ttlMap.remove(key); } + public int size() { + return map.size(); + } + public void shutdown() { scheduler.shutdown(); try { diff --git a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java index c6aa2ece0576be..3f3860ad53dce0 100644 --- a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java +++ b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java @@ -72,7 +72,9 @@ public UdafExecutor(byte[] thriftParams) throws Exception { */ @Override public void close() { - super.close(); + if (!isStaticLoad) { + super.close(); + } stateObjMap = null; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java index 4a38d06ffe28ff..a0c74c8bd3f4a5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java @@ -360,6 +360,8 @@ private void analyzeTableFunction() throws AnalysisException { location, symbol, null, null); function.setChecksum(checksum); function.setNullableMode(returnNullMode); + function.setStaticLoad(isStaticLoad); + function.setExpirationTime(expirationTime); function.setUDTFunction(true); // Todo: maybe in create tables function, need register two function, one is // normal and one is outer as those have different result when result is NULL. @@ -426,6 +428,8 @@ private void analyzeUda() throws AnalysisException { function.setBinaryType(binaryType); function.setChecksum(checksum); function.setNullableMode(returnNullMode); + function.setStaticLoad(isStaticLoad); + function.setExpirationTime(expirationTime); } private void analyzeUdf() throws AnalysisException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdaf.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdaf.java index 27cf22b73b28dc..85143d2aca65bb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdaf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdaf.java @@ -62,6 +62,8 @@ public class JavaUdaf extends AggregateFunction implements ExplicitlyCastableSig private final String getValueFn; private final String removeFn; private final String checkSum; + private final boolean isStaticLoad; + private final long expirationTime; /** * Constructor of UDAF @@ -72,7 +74,7 @@ public JavaUdaf(String name, long functionId, String dbName, TFunctionBinaryType String objectFile, String symbol, String initFn, String updateFn, String mergeFn, String serializeFn, String finalizeFn, String getValueFn, String removeFn, - boolean isDistinct, String checkSum, Expression... args) { + boolean isDistinct, String checkSum, boolean isStaticLoad, long expirationTime, Expression... args) { super(name, isDistinct, args); this.dbName = dbName; this.functionId = functionId; @@ -90,6 +92,8 @@ public JavaUdaf(String name, long functionId, String dbName, TFunctionBinaryType this.getValueFn = getValueFn; this.removeFn = removeFn; this.checkSum = checkSum; + this.isStaticLoad = isStaticLoad; + this.expirationTime = expirationTime; } @Override @@ -120,7 +124,7 @@ public JavaUdaf withDistinctAndChildren(boolean isDistinct, List chi Preconditions.checkArgument(children.size() == this.children.size()); return new JavaUdaf(getName(), functionId, dbName, binaryType, signature, intermediateType, nullableMode, objectFile, symbol, initFn, updateFn, mergeFn, serializeFn, finalizeFn, getValueFn, removeFn, - isDistinct, checkSum, children.toArray(new Expression[0])); + isDistinct, checkSum, isStaticLoad, expirationTime, children.toArray(new Expression[0])); } /** @@ -162,6 +166,8 @@ public static void translateToNereidsFunction(String dbName, org.apache.doris.ca aggregate.getRemoveFnSymbol(), false, aggregate.getChecksum(), + aggregate.isStaticLoad(), + aggregate.getExpirationTime(), virtualSlots); JavaUdafBuilder builder = new JavaUdafBuilder(udaf); @@ -196,6 +202,8 @@ public Function getCatalogFunction() { expr.setNullableMode(nullableMode); expr.setChecksum(checkSum); expr.setId(functionId); + expr.setStaticLoad(isStaticLoad); + expr.setExpirationTime(expirationTime); return expr; } catch (Exception e) { throw new AnalysisException(e.getMessage(), e.getCause()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdtf.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdtf.java index 96ea02bf2b7215..c90a8c343a3c75 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdtf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdtf.java @@ -56,6 +56,8 @@ public class JavaUdtf extends TableGeneratingFunction implements ExplicitlyCasta private final String prepareFn; private final String closeFn; private final String checkSum; + private final boolean isStaticLoad; + private final long expirationTime; /** * Constructor of UDTF @@ -63,7 +65,7 @@ public class JavaUdtf extends TableGeneratingFunction implements ExplicitlyCasta public JavaUdtf(String name, long functionId, String dbName, TFunctionBinaryType binaryType, FunctionSignature signature, NullableMode nullableMode, String objectFile, String symbol, String prepareFn, String closeFn, - String checkSum, Expression... args) { + String checkSum, boolean isStaticLoad, long expirationTime, Expression... args) { super(name, args); this.dbName = dbName; this.functionId = functionId; @@ -75,6 +77,8 @@ public JavaUdtf(String name, long functionId, String dbName, TFunctionBinaryType this.prepareFn = prepareFn; this.closeFn = closeFn; this.checkSum = checkSum; + this.isStaticLoad = isStaticLoad; + this.expirationTime = expirationTime; } /** @@ -84,7 +88,8 @@ public JavaUdtf(String name, long functionId, String dbName, TFunctionBinaryType public JavaUdtf withChildren(List children) { Preconditions.checkArgument(children.size() == this.children.size()); return new JavaUdtf(getName(), functionId, dbName, binaryType, signature, nullableMode, - objectFile, symbol, prepareFn, closeFn, checkSum, children.toArray(new Expression[0])); + objectFile, symbol, prepareFn, closeFn, checkSum, isStaticLoad, expirationTime, + children.toArray(new Expression[0])); } @Override @@ -119,6 +124,8 @@ public Function getCatalogFunction() { expr.setNullableMode(nullableMode); expr.setChecksum(checkSum); expr.setId(functionId); + expr.setStaticLoad(isStaticLoad); + expr.setExpirationTime(expirationTime); expr.setUDTFunction(true); return expr; } catch (Exception e) { @@ -153,6 +160,8 @@ public static void translateToNereidsFunction(String dbName, org.apache.doris.ca scalar.getPrepareFnSymbol(), scalar.getCloseFnSymbol(), scalar.getChecksum(), + scalar.isStaticLoad(), + scalar.getExpirationTime(), virtualSlots); JavaUdtfBuilder builder = new JavaUdtfBuilder(udf); diff --git a/regression-test/data/javaudf_p0/test_javaudf_static_load_test.out b/regression-test/data/javaudf_p0/test_javaudf_static_load_test.out index 9f57f52d091d3e..097f84fd7a07e4 100644 --- a/regression-test/data/javaudf_p0/test_javaudf_static_load_test.out +++ b/regression-test/data/javaudf_p0/test_javaudf_static_load_test.out @@ -25,3 +25,43 @@ -- !select5 -- 5 +-- !select6 -- +1 + +-- !select7 -- +2 + +-- !select8 -- +3 + +-- !select9 -- +4 + +-- !select10 -- +5 + +-- !select11 -- +1 1 + +-- !select12 -- +1 2 +1 2 + +-- !select13 -- +1 3 +1 3 +1 3 + +-- !select14 -- +1 4 +1 4 +1 4 +1 4 + +-- !select15 -- +1 5 +1 5 +1 5 +1 5 +1 5 + diff --git a/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/StaticIntTestUDAF.java b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/StaticIntTestUDAF.java new file mode 100644 index 00000000000000..dde6c1336d1c78 --- /dev/null +++ b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/StaticIntTestUDAF.java @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.udf; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +public class StaticIntTestUDAF { + static { + System.out.println("static load should only print once StaticIntTestUDAF"); + } + private static int value = 0; + public static class State { + public long counter = 0; + } + + public State create() { + return new State(); + } + + public void destroy(State state) { + } + + public void reset(State state) { + state.counter = 0; + } + + public void add(State state, Integer val) { + if (val == null) return; + state.counter += val; + } + + public void serialize(State state, DataOutputStream out) throws IOException { + out.writeLong(state.counter); + } + + public void deserialize(State state, DataInputStream in) throws IOException { + state.counter = in.readLong(); + } + + public void merge(State state, State rhs) { + state.counter += rhs.counter; + } + + public long getValue(State state) { + value = value + 1; + System.out.println("getValue StaticIntTestUDAF " + value + " " + state.counter); + return state.counter + value; + } +} diff --git a/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/StaticIntTestUDTF.java b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/StaticIntTestUDTF.java new file mode 100644 index 00000000000000..460a4ac809d4d2 --- /dev/null +++ b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/StaticIntTestUDTF.java @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.udf; + +import java.util.ArrayList; + +public class StaticIntTestUDTF { + static { + System.out.println("static load should only print once StaticIntTestUDTF"); + } + private static int value = 0; + public ArrayList evaluate() { + ArrayList result = new ArrayList<>(); + value = value + 1; + for (int i = 0; i < value; i++) { + result.add(value); + } + return result; + } +} diff --git a/regression-test/suites/javaudf_p0/test_javaudf_static_load_test.groovy b/regression-test/suites/javaudf_p0/test_javaudf_static_load_test.groovy index c816ec90292a79..1a98767fb51cdf 100644 --- a/regression-test/suites/javaudf_p0/test_javaudf_static_load_test.groovy +++ b/regression-test/suites/javaudf_p0/test_javaudf_static_load_test.groovy @@ -66,6 +66,7 @@ suite("test_javaudf_static_load_test") { "type"="JAVA_UDF" ); """ + // the result of the following queries should be the accumulation sql """set parallel_pipeline_task_num = 1; """ qt_select1 """ SELECT static_load_test(); """ qt_select2 """ SELECT static_load_test(); """ @@ -73,8 +74,47 @@ suite("test_javaudf_static_load_test") { qt_select4 """ SELECT static_load_test(); """ qt_select5 """ SELECT static_load_test(); """ + + sql """ CREATE AGGREGATE FUNCTION static_load_test_udaf(int) RETURNS BigInt PROPERTIES ( + "file"="file://${jarPath}", + "symbol"="org.apache.doris.udf.StaticIntTestUDAF", + "always_nullable"="true", + "static_load"="true", + "expiration_time"="10", + "type"="JAVA_UDF" + ); """ + + // the result of the following queries should be the accumulation + // maybe we need drop funtion and test again, the result should be the same + // but the regression test will copy the jar to be custom_lib, and loaded by BE when it's started + // so it's can't be unloaded + sql """set parallel_pipeline_task_num = 1; """ + qt_select6 """ SELECT static_load_test_udaf(0); """ + qt_select7 """ SELECT static_load_test_udaf(0); """ + qt_select8 """ SELECT static_load_test_udaf(0); """ + qt_select9 """ SELECT static_load_test_udaf(0); """ + qt_select10 """ SELECT static_load_test_udaf(0); """ + + sql """ CREATE TABLES FUNCTION static_load_test_udtf() RETURNS array PROPERTIES ( + "file"="file://${jarPath}", + "symbol"="org.apache.doris.udf.StaticIntTestUDTF", + "always_nullable"="true", + "static_load"="true", + "expiration_time"="10", + "type"="JAVA_UDF" + ); """ + + sql """set parallel_pipeline_task_num = 1; """ + qt_select11 """ select k1, e1 from (select 1 k1) as t lateral view static_load_test_udtf() tmp1 as e1; """ + qt_select12 """ select k1, e1 from (select 1 k1) as t lateral view static_load_test_udtf() tmp1 as e1; """ + qt_select13 """ select k1, e1 from (select 1 k1) as t lateral view static_load_test_udtf() tmp1 as e1; """ + qt_select14 """ select k1, e1 from (select 1 k1) as t lateral view static_load_test_udtf() tmp1 as e1; """ + qt_select15 """ select k1, e1 from (select 1 k1) as t lateral view static_load_test_udtf() tmp1 as e1; """ + } finally { try_sql("DROP FUNCTION IF EXISTS static_load_test();") + try_sql("DROP FUNCTION IF EXISTS static_load_test_udaf(int);") + try_sql("DROP FUNCTION IF EXISTS static_load_test_udtf();") try_sql("DROP TABLE IF EXISTS ${tableName}") } }