Skip to content

Commit a2e7b7a

Browse files
committed
fixup! Make worker state variable threadsafe
1 parent 02fe4d5 commit a2e7b7a

File tree

3 files changed

+108
-97
lines changed

3 files changed

+108
-97
lines changed

test/distributed_exec.jl

Lines changed: 0 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33
using Test, DistributedNext, Random, Serialization, Sockets
44
import DistributedNext: launch, manage
55

6-
import LibSSH as ssh
7-
import LibSSH.Demo: DemoServer
86

97
@test cluster_cookie() isa String
108

@@ -762,98 +760,6 @@ if DoFullTest
762760
@test all([p == remotecall_fetch(myid, p) for p in all_w])
763761
end
764762

765-
# LibSSH.jl currently only works on 64bit unixes
766-
if Sys.isunix() && Sys.WORD_SIZE == 64
767-
function test_n_remove_pids(new_pids)
768-
for p in new_pids
769-
w_in_remote = sort(remotecall_fetch(workers, p))
770-
try
771-
@test intersect(new_pids, w_in_remote) == new_pids
772-
catch
773-
print("p : $p\n")
774-
print("newpids : $new_pids\n")
775-
print("w_in_remote : $w_in_remote\n")
776-
print("intersect : $(intersect(new_pids, w_in_remote))\n\n\n")
777-
rethrow()
778-
end
779-
end
780-
781-
remotecall_fetch(rmprocs, 1, new_pids)
782-
end
783-
784-
println("\n\nTesting SSHManager. A minimum of 4GB of RAM is recommended.")
785-
println("Please ensure port 9300 and 2222 are not in use.")
786-
787-
DemoServer(2222; auth_methods=[ssh.AuthMethod_None], allow_auth_none=true, verbose=false, timeout=3600) do
788-
sshflags = `-o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -o LogLevel=ERROR -p 2222 `
789-
#Issue #9951
790-
hosts=[]
791-
localhost_aliases = ["localhost", string(getipaddr()), "127.0.0.1"]
792-
num_workers = parse(Int,(get(ENV, "JULIA_ADDPROCS_NUM", "9")))
793-
794-
for i in 1:(num_workers/length(localhost_aliases))
795-
append!(hosts, localhost_aliases)
796-
end
797-
798-
# CI machines sometimes don't already have a .ssh directory
799-
ssh_dir = joinpath(homedir(), ".ssh")
800-
if !isdir(ssh_dir)
801-
mkdir(ssh_dir)
802-
end
803-
804-
print("\nTesting SSH addprocs with $(length(hosts)) workers...\n")
805-
new_pids = addprocs_with_testenv(hosts; sshflags=sshflags)
806-
@test length(new_pids) == length(hosts)
807-
test_n_remove_pids(new_pids)
808-
809-
print("\nMixed ssh addprocs with :auto\n")
810-
new_pids = addprocs_with_testenv(["localhost", ("127.0.0.1", :auto), "localhost"]; sshflags=sshflags)
811-
@test length(new_pids) == (2 + Sys.CPU_THREADS)
812-
test_n_remove_pids(new_pids)
813-
814-
print("\nMixed ssh addprocs with numeric counts\n")
815-
new_pids = addprocs_with_testenv([("localhost", 2), ("127.0.0.1", 2), "localhost"]; sshflags=sshflags)
816-
@test length(new_pids) == 5
817-
test_n_remove_pids(new_pids)
818-
819-
print("\nssh addprocs with tunnel\n")
820-
new_pids = addprocs_with_testenv([("localhost", num_workers)]; tunnel=true, sshflags=sshflags)
821-
@test length(new_pids) == num_workers
822-
test_n_remove_pids(new_pids)
823-
824-
print("\nssh addprocs with tunnel (SSH multiplexing)\n")
825-
new_pids = addprocs_with_testenv([("localhost", num_workers)]; tunnel=true, multiplex=true, sshflags=sshflags)
826-
@test length(new_pids) == num_workers
827-
controlpath = joinpath(ssh_dir, "julia-$(ENV["USER"])@localhost:2222")
828-
@test issocket(controlpath)
829-
test_n_remove_pids(new_pids)
830-
@test :ok == timedwait(()->!issocket(controlpath), 10.0; pollint=0.5)
831-
832-
print("\nAll supported formats for hostname\n")
833-
h1 = "localhost"
834-
user = ENV["USER"]
835-
h2 = "$user@$h1"
836-
h3 = "$h2:2222"
837-
h4 = "$h3 $(string(getipaddr()))"
838-
h5 = "$h4:9300"
839-
840-
new_pids = addprocs_with_testenv([h1, h2, h3, h4, h5]; sshflags=sshflags)
841-
@test length(new_pids) == 5
842-
test_n_remove_pids(new_pids)
843-
844-
print("\nkeyword arg exename\n")
845-
for exename in [`$(joinpath(Sys.BINDIR, Base.julia_exename()))`, "$(joinpath(Sys.BINDIR, Base.julia_exename()))"]
846-
for addp_func in [()->addprocs_with_testenv(["localhost"]; exename=exename, exeflags=test_exeflags, sshflags=sshflags),
847-
()->addprocs_with_testenv(1; exename=exename, exeflags=test_exeflags)]
848-
849-
local new_pids = addp_func()
850-
@test length(new_pids) == 1
851-
test_n_remove_pids(new_pids)
852-
end
853-
end
854-
end
855-
end # unix-only
856-
857763
let t = @task 42
858764
schedule(t, ErrorException(""), error=true)
859765
@test_throws TaskFailedException(t) Base.wait(t)

