From 34a32311d769af4af3a5b326c428172d6422b6c6 Mon Sep 17 00:00:00 2001
From: catalinii
Date: Wed, 9 Dec 2020 14:01:51 -0800
Subject: [PATCH] Log first successful container request (#20)

Call an internal Lyft API using reflection when the executor was successfully
requested from K8s.
---
 .../org/apache/spark/util/LyftUtils.scala     | 31 ++++++++++++++
 .../apache/spark/util/LyftUtilsSuite.scala    | 40 +++++++++++++++++++
 .../cluster/k8s/ExecutorPodsAllocator.scala   |  5 +++
 3 files changed, 76 insertions(+)
 create mode 100644 core/src/main/scala/org/apache/spark/util/LyftUtils.scala
 create mode 100644 core/src/test/scala/org/apache/spark/util/LyftUtilsSuite.scala

diff --git a/core/src/main/scala/org/apache/spark/util/LyftUtils.scala b/core/src/main/scala/org/apache/spark/util/LyftUtils.scala
new file mode 100644
index 0000000000000..d9c8a643ec57d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/LyftUtils.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+private[spark] object LyftUtils {
+  def callObjectMethodNoArguments(objectName: String, method: String): Boolean = {
+    var ok = true
+    try {
+      val m = Utils.classForName(objectName).getField("MODULE$").get(null)
+      Utils.classForName(objectName).getDeclaredMethod(method).invoke(m)
+    } catch {
+      case _: Throwable => ok = false
+    }
+    ok
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/util/LyftUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/LyftUtilsSuite.scala
new file mode 100644
index 0000000000000..7107a270ba742
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/LyftUtilsSuite.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.internal.Logging
+
+object TestObjectLyftUtils {
+  var testVar = 0L
+  def setVal(): Unit = {
+    testVar = 1L
+  }
+}
+
+class LyftUtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
+
+  test("callObjectMethodNoArguments") {
+    // Calling an existing no-argument method via reflection succeeds and has the expected effect.
+    val v = LyftUtils.callObjectMethodNoArguments("org.apache.spark.util.TestObjectLyftUtils$", "setVal")
+    assert(v === true)
+    assert(TestObjectLyftUtils.testVar === 1)
+    assert(false ==
+      LyftUtils.callObjectMethodNoArguments("org.apache.spark.util.TestObjectLyftUtils$", "setVal1"))
+  }
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index b394f35b15111..f948a88c869eb 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -222,6 +222,11 @@ private[spark] class ExecutorPodsAllocator(
         kubernetesClient.pods().create(podWithAttachedContainer)
         newlyCreatedExecutors(newExecutorId) = clock.getTimeMillis()
         logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
+
+
+        org.apache.spark.util.LyftUtils.callObjectMethodNoArguments(
+          "com.lyft.data.spark.AppMetrics$",
+          "setFirstExecutorAllocationTime")
       }
     }
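
Note on the reflective call: com.lyft.data.spark.AppMetrics is an internal
Lyft library and is not part of this patch, so Spark takes no compile-time
dependency on it. LyftUtils.callObjectMethodNoArguments resolves the object's
MODULE$ singleton by name at runtime and simply returns false if the class or
method is missing. The sketch below is only an illustration of the kind of
object the call is expected to reach; the package, object, and method names
come from the diff above, while the field and body are assumptions.

// Hypothetical sketch only: the real com.lyft.data.spark.AppMetrics lives
// outside this repository. Only the object name and the no-argument
// setFirstExecutorAllocationTime() method name are taken from the diff;
// everything else is assumed for illustration.
package com.lyft.data.spark

object AppMetrics {
  @volatile private var firstExecutorAllocationTime: Option[Long] = None

  // Looked up through the MODULE$ field and invoked by
  // LyftUtils.callObjectMethodNoArguments after an executor pod is
  // successfully requested; records the timestamp only on the first call.
  def setFirstExecutorAllocationTime(): Unit = synchronized {
    if (firstExecutorAllocationTime.isEmpty) {
      firstExecutorAllocationTime = Some(System.currentTimeMillis())
    }
  }
}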