Skip to content

Commit 22bbd5e

Browse files
committed
Specialize remotecall_pool(remotecall) to wait for the remotecall
Otherwise the worker would be put back into the pool prematurely, causing oversubscription. Also add a warning about oversubscription to the docstring for `remote_do(f, ::AbstractWorkerPool)`.
1 parent 1c7eb92 commit 22bbd5e

File tree

2 files changed

+41
-0
lines changed

2 files changed

+41
-0
lines changed

src/workerpool.jl

+26
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,28 @@ function remotecall_pool(rc_f, f, pool::AbstractWorkerPool, args...; kwargs...)
135135
end
136136
end
137137

138+
# Specialization for remotecall. We have to wait for the Future it returns
139+
# before putting the worker back in the pool.
140+
function remotecall_pool(rc_f::typeof(remotecall), f, pool::AbstractWorkerPool, args...; kwargs...)
    # Reserve a worker; it stays out of the pool until the remote call is done.
    worker = take!(pool)

    # If issuing the call itself fails there is no Future to wait on, so hand
    # the worker straight back and propagate the error to the caller.
    x = try
        rc_f(f, worker, args...; kwargs...)
    catch
        put!(pool, worker)
        rethrow()
    end

    # `remotecall` returns immediately, so the worker is released from a
    # background task once the Future `x` has completed.
    t = Threads.@spawn try
        wait(x)
    catch
        # Deliberately ignored: `wait` may rethrow a failure of the remote call
        # (or of the worker itself). The caller observes that failure through
        # the returned Future, so reporting it again from this task would only
        # produce a duplicate "unhandled task error" log entry.
    finally
        put!(pool, worker)
    end
    # Still surface unexpected failures of the task (e.g. `put!` throwing).
    errormonitor(t)

    return x
end
159+
138160
# Check if pool is local or remote and forward calls if required.
139161
# NOTE: remotecall_fetch does it automatically, but this will be more efficient as
140162
# it avoids the overhead associated with a local remotecall.
@@ -242,6 +264,10 @@ remotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) = remotecall_p
242264
243265
[`WorkerPool`](@ref) variant of `remote_do(f, pid, ....)`. Wait for and take a free worker from `pool` and
244266
perform a `remote_do` on it.
267+
268+
Note that it's not possible to wait for a `remote_do()` call to finish,
269+
so the worker will immediately be put back in the pool (i.e. potentially causing
270+
oversubscription).
245271
"""
246272
function remote_do(f, pool::AbstractWorkerPool, args...; kwargs...)
    # Forward to the generic pool helper, which picks the worker to run on.
    return remotecall_pool(remote_do, f, pool, args...; kwargs...)
end
247273

test/distributed_exec.jl

+15
Original file line numberDiff line numberDiff line change
@@ -1089,6 +1089,21 @@ let
10891089
@test_throws RemoteException fetch(ref)
10901090
end
10911091

1092+
# Check remotecall(f, ::AbstractWorkerPool): the worker must stay out of the
# pool until the underlying remotecall has finished.
let
    gate = RemoteChannel(wrkr1)
    pool = WorkerPool([wrkr1])
    # The remote function waits on `gate`, so it cannot complete before the
    # put! below.
    pending = remotecall(pool) do
        wait(gate)
    end
    @test !isready(pool)
    put!(gate, 1)
    wait(pending)
    # The worker is handed back to the pool by an asynchronous task, so poll
    # with timedwait() instead of checking the pool immediately.
    status = timedwait(() -> isready(pool), 10)
    @test status === :ok
end
1106+
10921107
# Test calling @everywhere from a module not defined on the workers
10931108
module LocalBar
10941109
using Distributed

0 commit comments

Comments
 (0)