Skip to content

Commit 0e27cd1

Browse files
Merge pull request #101 from JamesWrigley/jps/threadsafe_workerstate
2 parents 3b9e4fd + 90041ca commit 0e27cd1

9 files changed

+150
-61
lines changed

.gitignore

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Manifest.toml
2+
*.swp

src/cluster.jl

+66-46
Original file line numberDiff line numberDiff line change
@@ -99,10 +99,10 @@ mutable struct Worker
9999
del_msgs::Array{Any,1} # XXX: Could del_msgs and add_msgs be Channels?
100100
add_msgs::Array{Any,1}
101101
@atomic gcflag::Bool
102-
state::WorkerState
103-
c_state::Condition # wait for state changes
104-
ct_time::Float64 # creation time
105-
conn_func::Any # used to setup connections lazily
102+
@atomic state::WorkerState
103+
c_state::Threads.Condition # wait for state changes, lock for state
104+
ct_time::Float64 # creation time
105+
conn_func::Any # used to setup connections lazily
106106

107107
r_stream::IO
108108
w_stream::IO
@@ -134,7 +134,7 @@ mutable struct Worker
134134
if haskey(map_pid_wrkr, id)
135135
return map_pid_wrkr[id]
136136
end
137-
w=new(id, Threads.ReentrantLock(), [], [], false, W_CREATED, Condition(), time(), conn_func)
137+
w=new(id, Threads.ReentrantLock(), [], [], false, W_CREATED, Threads.Condition(), time(), conn_func)
138138
w.initialized = Event()
139139
register_worker(w)
140140
w
@@ -144,8 +144,10 @@ mutable struct Worker
144144
end
145145

146146
function set_worker_state(w, state)
147-
w.state = state
148-
notify(w.c_state; all=true)
147+
lock(w.c_state) do
148+
@atomic w.state = state
149+
notify(w.c_state; all=true)
150+
end
149151
end
150152

151153
function check_worker_state(w::Worker)
@@ -161,15 +163,16 @@ function check_worker_state(w::Worker)
161163
else
162164
w.ct_time = time()
163165
if myid() > w.id
164-
t = @async exec_conn_func(w)
166+
t = Threads.@spawn Threads.threadpool() exec_conn_func(w)
165167
else
166168
# route request via node 1
167-
t = @async remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid())
169+
t = Threads.@spawn Threads.threadpool() remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid())
168170
end
169171
errormonitor(t)
170172
wait_for_conn(w)
171173
end
172174
end
175+
return nothing
173176
end
174177

175178
exec_conn_func(id::Int) = exec_conn_func(worker_from_id(id)::Worker)
@@ -191,9 +194,17 @@ function wait_for_conn(w)
191194
timeout = worker_timeout() - (time() - w.ct_time)
192195
timeout <= 0 && error("peer $(w.id) has not connected to $(myid())")
193196