test/runtests.jl

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,15 @@
33
# Run the distributed test outside of the main driver since it needs its own
44
# set of dedicated workers.
55
include(joinpath(Sys.BINDIR, "..", "share", "julia", "test", "testenv.jl"))
6-
disttestfile = joinpath(@__DIR__, "distributed_exec.jl")
76

8-
cmd = `$test_exename $test_exeflags $disttestfile`
7+
cmd = `$test_exename $test_exeflags`
8+
9+
# Run the SSH tests with a single thread because LibSSH.jl is not thread-safe
10+
sshtestfile = joinpath(@__DIR__, "sshmanager.jl")
11+
run(addenv(`$cmd $sshtestfile`, "JULIA_NUM_THREADS" => "1"))
912

10-
if !success(pipeline(cmd; stdout=stdout, stderr=stderr)) && ccall(:jl_running_on_valgrind,Cint,()) == 0
13+
disttestfile = joinpath(@__DIR__, "distributed_exec.jl")
14+
if !success(pipeline(`$cmd $disttestfile`; stdout=stdout, stderr=stderr)) && ccall(:jl_running_on_valgrind,Cint,()) == 0
1115
error("Distributed test failed, cmd : $cmd")
1216
end
1317

test/sshmanager.jl

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
using Test
2+
using DistributedNext
3+
import Sockets: getipaddr
4+
5+
import LibSSH as ssh
6+
import LibSSH.Demo: DemoServer
7+
8+
9+
include(joinpath(Sys.BINDIR, "..", "share", "julia", "test", "testenv.jl"))
10+
11+
# LibSSH.jl currently only works on 64bit unixes
12+
if Sys.isunix() && Sys.WORD_SIZE == 64
13+
function test_n_remove_pids(new_pids)
14+
for p in new_pids
15+
w_in_remote = sort(remotecall_fetch(workers, p))
16+
try
17+
@test intersect(new_pids, w_in_remote) == new_pids
18+
catch
19+
print("p : $p\n")
20+
print("newpids : $new_pids\n")
21+
print("w_in_remote : $w_in_remote\n")
22+
print("intersect : $(intersect(new_pids, w_in_remote))\n\n\n")
23+
rethrow()
24+
end
25+
end
26+
27+
remotecall_fetch(rmprocs, 1, new_pids)
28+
end
29+
30+
println("\n\nTesting SSHManager. A minimum of 4GB of RAM is recommended.")
31+
println("Please ensure port 9300 and 2222 are not in use.")
32+
33+
DemoServer(2222; auth_methods=[ssh.AuthMethod_None], allow_auth_none=true, verbose=false, timeout=3600) do
34+
sshflags = `-o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -o LogLevel=ERROR -p 2222 `
35+
#Issue #9951
36+
hosts=[]
37+
localhost_aliases = ["localhost", string(getipaddr()), "127.0.0.1"]
38+
num_workers = parse(Int,(get(ENV, "JULIA_ADDPROCS_NUM", "9")))
39+
40+
for i in 1:(num_workers/length(localhost_aliases))
41+
append!(hosts, localhost_aliases)
42+
end
43+
44+
# CI machines sometimes don't already have a .ssh directory
45+
ssh_dir = joinpath(homedir(), ".ssh")
46+
if !isdir(ssh_dir)
47+
mkdir(ssh_dir)
48+
end
49+
50+
print("\nTesting SSH addprocs with $(length(hosts)) workers...\n")
51+
new_pids = addprocs_with_testenv(hosts; sshflags=sshflags)
52+
@test length(new_pids) == length(hosts)
53+
test_n_remove_pids(new_pids)
54+
55+
print("\nMixed ssh addprocs with :auto\n")
56+
new_pids = addprocs_with_testenv(["localhost", ("127.0.0.1", :auto), "localhost"]; sshflags=sshflags)
57+
@test length(new_pids) == (2 + Sys.CPU_THREADS)
58+
test_n_remove_pids(new_pids)
59+
60+
print("\nMixed ssh addprocs with numeric counts\n")
61+
new_pids = addprocs_with_testenv([("localhost", 2), ("127.0.0.1", 2), "localhost"]; sshflags=sshflags)
62+
@test length(new_pids) == 5
63+
test_n_remove_pids(new_pids)
64+
65+
print("\nssh addprocs with tunnel\n")
66+
new_pids = addprocs_with_testenv([("localhost", num_workers)]; tunnel=true, sshflags=sshflags)
67+
@test length(new_pids) == num_workers
68+
test_n_remove_pids(new_pids)
69+
70+
print("\nssh addprocs with tunnel (SSH multiplexing)\n")
71+
new_pids = addprocs_with_testenv([("localhost", num_workers)]; tunnel=true, multiplex=true, sshflags=sshflags)
72+
@test length(new_pids) == num_workers
73+
controlpath = joinpath(ssh_dir, "julia-$(ENV["USER"])@localhost:2222")
74+
@test issocket(controlpath)
75+
test_n_remove_pids(new_pids)
76+
@test :ok == timedwait(()->!issocket(controlpath), 10.0; pollint=0.5)
77+
78+
print("\nAll supported formats for hostname\n")
79+
h1 = "localhost"
80+
user = ENV["USER"]
81+
h2 = "$user@$h1"
82+
h3 = "$h2:2222"
83+
h4 = "$h3 $(string(getipaddr()))"
84+
h5 = "$h4:9300"
85+
86+
new_pids = addprocs_with_testenv([h1, h2, h3, h4, h5]; sshflags=sshflags)
87+
@test length(new_pids) == 5
88+
test_n_remove_pids(new_pids)
89+
90+
print("\nkeyword arg exename\n")
91+
for exename in [`$(joinpath(Sys.BINDIR, Base.julia_exename()))`, "$(joinpath(Sys.BINDIR, Base.julia_exename()))"]
92+
for addp_func in [()->addprocs_with_testenv(["localhost"]; exename=exename, exeflags=test_exeflags, sshflags=sshflags),
93+
()->addprocs_with_testenv(1; exename=exename, exeflags=test_exeflags)]
94+
95+
local new_pids = addp_func()
96+
@test length(new_pids) == 1
97+
test_n_remove_pids(new_pids)
98+
end
99+
end
100+
end
101+
end

0 commit comments

Comments
 (0)