Skip to content

Commit 2d60b8a

Browse files
committed
fix :workload :compute
1 parent df5605d commit 2d60b8a

File tree

2 files changed

+7
-8
lines changed

2 files changed

+7
-8
lines changed

src/main/clojure/clojure/core/async/flow.clj

+1-2
Original file line numberDiff line numberDiff line change
@@ -252,8 +252,7 @@
252252
253253
When :compute is specified transform must not block!"
254254
([fn-or-map] (process fn-or-map nil))
255-
([fn-or-map {:keys [workload timeout-ms]
256-
:or {timeout-ms 5000} :as opts}]
255+
([fn-or-map {:keys [workload compute-timeout-ms] :as opts}]
257256
(impl/proc fn-or-map opts)))
258257

259258
(defn lift*->step

src/main/clojure/clojure/core/async/flow/impl.clj

+6-6
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
clojure.lang.Var (symbol x)
2727
(datafy/datafy x)))
2828

29-
(defn futurize ^Future [f {:keys [exec]}]
29+
(defn futurize [f {:keys [exec]}]
3030
(fn [& args]
3131
(let [^ExecutorService e (if (instance? ExecutorService exec)
3232
exec
@@ -219,7 +219,7 @@
219219

220220
(defn proc
221221
"see lib ns for docs"
222-
[fm {:keys [workload compute-timeout-ms]}]
222+
[fm {:keys [workload compute-timeout-ms] :or {compute-timeout-ms 5000}}]
223223
(let [{:keys [describe init transition transform] :as impl}
224224
(if (map? fm) fm {:describe fm :init fm :transition fm :transform fm})
225225
{:keys [params ins] :as desc} (describe)
@@ -236,10 +236,10 @@
236236
(describe [_] desc)
237237
(start [_ {:keys [pid args ins outs resolver]}]
238238
(assert (or (not params) args) "must provide :args if :params")
239-
(let [comp? (= workload :compute)
240-
transform (cond-> transform (= workload :compute)
241-
#(.get (futurize transform {:exec (spi/get-exec resolver :compute)})
242-
compute-timeout-ms TimeUnit/MILLISECONDS))
239+
(let [transform (if (= workload :compute)
240+
#(.get ^Future ((futurize transform {:exec (spi/get-exec resolver :compute)}) %1 %2 %3)
241+
compute-timeout-ms TimeUnit/MILLISECONDS)
242+
transform)
243243
exs (spi/get-exec resolver (if (= workload :mixed) :mixed :io))
244244
state (when init (init args))
245245
ins (into (or ins {}) (::flow/in-ports state))

0 commit comments

Comments
 (0)