194-
@async (sleep(timeout); notify(w.c_state; all=true))
195-
wait(w.c_state)
196-
w.state === W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
197+
T = Threads.@spawn Threads.threadpool() begin
198+
sleep($timeout)
199+
lock(w.c_state) do
200+
notify(w.c_state; all=true)
201+
end
202+
end
203+
errormonitor(T)
204+
lock(w.c_state) do
205+
wait(w.c_state)
206+
w.state === W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
207+
end
197208
end
198209
nothing
199210
end
@@ -247,7 +258,7 @@ function start_worker(out::IO, cookie::AbstractString=readline(stdin); close_std
247258
else
248259
sock = listen(interface, LPROC.bind_port)
249260
end
250-
errormonitor(@async while isopen(sock)
261+
errormonitor(Threads.@spawn while isopen(sock)
251262
client = accept(sock)
252263
process_messages(client, client, true)
253264
end)
@@ -279,7 +290,7 @@ end
279290

280291

281292
function redirect_worker_output(ident, stream)
282-
t = @async while !eof(stream)
293+
t = Threads.@spawn while !eof(stream)
283294
line = readline(stream)
284295
if startswith(line, " From worker ")
285296
# stdout's of "additional" workers started from an initial worker on a host are not available
@@ -318,7 +329,7 @@ function read_worker_host_port(io::IO)
318329
leader = String[]
319330
try
320331
while ntries > 0
321-
readtask = @async readline(io)
332+
readtask = Threads.@spawn Threads.threadpool() readline(io)
322333
yield()
323334
while !istaskdone(readtask) && ((time_ns() - t0) < timeout)
324335
sleep(0.05)
@@ -419,7 +430,7 @@ if launching workers programmatically, execute `addprocs` in its own task.
419430
420431
```julia
421432
# On busy clusters, call `addprocs` asynchronously
422-
t = @async addprocs(...)
433+
t = Threads.@spawn addprocs(...)
423434
```
424435
425436
```julia
@@ -485,20 +496,23 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
485496
# call manager's `launch` is a separate task. This allows the master
486497
# process initiate the connection setup process as and when workers come
487498
# online
488-
t_launch = @async launch(manager, params, launched, launch_ntfy)
499+
t_launch = Threads.@spawn Threads.threadpool() launch(manager, params, launched, launch_ntfy)
489500

490501
@sync begin
491502
while true
492503
if isempty(launched)
493504
istaskdone(t_launch) && break
494-
@async (sleep(1); notify(launch_ntfy))
505+
Threads.@spawn Threads.threadpool() begin
506+
sleep(1)
507+
notify(launch_ntfy)
508+
end
495509
wait(launch_ntfy)
496510
end
497511

498512
if !isempty(launched)
499513
wconfig = popfirst!(launched)
500514
let wconfig=wconfig
501-
@async setup_launched_worker(manager, wconfig, launched_q)
515+
Threads.@spawn Threads.threadpool() setup_launched_worker(manager, wconfig, launched_q)
502516
end
503517
end
504518
end
@@ -578,7 +592,7 @@ function launch_n_additional_processes(manager, frompid, fromconfig, cnt, launch
578592
wconfig.port = port
579593

580594
let wconfig=wconfig
581-
@async begin
595+
Threads.@spawn Threads.threadpool() begin
582596
pid = create_worker(manager, wconfig)
583597
remote_do(redirect_output_from_additional_worker, frompid, pid, port)
584598
push!(launched_q, pid)
@@ -645,7 +659,12 @@ function create_worker(manager, wconfig)
645659
# require the value of config.connect_at which is set only upon connection completion
646660
for jw in PGRP.workers
647661
if (jw.id != 1) && (jw.id < w.id)
648-
(jw.state === W_CREATED) && wait(jw.c_state)
662+
# wait for wl to join
663+
if jw.state === W_CREATED
664+
lock(jw.c_state) do
665+
wait(jw.c_state)
666+
end
667+
end
649668
push!(join_list, jw)
650669
end
651670
end
@@ -668,7 +687,12 @@ function create_worker(manager, wconfig)
668687
end
669688

670689
for wl in wlist
671-
(wl.state === W_CREATED) && wait(wl.c_state)
690+
lock(wl.c_state) do
691+
if wl.state === W_CREATED
692+
# wait for wl to join
693+
wait(wl.c_state)
694+
end
695+
end
672696
push!(join_list, wl)
673697
end
674698
end
@@ -727,23 +751,21 @@ function redirect_output_from_additional_worker(pid, port)
727751
end
728752

729753
function check_master_connect()
730-
timeout = worker_timeout() * 1e9
731754
# If we do not have at least process 1 connect to us within timeout
732755
# we log an error and exit, unless we're running on valgrind
733756
if ccall(:jl_running_on_valgrind,Cint,()) != 0
734757
return
735758
end
736-
@async begin
737-
start = time_ns()
738-
while !haskey(map_pid_wrkr, 1) && (time_ns() - start) < timeout
739-
sleep(1.0)
740-
end
741759

742-
if !haskey(map_pid_wrkr, 1)
743-
print(stderr, "Master process (id 1) could not connect within $(timeout/1e9) seconds.\nexiting.\n")
744-
exit(1)
760+
errormonitor(
761+
Threads.@spawn begin
762+
timeout = worker_timeout()
763+
if timedwait(() -> !haskey(map_pid_wrkr, 1), timeout) === :timed_out
764+
print(stderr, "Master process (id 1) could not connect within $(timeout) seconds.\nexiting.\n")
765+
exit(1)
766+
end
745767
end
746-
end
768+
)
747769
end
748770

749771

@@ -1028,13 +1050,13 @@ function rmprocs(pids...; waitfor=typemax(Int))
10281050

10291051
pids = vcat(pids...)
10301052
if waitfor == 0
1031-
t = @async _rmprocs(pids, typemax(Int))
1053+
t = Threads.@spawn Threads.threadpool() _rmprocs(pids, typemax(Int))
10321054
yield()
10331055
return t
10341056
else
10351057
_rmprocs(pids, waitfor)
10361058
# return a dummy task object that user code can wait on.
1037-
return @async nothing
1059+
return Threads.@spawn Threads.threadpool() nothing
10381060
end
10391061
end
10401062

@@ -1217,7 +1239,7 @@ function interrupt(pids::AbstractVector=workers())
12171239
@assert myid() == 1
12181240
@sync begin
12191241
for pid in pids
1220-
@async interrupt(pid)
1242+
Threads.@spawn Threads.threadpool() interrupt(pid)
12211243
end
12221244
end
12231245
end
@@ -1288,18 +1310,16 @@ end
12881310

12891311
using Random: randstring
12901312

1291-
let inited = false
1292-
# do initialization that's only needed when there is more than 1 processor
1293-
global function init_multi()
1294-
if !inited
1295-
inited = true
1296-
push!(Base.package_callbacks, _require_callback)
1297-
atexit(terminate_all_workers)
1298-
init_bind_addr()
1299-
cluster_cookie(randstring(HDR_COOKIE_LEN))
1300-
end
1301-
return nothing
1313+
# do initialization that's only needed when there is more than 1 processor
1314+
const inited = Threads.Atomic{Bool}(false)
1315+
function init_multi()
1316+
if !Threads.atomic_cas!(inited, false, true)
1317+
push!(Base.package_callbacks, _require_callback)
1318+
atexit(terminate_all_workers)
1319+
init_bind_addr()
1320+
cluster_cookie(randstring(HDR_COOKIE_LEN))
13021321
end
1322+
return nothing
13031323
end
13041324

13051325
function init_parallel()

src/macros.jl

+2-2
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ function remotecall_eval(m::Module, procs, ex)
230230
# execute locally last as we do not want local execution to block serialization
231231
# of the request to remote nodes.
232232
for _ in 1:run_locally
233-
@async Core.eval(m, ex)
233+
Threads.@spawn Threads.threadpool() Core.eval(m, ex)
234234
end
235235
end
236236
nothing
@@ -275,7 +275,7 @@ function preduce(reducer, f, R)
275275
end
276276

277277
function pfor(f, R)
278-
t = @async @sync for c in splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers())
278+
t = Threads.@spawn Threads.threadpool() @sync for c in splitrange(Int(firstindex(R)), Int(lastindex(R)), nworkers())
279279
@spawnat :any f(R, first(c), last(c))
280280
end
281281
errormonitor(t)

src/managers.jl

+2-2
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ function launch(manager::SSHManager, params::Dict, launched::Array, launch_ntfy:
178178
# Wait for all launches to complete.
179179
@sync for (i, (machine, cnt)) in enumerate(manager.machines)
180180
let machine=machine, cnt=cnt
181-
@async try
181+
Threads.@spawn Threads.threadpool() try
182182
launch_on_machine(manager, $machine, $cnt, params, launched, launch_ntfy)
183183
catch e
184184
print(stderr, "exception launching on machine $(machine) : $(e)\n")
@@ -744,7 +744,7 @@ function kill(manager::LocalManager, pid::Int, config::WorkerConfig; exit_timeou
744744
# First, try sending `exit()` to the remote over the usual control channels
745745
remote_do(exit, pid)
746746

747-
timer_task = @async begin
747+
timer_task = Threads.@spawn Threads.threadpool() begin
748748
sleep(exit_timeout)
749749

750750
# Check to see if our child exited, and if not, send an actual kill signal

src/messages.jl

+1-1
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ function flush_gc_msgs()
200200
end
201201
catch e
202202
bt = catch_backtrace()
203-
@async showerror(stderr, e, bt)
203+
Threads.@spawn showerror(stderr, e, bt)
204204
end
205205
end
206206

src/process_messages.jl

+7-7
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ function schedule_call(rid, thunk)
8585
rv = RemoteValue(def_rv_channel())
8686
(PGRP::ProcessGroup).refs[rid] = rv
8787
push!(rv.clientset, rid.whence)
88-
errormonitor(@async run_work_thunk(rv, thunk))
88+
errormonitor(Threads.@spawn run_work_thunk(rv, thunk))
8989
return rv
9090
end
9191
end
@@ -118,7 +118,7 @@ end
118118

119119
## message event handlers ##
120120
function process_messages(r_stream::TCPSocket, w_stream::TCPSocket, incoming::Bool=true)
121-
errormonitor(@async process_tcp_streams(r_stream, w_stream, incoming))
121+
errormonitor(Threads.@spawn process_tcp_streams(r_stream, w_stream, incoming))
122122
end
123123

124124
function process_tcp_streams(r_stream::TCPSocket, w_stream::TCPSocket, incoming::Bool)
@@ -148,7 +148,7 @@ Julia version number to perform the authentication handshake.
148148
See also [`cluster_cookie`](@ref).
149149
"""
150150
function process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)
151-
errormonitor(@async message_handler_loop(r_stream, w_stream, incoming))
151+
errormonitor(Threads.@spawn message_handler_loop(r_stream, w_stream, incoming))
152152
end
153153

154154
function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool)
@@ -283,7 +283,7 @@ function handle_msg(msg::CallMsg{:call}, header, r_stream, w_stream, version)
283283
schedule_call(header.response_oid, ()->invokelatest(msg.f, msg.args...; msg.kwargs...))
284284
end
285285
function handle_msg(msg::CallMsg{:call_fetch}, header, r_stream, w_stream, version)
286-
errormonitor(@async begin
286+
errormonitor(Threads.@spawn begin
287287
v = run_work_thunk(()->invokelatest(msg.f, msg.args...; msg.kwargs...), false)
288288
if isa(v, SyncTake)
289289
try
@@ -299,15 +299,15 @@ function handle_msg(msg::CallMsg{:call_fetch}, header, r_stream, w_stream, versi
299299
end
300300

301301
function handle_msg(msg::CallWaitMsg, header, r_stream, w_stream, version)
302-
errormonitor(@async begin
302+
errormonitor(Threads.@spawn begin
303303
rv = schedule_call(header.response_oid, ()->invokelatest(msg.f, msg.args...; msg.kwargs...))
304304
deliver_result(w_stream, :call_wait, header.notify_oid, fetch(rv.c))
305305
nothing
306306
end)
307307
end
308308

309309
function handle_msg(msg::RemoteDoMsg, header, r_stream, w_stream, version)
310-
errormonitor(@async run_work_thunk(()->invokelatest(msg.f, msg.args...; msg.kwargs...), true))
310+
errormonitor(Threads.@spawn run_work_thunk(()->invokelatest(msg.f, msg.args...; msg.kwargs...), true))
311311
end
312312

313313
function handle_msg(msg::ResultMsg, header, r_stream, w_stream, version)
@@ -350,7 +350,7 @@ function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version)
350350
# The constructor registers the object with a global registry.
351351
Worker(rpid, ()->connect_to_peer(cluster_manager, rpid, wconfig))
352352
else
353-
@async connect_to_peer(cluster_manager, rpid, wconfig)
353+
Threads.@spawn connect_to_peer(cluster_manager, rpid, wconfig)
354354
end
355355
end
356356
end

src/remotecall.jl

+2-2
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ or to use a local [`Channel`](@ref) as a proxy:
205205
```julia
206206
p = 1
207207
f = Future(p)
208-
errormonitor(@async put!(f, remotecall_fetch(long_computation, p)))
208+
errormonitor(Threads.@spawn put!(f, remotecall_fetch(long_computation, p)))
209209
isready(f) # will not block
210210
```
211211
"""
@@ -322,7 +322,7 @@ function process_worker(rr)
322322
msg = (remoteref_id(rr), myid())
323323

324324
# Needs to acquire a lock on the del_msg queue
325-
T = Threads.@spawn begin
325+
T = Threads.@spawn Threads.threadpool() begin
326326
publish_del_msg!($w, $msg)
327327
end
328328
Base.errormonitor(T)

0 commit comments

Comments
 (0)