From 647f71254a549e4694781d133d9cf34cbc548a12 Mon Sep 17 00:00:00 2001 From: zhao_wei_nan <326747337@qq.com> Date: Sun, 25 Feb 2024 20:50:53 +0800 Subject: [PATCH] feat: support specify more executor plugins, such as gluten. --- .../core/strategy/platform/SparkRuntime.scala | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/streamingpro-core/src/main/java/streaming/core/strategy/platform/SparkRuntime.scala b/streamingpro-core/src/main/java/streaming/core/strategy/platform/SparkRuntime.scala index e5673d753..9af89fd75 100644 --- a/streamingpro-core/src/main/java/streaming/core/strategy/platform/SparkRuntime.scala +++ b/streamingpro-core/src/main/java/streaming/core/strategy/platform/SparkRuntime.scala @@ -110,10 +110,14 @@ class SparkRuntime(_params: JMap[Any, Any]) extends StreamingRuntime with Platfo // In spark3, The org.apache.spark.ExecutorPlugin interface and related configuration has been // replaced with org.apache.spark.api.plugin.SparkPlugin. MLSQLSparkConst.majorVersion(SparkCoreVersion.exactVersion) match { - case 2 => - conf.set("spark.executor.plugins", "org.apache.spark.ps.cluster.PSExecutorPlugin") - case _ => - conf.set("spark.plugins", "org.apache.spark.ps.cluster.PSExecutorPlugin") + case 2 => { + val plugins = conf.get("spark.executor.plugins", "").split(",").toSet + conf.set("spark.executor.plugins", (plugins + "org.apache.spark.ps.cluster.PSExecutorPlugin").mkString(",")) + } + case _ => { + val plugins = conf.get("spark.plugins", "").split(",").toSet + conf.set("spark.plugins", (plugins + "org.apache.spark.ps.cluster.PSExecutorPlugin").mkString(",")) + } }