Skip to content
This repository has been archived by the owner on Apr 24, 2023. It is now read-only.

Commit

Permalink
Make our compute cluster code more robust. (#1324)
Browse files Browse the repository at this point in the history
We don't want a single compute cluster failing to take down all of cook,
so wrap everything in a try-catch.
  • Loading branch information
scrosby authored and nsinkov committed Dec 4, 2019
1 parent 90e9f02 commit 522cb4e
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 12 deletions.
10 changes: 9 additions & 1 deletion scheduler/src/cook/compute_cluster.clj
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,21 @@
(restore-offers [this pool-name offers]
"Called when offers are not processed to ensure they're still available."))

(defn safe-kill-task
"A safe version of kill task that never throws. This reduces the risk that errors in one compute cluster propagate and cause problems in another compute cluster."
[compute-cluster task-id]
(try
(kill-task compute-cluster task-id)
(catch Throwable t
(log/error t "In compute cluster" compute-cluster "error killing task" task-id))))

(defn kill-task-if-possible [compute-cluster task-id]
"If compute cluster is nil, print a warning instead of killing the task. There are cases, in particular,
lingering tasks, stragglers, or cancelled tasks where the task might outlive the compute cluster it was
member of. When this occurs, the looked up compute cluster is null and trying to kill via it would cause an NPE,
when in reality, it's relatively innocuous. So, we have this wrapper to use in those circumstances."
(if compute-cluster
(kill-task compute-cluster task-id)
(safe-kill-task compute-cluster task-id)
(log/warn "Unable to kill task " task-id " because compute-cluster is nil")))

; Internal method
Expand Down
13 changes: 9 additions & 4 deletions scheduler/src/cook/mesos.clj
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,15 @@
:trigger-chans trigger-chans})
running-tasks-ents (cook.tools/get-running-task-ents (d/db mesos-datomic-conn))
cluster-connected-chans (->> cluster-name->compute-cluster
vals
(map #(cc/initialize-cluster %
pool-name->fenzo
running-tasks-ents))
(map (fn [[compute-cluster-name compute-cluster]]
(try
(cc/initialize-cluster compute-cluster
pool-name->fenzo
running-tasks-ents)
(catch Throwable t
(log/error t "Error launching compute cluster" compute-cluster-name)
; Return a chan that never gets a message on it.
(async/chan 1)))))
; Note: This doall has a critical side effect of actually initializing
; all of the clusters.
doall)]
Expand Down
2 changes: 1 addition & 1 deletion scheduler/src/cook/mesos/mesos_compute_cluster.clj
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
"as instance" instance "with" prior-job-state "and" prior-instance-status
"should've been put down already")
(meters/mark! (meters/meter (sched/metric-title "tasks-killed-in-status-update" pool-name)))
(cc/kill-task compute-cluster task-id))
(cc/safe-kill-task compute-cluster task-id))
(sched/write-status-to-datomic conn pool->fenzo status))
(conditionally-sync-sandbox conn task-id (:state status) sync-agent-sandboxes-fn)))

Expand Down
2 changes: 1 addition & 1 deletion scheduler/src/cook/rebalancer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@
(log/warn e "Failed to transact preemption")))
(when-let [task-id (:instance/task-id task-ent)]
(when-let [compute-cluster (task/task-ent->ComputeCluster task-ent)]
(cc/kill-task compute-cluster task-id))))))))
(cc/safe-kill-task compute-cluster task-id))))))))

(def datomic-params [:max-preemption
:min-dru-diff
Expand Down
22 changes: 17 additions & 5 deletions scheduler/src/cook/scheduler/scheduler.clj
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@
(try
(log/info "Attempting to kill task" task-id "in" compute-cluster-name "from already completed job")
(meters/mark! tx-report-queue-tasks-killed)
(cc/kill-task compute-cluster task-id)
(cc/safe-kill-task compute-cluster task-id)
(catch Exception e
(log/error e (str "Failed to kill task" task-id))))
(log/warn "Unable to kill task" task-id "with unknown cluster" compute-cluster-name))))
Expand Down Expand Up @@ -393,7 +393,7 @@
(if-let [compute-cluster (cc/compute-cluster-name->ComputeCluster compute-cluster-name)]
(do (log/info "Attempting to kill task" task-id "in" compute-cluster-name "due to job completion")
(meters/mark! tx-report-queue-tasks-killed)
(cc/kill-task compute-cluster task-id))
(cc/safe-kill-task compute-cluster task-id))
(log/error "Couldn't kill task" task-id "due to no Mesos driver for compute cluster" compute-cluster-name "!")))))
(catch Exception e
(log/error e "Unexpected exception on tx report queue processor")))))))))
Expand Down Expand Up @@ -535,7 +535,10 @@
[compute-cluster offer-ids]
(log/debug "Declining offers:" offer-ids)
(meters/mark! scheduler-offer-declined (count offer-ids))
(cc/decline-offers compute-cluster offer-ids))
(try
(cc/decline-offers compute-cluster offer-ids)
(catch Throwable t
(log/error t "Error declining offers for" compute-cluster))))

(histograms/defhistogram [cook-mesos scheduler number-tasks-matched])
(histograms/defhistogram [cook-mesos-scheduler number-offers-matched])
Expand Down Expand Up @@ -793,7 +796,10 @@
(when-let [offers @offer-stash]
; Group the set of all offers by compute cluster and route them to that compute cluster for restoring.
(doseq [[compute-cluster offer-subset] (group-by :compute-cluster offers)]
(cc/restore-offers compute-cluster pool-name offer-subset)))
(try
(cc/restore-offers compute-cluster pool-name offer-subset)
(catch Throwable t
(log/error t "For" pool-name "error restoring offers for compute cluster" compute-cluster)))))
; if an error happened, it doesn't mean we need to penalize Fenzo
true)))))

Expand Down Expand Up @@ -847,7 +853,13 @@
user->usage-future (future (generate-user-usage-map (d/db conn) pool-name))
;; Try to clear the channel
;; Merge the pending offers from all compute clusters.
offers (apply concat (map #(cc/pending-offers % pool-name) compute-clusters))
offers (apply concat (map (fn [compute-cluster]
(try
(cc/pending-offers compute-cluster pool-name)
(catch Throwable t
(log/error t "Error getting pending offers for " compute-cluster)
(list))))
compute-clusters))
_ (doseq [offer offers
:let [slave-id (-> offer :slave-id :value)
attrs (get-offer-attr-map offer)]]
Expand Down

0 comments on commit 522cb4e

Please sign in to comment.