Skip to content

Commit

Permalink
No longer exposing new arity in thread-call
Browse files Browse the repository at this point in the history
  • Loading branch information
fogus committed Jan 27, 2025
1 parent 326dbb5 commit a583e84
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 32 deletions.
54 changes: 25 additions & 29 deletions src/main/clojure/clojure/core/async.clj
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ to catch and handle."
clojure.core.async.impl.go ;; TODO: make conditional
[clojure.core.async.impl.mutex :as mutex]
[clojure.core.async.impl.concurrent :as conc]
)
[clojure.core.async.impl.exec.threadpool :as threadp])
(:import [java.util.concurrent.atomic AtomicLong]
[java.util.concurrent.locks Lock]
[java.util.concurrent Executors Executor ThreadLocalRandom ExecutorService]
Expand Down Expand Up @@ -462,45 +462,41 @@ to catch and handle."
[& body]
(#'clojure.core.async.impl.go/go-impl &env body))

(defonce ^ExecutorService mixed-executor
(Executors/newCachedThreadPool (conc/counted-thread-factory "async-mixed-%d" true)))

(defonce ^ExecutorService io-executor
(Executors/newCachedThreadPool (conc/counted-thread-factory "async-io-%d" true)))

(defonce ^ExecutorService compute-executor
(Executors/newCachedThreadPool (conc/counted-thread-factory "async-compute-%d" true)))
(defn- best-fit-thread-call
[f exec]
(let [c (chan 1)
^ExecutorService e (case exec
:compute threadp/compute-executor
:io threadp/io-executor
threadp/mixed-executor)]
(let [binds (Var/getThreadBindingFrame)]
(.execute e
(fn []
(Var/resetThreadBindingFrame binds)
(try
(let [ret (f)]
(when-not (nil? ret)
(>!! c ret)))
(finally
(close! c))))))
c))

(defn thread-call
"Executes f in another thread, returning immediately to the calling
thread. Returns a channel which will receive the result of calling
f when completed, then close."
([f] (thread-call f :mixed))
([f exec]
(let [c (chan 1)
^ExecutorService e (case exec
:compute compute-executor
:io io-executor
mixed-executor)]
(let [binds (Var/getThreadBindingFrame)]
(.execute e
(fn []
(Var/resetThreadBindingFrame binds)
(try
(let [ret (f)]
(when-not (nil? ret)
(>!! c ret)))
(finally
(close! c))))))
c)))
f when completed, then close. exec is a keyword that describes the
nature of f's workload, one of :mixed (default) :io or :compute
whereby core.async may be able to choose a best fit thread type."
[f]
(best-fit-thread-call f :mixed))

(defmacro io-thread
"Executes the body in a thread intended for blocking I/O workloads,
returning immediately to the calling thread. The body must not do
extended computation (if so, use 'thread' instead). Returns a channel
which will receive the result of the body when completed, then close."
[& body]
`(thread-call (^:once fn* [] ~@body) :io))
`(#'best-fit-thread-call (^:once fn* [] ~@body) :io))

(defmacro thread
"Executes the body in another thread, returning immediately to the
Expand Down
5 changes: 3 additions & 2 deletions src/main/clojure/clojure/core/async/impl/concurrent.clj
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
;; You must not remove this notice, or any other, from this software.

(ns ^{:skip-wiki true}
clojure.core.async.impl.concurrent
(:import [java.util.concurrent ThreadFactory]))
clojure.core.async.impl.concurrent
(:import [java.util.concurrent ThreadFactory Executors ExecutorService]
[clojure.lang Var]))

(set! *warn-on-reflection* true)

Expand Down
11 changes: 10 additions & 1 deletion src/main/clojure/clojure/core/async/impl/exec/threadpool.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
(ns clojure.core.async.impl.exec.threadpool
(:require [clojure.core.async.impl.protocols :as impl]
[clojure.core.async.impl.concurrent :as conc])
(:import [java.util.concurrent Executors]))
(:import [java.util.concurrent Executors ExecutorService]))

(set! *warn-on-reflection* true)

Expand All @@ -30,3 +30,12 @@
(reify impl/Executor
(impl/exec [_ r]
(.execute executor-svc ^Runnable r))))))

(defonce ^ExecutorService mixed-executor
(Executors/newCachedThreadPool (conc/counted-thread-factory "async-mixed-%d" true)))

(defonce ^ExecutorService io-executor
(Executors/newCachedThreadPool (conc/counted-thread-factory "async-io-%d" true)))

(defonce ^ExecutorService compute-executor
(Executors/newCachedThreadPool (conc/counted-thread-factory "async-compute-%d" true)))

0 comments on commit a583e84

Please sign in to comment.