Skip to content

Commit 6a0383b

Browse files
Add a wait(::[Abstract]WorkerPool) (#106)
Original PR: JuliaLang/julia#48238 Co-authored-by: kleinschmidt <[email protected]>
1 parent 1cd2677 commit 6a0383b

File tree

2 files changed

+28
-1
lines changed

2 files changed

+28
-1
lines changed

src/workerpool.jl

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ An `AbstractWorkerPool` should implement:
88
- [`push!`](@ref) - add a new worker to the overall pool (available + busy)
99
- [`put!`](@ref) - put back a worker to the available pool
1010
- [`take!`](@ref) - take a worker from the available pool (to be used for remote function execution)
11+
- [`wait`](@ref) - block until a worker is available
1112
- [`length`](@ref) - number of workers available in the overall pool
1213
- [`isready`](@ref) - return false if a `take!` on the pool would block, else true
1314
@@ -120,6 +121,11 @@ function wp_local_take!(pool::AbstractWorkerPool)
120121
return worker
121122
end
122123

124+
function wp_local_wait(pool::AbstractWorkerPool)
125+
wait(pool.channel)
126+
return nothing
127+
end
128+
123129
function remotecall_pool(rc_f, f, pool::AbstractWorkerPool, args...; kwargs...)
124130
worker = take!(pool)
125131
try
@@ -133,7 +139,7 @@ end
133139
# NOTE: remotecall_fetch does it automatically, but this will be more efficient as
134140
# it avoids the overhead associated with a local remotecall.
135141

136-
for (func, rt) = ((:length, Int), (:isready, Bool), (:workers, Vector{Int}), (:nworkers, Int), (:take!, Int))
142+
for (func, rt) = ((:length, Int), (:isready, Bool), (:workers, Vector{Int}), (:nworkers, Int), (:take!, Int), (:wait, Nothing))
137143
func_local = Symbol(string("wp_local_", func))
138144
@eval begin
139145
function ($func)(pool::WorkerPool)

test/distributed_exec.jl

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -709,6 +709,27 @@ wp = WorkerPool(workers())
709709
wp = WorkerPool(2:3)
710710
@test sort(unique(pmap(_->myid(), wp, 1:100))) == [2,3]
711711

712+
# wait on worker pool
713+
wp = WorkerPool(2:2)
714+
w = take!(wp)
715+
716+
# local call to _wait
717+
@test !isready(wp)
718+
t = @async wait(wp)
719+
@test !istaskdone(t)
720+
put!(wp, w)
721+
status = timedwait(() -> istaskdone(t), 10)
722+
@test status == :ok
723+
724+
# remote call to _wait
725+
take!(wp)
726+
@test !isready(wp)
727+
f = @spawnat w wait(wp)
728+
@test !isready(f)
729+
put!(wp, w)
730+
status = timedwait(() -> isready(f), 10)
731+
@test status == :ok
732+
712733
# CachingPool tests
713734
wp = CachingPool(workers())
714735
@test [1:100...] == pmap(x->x, wp, 1:100)

0 commit comments

Comments
 (0)