Skip to content

Commit 50bdaeb

Browse files
committed
Keep most 'client oriented' tasks in the same threadpool
This is to avoid them accidentally running in another (potentially busy) threadpool.
1 parent: cdc14b1 · commit: 50bdaeb

File tree

4 files changed

+16
-16
lines changed

4 files changed

+16
-16
lines changed

Diff for: src/cluster.jl

+11-11
Original file line number · Diff line number · Diff line change
@@ -163,10 +163,10 @@ function check_worker_state(w::Worker)
163163
else
164164
w.ct_time = time()
165165
if myid() > w.id
166-
t = Threads.@spawn exec_conn_func(w)
166+
t = Threads.@spawn Threads.threadpool() exec_conn_func(w)
167167
else
168168
# route request via node 1
169-
t = Threads.@spawn remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid())
169+
t = Threads.@spawn Threads.threadpool() remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid())
170170
end
171171
errormonitor(t)
172172
wait_for_conn(w)
@@ -194,7 +194,7 @@ function wait_for_conn(w)
194194
timeout = worker_timeout() - (time() - w.ct_time)
195195
timeout <= 0 && error("peer $(w.id) has not connected to $(myid())")
196196

197-
T = Threads.@spawn begin
197+
T = Threads.@spawn Threads.threadpool() begin
198198
sleep($timeout)
199199
lock(w.c_state) do
200200
notify(w.c_state; all=true)
@@ -329,7 +329,7 @@ function read_worker_host_port(io::IO)
329329
leader = String[]
330330
try
331331
while ntries > 0
332-
readtask = Threads.@spawn readline(io)
332+
readtask = Threads.@spawn Threads.threadpool() readline(io)
333333
yield()
334334
while !istaskdone(readtask) && ((time_ns() - t0) < timeout)
335335
sleep(0.05)
@@ -496,13 +496,13 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
496496
# the call to the manager's `launch` runs in a separate task. This allows the master
497497
# process to initiate the connection setup process as and when workers come
498498
# online
499-
t_launch = Threads.@spawn launch(manager, params, launched, launch_ntfy)
499+
t_launch = Threads.@spawn Threads.threadpool() launch(manager, params, launched, launch_ntfy)
500500

501501
@sync begin
502502
while true
503503
if isempty(launched)
504504
istaskdone(t_launch) && break
505-
Threads.@spawn begin
505+
Threads.@spawn Threads.threadpool() begin
506506
sleep(1)
507507
notify(launch_ntfy)
508508
end
@@ -512,7 +512,7 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
512512
if !isempty(launched)
513513
wconfig = popfirst!(launched)
514514
let wconfig=wconfig
515-
Threads.@spawn setup_launched_worker(manager, wconfig, launched_q)
515+
Threads.@spawn Threads.threadpool() setup_launched_worker(manager, wconfig, launched_q)
516516
end
517517
end
518518
end
@@ -592,7 +592,7 @@ function launch_n_additional_processes(manager, frompid, fromconfig, cnt, launch
592592
wconfig.port = port
593593

594594
let wconfig=wconfig
595-
Threads.@spawn begin
595+
Threads.@spawn Threads.threadpool() begin
596596
pid = create_worker(manager, wconfig)
597597
remote_do(redirect_output_from_additional_worker, frompid, pid, port)
598598
push!(launched_q, pid)
@@ -1050,13 +1050,13 @@ function rmprocs(pids...; waitfor=typemax(Int))
10501050

10511051
pids = vcat(pids...)
10521052
if waitfor == 0
1053-
t = Threads.@spawn _rmprocs(pids, typemax(Int))
1053+
t = Threads.@spawn Threads.threadpool() _rmprocs(pids, typemax(Int))
10541054
yield()
10551055
return t
10561056
else
10571057
_rmprocs(pids, waitfor)
10581058
# return a dummy task object that user code can wait on.
1059-
return Threads.@spawn nothing
1059+
return Threads.@spawn Threads.threadpool() nothing
10601060
end
10611061
end
10621062

@@ -1239,7 +1239,7 @@ function interrupt(pids::AbstractVector=workers())
12391239
@assert myid() == 1
12401240
@sync begin
12411241
for pid in pids
1242-
Threads.@spawn interrupt(pid)
1242+
Threads.@spawn Threads.threadpool() interrupt(pid)
12431243
end
12441244
end
12451245
end

Diff for: src/macros.jl

+2-2
Original file line number · Diff line number · Diff line change
@@ -230,7 +230,7 @@ function remotecall_eval(m::Module, procs, ex)
230230
# execute locally last as we do not want local execution to block serialization
231231
# of the request to remote nodes.
232232
for _ in 1:run_locally
233-
Threads.@spawn Core.eval(m, ex)
233+
Threads.@spawn Threads.threadpool() Core.eval(m, ex)
234234
end
235235
end
236236
nothing
@@ -275,7 +275,7 @@ function preduce(reducer, f, R)
275275
end
276276

277277
function pfor(f, R)
278-
t = Threads.@spawn @sync for c in splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers())
278+
t = Threads.@spawn Threads.threadpool() @sync for c in splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers())
279279
@spawnat :any f(R, first(c), last(c))
280280
end
281281
errormonitor(t)

Diff for: src/managers.jl

+2-2
Original file line number · Diff line number · Diff line change
@@ -178,7 +178,7 @@ function launch(manager::SSHManager, params::Dict, launched::Array, launch_ntfy:
178178
# Wait for all launches to complete.
179179
@sync for (i, (machine, cnt)) in enumerate(manager.machines)
180180
let machine=machine, cnt=cnt
181-
Threads.@spawn try
181+
Threads.@spawn Threads.threadpool() try
182182
launch_on_machine(manager, $machine, $cnt, params, launched, launch_ntfy)
183183
catch e
184184
print(stderr, "exception launching on machine $(machine) : $(e)\n")
@@ -744,7 +744,7 @@ function kill(manager::LocalManager, pid::Int, config::WorkerConfig; exit_timeou
744744
# First, try sending `exit()` to the remote over the usual control channels
745745
remote_do(exit, pid)
746746

747-
timer_task = Threads.@spawn begin
747+
timer_task = Threads.@spawn Threads.threadpool() begin
748748
sleep(exit_timeout)
749749

750750
# Check to see if our child exited, and if not, send an actual kill signal

Diff for: src/remotecall.jl

+1-1
Original file line number · Diff line number · Diff line change
@@ -322,7 +322,7 @@ function process_worker(rr)
322322
msg = (remoteref_id(rr), myid())
323323

324324
# Needs to acquire a lock on the del_msg queue
325-
T = Threads.@spawn begin
325+
T = Threads.@spawn Threads.threadpool() begin
326326
publish_del_msg!($w, $msg)
327327
end
328328
Base.errormonitor(T)

0 commit comments

Comments (0)