diff --git a/scheduler/src/cook/compute_cluster.clj b/scheduler/src/cook/compute_cluster.clj index 90bd9fa4d3..e9b6bca91d 100644 --- a/scheduler/src/cook/compute_cluster.clj +++ b/scheduler/src/cook/compute_cluster.clj @@ -44,13 +44,21 @@ (restore-offers [this pool-name offers] "Called when offers are not processed to ensure they're still available.")) +(defn safe-kill-task + "A safe version of kill task that never throws. This reduces the risk that errors in one compute cluster propagate and cause problems in another compute cluster." + [compute-cluster task-id] + (try + (kill-task compute-cluster task-id) + (catch Throwable t + (log/error t "In compute cluster" compute-cluster "error killing task" task-id)))) + (defn kill-task-if-possible [compute-cluster task-id] "If compute cluster is nil, print a warning instead of killing the task. There are cases, in particular, lingering tasks, stragglers, or cancelled tasks where the task might outlive the compute cluster it was member of. When this occurs, the looked up compute cluster is null and trying to kill via it would cause an NPE, when in reality, it's relatively innocuous. So, we have this wrapper to use in those circumstances." (if compute-cluster - (kill-task compute-cluster task-id) + (safe-kill-task compute-cluster task-id) (log/warn "Unable to kill task " task-id " because compute-cluster is nil"))) ; Internal method diff --git a/scheduler/src/cook/mesos.clj b/scheduler/src/cook/mesos.clj index 424a4d61d9..8985e1a420 100644 --- a/scheduler/src/cook/mesos.clj +++ b/scheduler/src/cook/mesos.clj @@ -177,10 +177,15 @@ :trigger-chans trigger-chans}) running-tasks-ents (cook.tools/get-running-task-ents (d/db mesos-datomic-conn)) cluster-connected-chans (->> cluster-name->compute-cluster - vals - (map #(cc/initialize-cluster % - pool-name->fenzo - running-tasks-ents)) + (map (fn [[compute-cluster-name compute-cluster]] + (try + (cc/initialize-cluster compute-cluster + pool-name->fenzo + running-tasks-ents) + (catch Throwable t + (log/error t "Error launching compute cluster" compute-cluster-name) + ; Return a chan that never gets a message on it. + (async/chan 1))))) ; Note: This doall has a critical side effect of actually initializing ; all of the clusters. doall)] diff --git a/scheduler/src/cook/mesos/mesos_compute_cluster.clj b/scheduler/src/cook/mesos/mesos_compute_cluster.clj index f00da2cfd2..96532a4435 100644 --- a/scheduler/src/cook/mesos/mesos_compute_cluster.clj +++ b/scheduler/src/cook/mesos/mesos_compute_cluster.clj @@ -73,7 +73,7 @@ "as instance" instance "with" prior-job-state "and" prior-instance-status "should've been put down already") (meters/mark! (meters/meter (sched/metric-title "tasks-killed-in-status-update" pool-name))) - (cc/kill-task compute-cluster task-id)) + (cc/safe-kill-task compute-cluster task-id)) (sched/write-status-to-datomic conn pool->fenzo status)) (conditionally-sync-sandbox conn task-id (:state status) sync-agent-sandboxes-fn))) diff --git a/scheduler/src/cook/rebalancer.clj b/scheduler/src/cook/rebalancer.clj index a9267780d3..c9b285c97d 100644 --- a/scheduler/src/cook/rebalancer.clj +++ b/scheduler/src/cook/rebalancer.clj @@ -516,7 +516,7 @@ (log/warn e "Failed to transact preemption"))) (when-let [task-id (:instance/task-id task-ent)] (when-let [compute-cluster (task/task-ent->ComputeCluster task-ent)] - (cc/kill-task compute-cluster task-id)))))))) + (cc/safe-kill-task compute-cluster task-id)))))))) (def datomic-params [:max-preemption :min-dru-diff diff --git a/scheduler/src/cook/scheduler/scheduler.clj b/scheduler/src/cook/scheduler/scheduler.clj index e1e0a75b9a..12bbd3b322 100644 --- a/scheduler/src/cook/scheduler/scheduler.clj +++ b/scheduler/src/cook/scheduler/scheduler.clj @@ -359,7 +359,7 @@ (try (log/info "Attempting to kill task" task-id "in" compute-cluster-name "from already completed job") (meters/mark! tx-report-queue-tasks-killed) - (cc/kill-task compute-cluster task-id) + (cc/safe-kill-task compute-cluster task-id) (catch Exception e (log/error e (str "Failed to kill task" task-id)))) (log/warn "Unable to kill task" task-id "with unknown cluster" compute-cluster-name)))) @@ -393,7 +393,7 @@ (if-let [compute-cluster (cc/compute-cluster-name->ComputeCluster compute-cluster-name)] (do (log/info "Attempting to kill task" task-id "in" compute-cluster-name "due to job completion") (meters/mark! tx-report-queue-tasks-killed) - (cc/kill-task compute-cluster task-id)) + (cc/safe-kill-task compute-cluster task-id)) (log/error "Couldn't kill task" task-id "due to no Mesos driver for compute cluster" compute-cluster-name "!"))))) (catch Exception e (log/error e "Unexpected exception on tx report queue processor"))))))))) @@ -535,7 +535,10 @@ [compute-cluster offer-ids] (log/debug "Declining offers:" offer-ids) (meters/mark! scheduler-offer-declined (count offer-ids)) - (cc/decline-offers compute-cluster offer-ids)) + (try + (cc/decline-offers compute-cluster offer-ids) + (catch Throwable t + (log/error t "Error declining offers for" compute-cluster)))) (histograms/defhistogram [cook-mesos scheduler number-tasks-matched]) (histograms/defhistogram [cook-mesos-scheduler number-offers-matched]) @@ -793,7 +796,10 @@ (when-let [offers @offer-stash] ; Group the set of all offers by compute cluster and route them to that compute cluster for restoring. (doseq [[compute-cluster offer-subset] (group-by :compute-cluster offers)] - (cc/restore-offers compute-cluster pool-name offer-subset))) + (try + (cc/restore-offers compute-cluster pool-name offer-subset) + (catch Throwable t + (log/error t "For" pool-name "error restoring offers for compute cluster" compute-cluster))))) ; if an error happened, it doesn't mean we need to penalize Fenzo true))))) @@ -847,7 +853,13 @@ user->usage-future (future (generate-user-usage-map (d/db conn) pool-name)) ;; Try to clear the channel ;; Merge the pending offers from all compute clusters. - offers (apply concat (map #(cc/pending-offers % pool-name) compute-clusters)) + offers (apply concat (map (fn [compute-cluster] + (try + (cc/pending-offers compute-cluster pool-name) + (catch Throwable t + (log/error t "Error getting pending offers for " compute-cluster) + (list)))) + compute-clusters)) _ (doseq [offer offers :let [slave-id (-> offer :slave-id :value) attrs (get-offer-attr-map offer)]]