Skip to content

Commit 8e35ba4

Browse files
committed
fixup! Make worker state variable threadsafe
1 parent 9283e6f commit 8e35ba4

File tree

4 files changed

+14
-14
lines changed

4 files changed

+14
-14
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ jobs:
5454
- uses: julia-actions/julia-buildpkg@v1
5555
- uses: julia-actions/julia-runtest@v1
5656
env:
57-
JULIA_DISTRIBUTED_TESTING_STANDALONE: 1
57+
JULIA_NUM_THREADS: 4
5858
- uses: julia-actions/julia-processcoverage@v1
5959
- uses: codecov/codecov-action@v4
6060
with:

src/cluster.jl

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ function set_worker_state(w, state)
151151
end
152152

153153
function check_worker_state(w::Worker)
154-
if w.state === W_CREATED
154+
if (@atomic w.state) === W_CREATED
155155
if !isclusterlazy()
156156
if PGRP.topology === :all_to_all
157157
# Since higher pids connect with lower pids, the remote worker
@@ -190,7 +190,7 @@ function exec_conn_func(w::Worker)
190190
end
191191

192192
function wait_for_conn(w)
193-
if w.state === W_CREATED
193+
if (@atomic w.state) === W_CREATED
194194
timeout = worker_timeout() - (time() - w.ct_time)
195195
timeout <= 0 && error("peer $(w.id) has not connected to $(myid())")
196196

@@ -203,7 +203,7 @@ function wait_for_conn(w)
203203
errormonitor(T)
204204
lock(w.c_state) do
205205
wait(w.c_state)
206-
w.state === W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
206+
(@atomic w.state) === W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
207207
end
208208
end
209209
nothing
@@ -661,7 +661,7 @@ function create_worker(manager, wconfig)
661661
if (jw.id != 1) && (jw.id < w.id)
662662
lock(jw.c_state) do
663663
# wait for wl to join
664-
if jw.state === W_CREATED
664+
if (@atomic jw.state) === W_CREATED
665665
wait(jw.c_state)
666666
end
667667
end
@@ -688,7 +688,7 @@ function create_worker(manager, wconfig)
688688

689689
for wl in wlist
690690
lock(wl.c_state) do
691-
if wl.state === W_CREATED
691+
if (@atomic wl.state) === W_CREATED
692692
# wait for wl to join
693693
wait(wl.c_state)
694694
end
@@ -903,7 +903,7 @@ function nprocs()
903903
n = length(PGRP.workers)
904904
# filter out workers in the process of being setup/shutdown.
905905
for jw in PGRP.workers
906-
if !isa(jw, LocalProcess) && (jw.state !== W_CONNECTED)
906+
if !isa(jw, LocalProcess) && ((@atomic jw.state) !== W_CONNECTED)
907907
n = n - 1
908908
end
909909
end
@@ -954,7 +954,7 @@ julia> procs()
954954
function procs()
955955
if myid() == 1 || (PGRP.topology === :all_to_all && !isclusterlazy())
956956
# filter out workers in the process of being setup/shutdown.
957-
return Int[x.id for x in PGRP.workers if isa(x, LocalProcess) || (x.state === W_CONNECTED)]
957+
return Int[x.id for x in PGRP.workers if isa(x, LocalProcess) || ((@atomic x.state) === W_CONNECTED)]
958958
else
959959
return Int[x.id for x in PGRP.workers]
960960
end
@@ -963,7 +963,7 @@ end
963963
function id_in_procs(id) # faster version of `id in procs()`
964964
if myid() == 1 || (PGRP.topology === :all_to_all && !isclusterlazy())
965965
for x in PGRP.workers
966-
if (x.id::Int) == id && (isa(x, LocalProcess) || (x::Worker).state === W_CONNECTED)
966+
if (x.id::Int) == id && (isa(x, LocalProcess) || (@atomic (x::Worker).state) === W_CONNECTED)
967967
return true
968968
end
969969
end
@@ -985,7 +985,7 @@ Specifically all workers bound to the same ip-address as `pid` are returned.
985985
"""
986986
function procs(pid::Integer)
987987
if myid() == 1
988-
all_workers = [x for x in PGRP.workers if isa(x, LocalProcess) || (x.state === W_CONNECTED)]
988+
all_workers = [x for x in PGRP.workers if isa(x, LocalProcess) || ((@atomic x.state) === W_CONNECTED)]
989989
if (pid == 1) || (isa(map_pid_wrkr[pid].manager, LocalManager))
990990
Int[x.id for x in filter(w -> (w.id==1) || (isa(w.manager, LocalManager)), all_workers)]
991991
else
@@ -1092,11 +1092,11 @@ function _rmprocs(pids, waitfor)
10921092

10931093
start = time_ns()
10941094
while (time_ns() - start) < waitfor*1e9
1095-
all(w -> w.state === W_TERMINATED, rmprocset) && break
1095+
all(w -> (@atomic w.state) === W_TERMINATED, rmprocset) && break
10961096
sleep(min(0.1, waitfor - (time_ns() - start)/1e9))
10971097
end
10981098

1099-
unremoved = [wrkr.id for wrkr in filter(w -> w.state !== W_TERMINATED, rmprocset)]
1099+
unremoved = [wrkr.id for wrkr in filter(w -> (@atomic w.state) !== W_TERMINATED, rmprocset)]
11001100
if length(unremoved) > 0
11011101
estr = string("rmprocs: pids ", unremoved, " not terminated after ", waitfor, " seconds.")
11021102
throw(ErrorException(estr))

src/messages.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ end
194194
function flush_gc_msgs()
195195
try
196196
for w in (PGRP::ProcessGroup).workers
197-
if isa(w,Worker) && (w.state == W_CONNECTED) && w.gcflag
197+
if isa(w,Worker) && ((@atomic w.state) == W_CONNECTED) && w.gcflag
198198
flush_gc_msgs(w)
199199
end
200200
end

src/process_messages.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ function message_handler_loop(r_stream::IO, w_stream::IO, incoming::Bool)
222222
println(stderr, "Process($(myid())) - Unknown remote, closing connection.")
223223
elseif !(wpid in map_del_wrkr)
224224
werr = worker_from_id(wpid)
225-
oldstate = werr.state
225+
oldstate = @atomic werr.state
226226
set_worker_state(werr, W_TERMINATED)
227227

228228
# If unhandleable error occurred talking to pid 1, exit

0 commit comments

Comments
 (0)