Skip to content

Commit 71a6110

Browse files
authored
Support minsize setting; don't spawn tasks when only 1 chunk (#145)
* Support `minsize` setting; don't spawn tasks when only 1 chunk * fix multiple chunks detection * handle `minsize=nothing` case * typo * don't rely on upstream changes to ChunkSplitters * make downgrade CI happy * rename `minsize` -> `minchunksize` * print nicer * don't show minimum chunk size for FixedSize schedulers
1 parent a2c1f83 commit 71a6110

File tree

5 files changed

+107
-55
lines changed

5 files changed

+107
-55
lines changed

CHANGELOG.md

+5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
OhMyThreads.jl Changelog
22
=========================
33

4+
Version 0.8.2
5+
------------
6+
- ![Feature][badge-feature] Added a `minchunksize` chunking argument for schedulers, so that they can specify a lower bound on the size of chunks which are worth parallelizing. For example, `treduce(+, 1:10; minchunksize=100)` will run serially, but `treduce(+, 1:1000000; minchunksize=100)` will be parallelized.
7+
- ![Enhancement][badge-enhancement] Operations on collections with only one 'chunk' no longer spawn an unnecessary task. That means operations like `treduce(+, 1:10; minchunksize=100)` will have less overhead.
8+
49
Version 0.8.1
510
------------
611
- ![Feature][badge-feature] Added a `@localize` macro which turns `@localize x y expr` into `let x=x, y=y; expr end` ([#142][gh-pr-142])

Project.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ MarkdownExt = "Markdown"
1919
[compat]
2020
Aqua = "0.8"
2121
BangBang = "0.3.40, 0.4"
22-
ChunkSplitters = "3"
22+
ChunkSplitters = "3.1"
2323
Markdown = "1"
2424
ScopedValues = "1.3"
2525
StableTasks = "0.1.5"

src/implementation.jl

+26-5
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ using OhMyThreads: Scheduler,
77
DynamicScheduler, StaticScheduler, GreedyScheduler,
88
SerialScheduler
99
using OhMyThreads.Schedulers: chunking_enabled,
10-
nchunks, chunksize, chunksplit, has_chunksplit,
10+
nchunks, chunksize, chunksplit, minchunksize, has_chunksplit,
1111
chunking_mode, ChunkingMode, NoChunking,
1212
FixedSize, FixedCount, scheduler_from_symbol, NotGiven,
1313
isgiven
@@ -25,14 +25,17 @@ function _index_chunks(sched, arg)
2525
C = chunking_mode(sched)
2626
@assert C != NoChunking
2727
if C == FixedCount
28+
msz = isnothing(minchunksize(sched)) ? nothing : min(minchunksize(sched), length(arg))
2829
index_chunks(arg;
2930
n = nchunks(sched),
30-
split = chunksplit(sched))::IndexChunks{
31+
split = chunksplit(sched),
32+
minsize = msz)::IndexChunks{
3133
typeof(arg), ChunkSplitters.Internals.FixedCount}
3234
elseif C == FixedSize
3335
index_chunks(arg;
3436
size = chunksize(sched),
35-
split = chunksplit(sched))::IndexChunks{
37+
split = chunksplit(sched),
38+
minsize = minchunksize(sched))::IndexChunks{
3639
typeof(arg), ChunkSplitters.Internals.FixedSize}
3740
end
3841
end
@@ -67,6 +70,23 @@ function _check_chunks_incompatible_kwargs(; kwargs...)
6770
return nothing
6871
end
6972

73+
function has_multiple_chunks(scheduler, coll)
74+
C = chunking_mode(scheduler)
75+
if C == NoChunking || coll isa Union{AbstractChunks, ChunkSplitters.Internals.Enumerate}
76+
length(coll) > 1
77+
elseif C == FixedCount
78+
if isnothing(minchunksize(scheduler))
79+
mcs = 1
80+
else
81+
mcs = max(min(minchunksize(scheduler), length(coll)), 1)
82+
end
83+
min(length(coll) ÷ mcs, nchunks(scheduler)) > 1
84+
elseif C == FixedSize
85+
length(coll) ÷ chunksize(scheduler) > 1
86+
end
87+
end
88+
89+
7090
function tmapreduce(f, op, Arrs...;
7191
scheduler::MaybeScheduler = NotGiven(),
7292
outputtype::Type = Any,
@@ -79,7 +99,7 @@ function tmapreduce(f, op, Arrs...;
7999
if A isa AbstractChunks || A isa ChunkSplitters.Internals.Enumerate
80100
_check_chunks_incompatible_kwargs(; kwargs...)
81101
end
82-
if _scheduler isa SerialScheduler || isempty(first(Arrs))
102+
if _scheduler isa SerialScheduler || !has_multiple_chunks(_scheduler, first(Arrs))
83103
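# serial scheduler, or input with at most one chunk (including empty input) → no tasks are spawned; defer to Base.mapreduce, which also aligns the empty-collection behavior with Base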
84104
mapreduce(f, op, Arrs...; mapreduce_kwargs...)
85105
else
@@ -107,11 +127,12 @@ function _tmapreduce(f,
107127
throw_if_boxed_captures(f, op)
108128
if chunking_enabled(scheduler)
109129
tasks = map(_index_chunks(scheduler, first(Arrs))) do inds
130+
110131
args = map(A -> view(A, inds), Arrs)
111132
# Note, calling `promise_task_local` here is only safe because we're assuming that
112133
# Base.mapreduce isn't going to magically try to do multithreading on us...
113134
@spawn threadpool mapreduce(promise_task_local(f), promise_task_local(op),
114-
args...; $mapreduce_kwargs...)
135+
args...; $mapreduce_kwargs...)
115136
end
116137
mapreduce(fetch, promise_task_local(op), tasks)
117138
else

src/schedulers.jl

+29-10
Original file line numberDiff line numberDiff line change
@@ -85,13 +85,15 @@ struct ChunkingArgs{C, S <: Split}
8585
n::Int
8686
size::Int
8787
split::S
88+
minsize::Union{Int, Nothing}
8889
end
89-
ChunkingArgs(::Type{NoChunking}) = ChunkingArgs{NoChunking, NoSplit}(-1, -1, NoSplit())
90+
ChunkingArgs(::Type{NoChunking}) = ChunkingArgs{NoChunking, NoSplit}(-1, -1, NoSplit(), nothing)
9091
function ChunkingArgs(
9192
Sched::Type{<:Scheduler},
9293
n::MaybeInteger,
9394
size::MaybeInteger,
9495
split::Union{Symbol, Split};
96+
minsize=nothing,
9597
chunking
9698
)
9799
chunking || return ChunkingArgs(NoChunking)
@@ -106,7 +108,7 @@ function ChunkingArgs(
106108

107109
chunking_mode = size > 0 ? FixedSize : FixedCount
108110
split = _parse_split(split)
109-
result = ChunkingArgs{chunking_mode, typeof(split)}(n, size, split)
111+
result = ChunkingArgs{chunking_mode, typeof(split)}(n, size, split, minsize)
110112

111113
# argument names in error messages are those of the scheduler constructor instead
112114
# of ChunkingArgs because the user should not be aware of the ChunkingArgs type
@@ -124,14 +126,20 @@ chunking_mode(::ChunkingArgs{C}) where {C} = C
124126
has_n(ca::ChunkingArgs) = ca.n > 0
125127
has_size(ca::ChunkingArgs) = ca.size > 0
126128
has_split(::ChunkingArgs{C, S}) where {C, S} = S !== NoSplit
129+
has_minsize(ca::ChunkingArgs) = !isnothing(ca.minsize)
127130
chunking_enabled(ca::ChunkingArgs) = chunking_mode(ca) != NoChunking
128131

129132
_chunkingstr(ca::ChunkingArgs{NoChunking}) = "none"
130133
function _chunkingstr(ca::ChunkingArgs{FixedCount})
131-
return "fixed count ($(ca.n)), split :$(_splitid(ca.split))"
134+
str = "fixed count ($(ca.n)), split :$(_splitid(ca.split))"
135+
if has_minsize(ca)
136+
str = str * ", minimum chunk size $(ca.minsize)"
137+
end
138+
str
132139
end
133140
function _chunkingstr(ca::ChunkingArgs{FixedSize})
134-
return "fixed size ($(ca.size)), split :$(_splitid(ca.split))"
141+
str = "fixed size ($(ca.size)), split :$(_splitid(ca.split))"
142+
str
135143
end
136144

137145
# Link between a scheduler and its chunking arguments
@@ -142,10 +150,12 @@ chunking_args(::Scheduler) = ChunkingArgs(NoChunking)
142150
nchunks(sched::Scheduler) = chunking_args(sched).n
143151
chunksize(sched::Scheduler) = chunking_args(sched).size
144152
chunksplit(sched::Scheduler) = chunking_args(sched).split
153+
minchunksize(sched::Scheduler) = chunking_args(sched).minsize
145154

146155
has_nchunks(sched::Scheduler) = has_n(chunking_args(sched))
147156
has_chunksize(sched::Scheduler) = has_size(chunking_args(sched))
148157
has_chunksplit(sched::Scheduler) = has_split(chunking_args(sched))
158+
has_minchunksize(sched::Scheduler) = has_minsize(chunking_args(sched))
149159

150160
chunking_mode(sched::Scheduler) = chunking_mode(chunking_args(sched))
151161
chunking_enabled(sched::Scheduler) = chunking_enabled(chunking_args(sched))
@@ -180,6 +190,8 @@ with other multithreaded code.
180190
- `chunksize::Integer` (default not set)
181191
* Specifies the desired chunk size (instead of the number of chunks).
182192
* The options `chunksize` and `nchunks`/`ntasks` are **mutually exclusive** (only one may be a positive integer).
193+
- `minchunksize::Union{Integer, Nothing}` (default `nothing`)
194+
* Sets a lower bound on the size of chunks. This argument takes priority over `nchunks`; for example, `treduce(+, 1:10; nchunks=10, minchunksize=5)` will operate on only `2` chunks.
183195
- `split::Union{Symbol, OhMyThreads.Split}` (default `OhMyThreads.Consecutive()`):
184196
* Determines how the collection is divided into chunks (if chunking=true). By default, each chunk consists of contiguous elements and order is maintained.
185197
* See [ChunkSplitters.jl](https://github.com/JuliaFolds2/ChunkSplitters.jl) for more details and available options. We also allow users to pass `:consecutive` in place of `Consecutive()`, and `:roundrobin` in place of `RoundRobin()`
@@ -209,14 +221,15 @@ function DynamicScheduler(;
209221
ntasks::MaybeInteger = NotGiven(), # "alias" for nchunks
210222
chunksize::MaybeInteger = NotGiven(),
211223
chunking::Bool = true,
212-
split::Union{Split, Symbol} = Consecutive())
224+
split::Union{Split, Symbol} = Consecutive(),
225+
minchunksize::Union{Nothing, Int}=nothing)
213226
if isgiven(ntasks)
214227
if isgiven(nchunks)
215228
throw(ArgumentError("For the dynamic scheduler, nchunks and ntasks are aliases and only one may be provided"))
216229
end
217230
nchunks = ntasks
218231
end
219-
ca = ChunkingArgs(DynamicScheduler, nchunks, chunksize, split; chunking)
232+
ca = ChunkingArgs(DynamicScheduler, nchunks, chunksize, split; chunking, minsize=minchunksize)
220233
return DynamicScheduler(threadpool, ca)
221234
end
222235
from_symbol(::Val{:dynamic}) = DynamicScheduler
@@ -250,6 +263,8 @@ Isn't well composable with other multithreaded code though.
250263
- `chunksize::Integer` (default not set)
251264
* Specifies the desired chunk size (instead of the number of chunks).
252265
* The options `chunksize` and `nchunks`/`ntasks` are **mutually exclusive** (only one may be non-zero).
266+
- `minchunksize::Union{Integer, Nothing}` (default `nothing`)
267+
* Sets a lower bound on the size of chunks. This argument takes priority over `nchunks`; for example, `treduce(+, 1:10; nchunks=10, minchunksize=5)` will operate on only `2` chunks.
253268
- `chunking::Bool` (default `true`):
254269
* Controls whether input elements are grouped into chunks (`true`) or not (`false`).
255270
* For `chunking=false`, the arguments `nchunks`/`ntasks`, `chunksize`, and `split` are ignored and input elements are regarded as "chunks" as is. Hence, there will be one parallel task spawned per input element. Note that, depending on the input, this **might spawn many(!) tasks** and can be costly!
@@ -267,14 +282,15 @@ function StaticScheduler(;
267282
ntasks::MaybeInteger = NotGiven(), # "alias" for nchunks
268283
chunksize::MaybeInteger = NotGiven(),
269284
chunking::Bool = true,
270-
split::Union{Split, Symbol} = Consecutive())
285+
split::Union{Split, Symbol} = Consecutive(),
286+
minchunksize::Union{Nothing, Int} = nothing)
271287
if isgiven(ntasks)
272288
if isgiven(nchunks)
273289
throw(ArgumentError("For the static scheduler, nchunks and ntasks are aliases and only one may be provided"))
274290
end
275291
nchunks = ntasks
276292
end
277-
ca = ChunkingArgs(StaticScheduler, nchunks, chunksize, split; chunking)
293+
ca = ChunkingArgs(StaticScheduler, nchunks, chunksize, split; chunking, minsize=minchunksize)
278294
return StaticScheduler(ca)
279295
end
280296
from_symbol(::Val{:static}) = StaticScheduler
@@ -315,6 +331,8 @@ some additional overhead.
315331
- `chunksize::Integer` (default not set)
316332
* Specifies the desired chunk size (instead of the number of chunks).
317333
* The options `chunksize` and `nchunks` are **mutually exclusive** (only one may be a positive integer).
334+
- `minchunksize::Union{Integer, Nothing}` (default `nothing`)
335+
* Sets a lower bound on the size of chunks. This argument takes priority over `nchunks`; for example, `treduce(+, 1:10; nchunks=10, minchunksize=5)` will operate on only `2` chunks.
318336
- `split::Union{Symbol, OhMyThreads.Split}` (default `OhMyThreads.RoundRobin()`):
319337
* Determines how the collection is divided into chunks (if chunking=true).
320338
* See [ChunkSplitters.jl](https://github.com/JuliaFolds2/ChunkSplitters.jl) for more details and available options. We also allow users to pass `:consecutive` in place of `Consecutive()`, and `:roundrobin` in place of `RoundRobin()`
@@ -334,11 +352,12 @@ function GreedyScheduler(;
334352
nchunks::MaybeInteger = NotGiven(),
335353
chunksize::MaybeInteger = NotGiven(),
336354
chunking::Bool = false,
337-
split::Union{Split, Symbol} = RoundRobin())
355+
split::Union{Split, Symbol} = RoundRobin(),
356+
minchunksize::Union{Nothing, Int} = nothing)
338357
if isgiven(nchunks) || isgiven(chunksize)
339358
chunking = true
340359
end
341-
ca = ChunkingArgs(GreedyScheduler, nchunks, chunksize, split; chunking)
360+
ca = ChunkingArgs(GreedyScheduler, nchunks, chunksize, split; chunking, minsize=minchunksize)
342361
return GreedyScheduler(ntasks, ca)
343362
end
344363
from_symbol(::Val{:greedy}) = GreedyScheduler

test/runtests.jl

+46-39
Original file line numberDiff line numberDiff line change
@@ -28,45 +28,46 @@ ChunkedGreedy(; kwargs...) = GreedyScheduler(; kwargs...)
2828
SerialScheduler, ChunkedGreedy)
2929
@testset for split in (Consecutive(), RoundRobin(), :consecutive, :roundrobin)
3030
for nchunks in (1, 2, 6)
31-
if sched == GreedyScheduler
32-
scheduler = sched(; ntasks = nchunks)
33-
elseif sched == DynamicScheduler{OhMyThreads.Schedulers.NoChunking}
34-
scheduler = DynamicScheduler(; chunking = false)
35-
elseif sched == SerialScheduler
36-
scheduler = SerialScheduler()
37-
else
38-
scheduler = sched(; nchunks, split)
31+
for minchunksize ∈ (nothing, 1, 3)
32+
if sched == GreedyScheduler
33+
scheduler = sched(; ntasks = nchunks, minchunksize)
34+
elseif sched == DynamicScheduler{OhMyThreads.Schedulers.NoChunking}
35+
scheduler = DynamicScheduler(; chunking = false)
36+
elseif sched == SerialScheduler
37+
scheduler = SerialScheduler()
38+
else
39+
scheduler = sched(; nchunks, split, minchunksize)
40+
end
41+
kwargs = (; scheduler)
42+
if (split in (RoundRobin(), :roundrobin) ||
43+
sched ∈ (GreedyScheduler, ChunkedGreedy)) || op ∉ (vcat, *)
44+
# scatter and greedy only works for commutative operators!
45+
else
46+
mapreduce_f_op_itr = mapreduce(f, op, itrs...)
47+
@test tmapreduce(f, op, itrs...; init, kwargs...) ~ mapreduce_f_op_itr
48+
@test treducemap(op, f, itrs...; init, kwargs...) ~ mapreduce_f_op_itr
49+
@test treduce(op, f.(itrs...); init, kwargs...) ~ mapreduce_f_op_itr
50+
end
51+
52+
split in (RoundRobin(), :roundrobin) && continue
53+
map_f_itr = map(f, itrs...)
54+
@test all(tmap(f, Any, itrs...; kwargs...) .~ map_f_itr)
55+
@test all(tcollect(Any, (f(x...) for x in collect(zip(itrs...))); kwargs...) .~ map_f_itr)
56+
@test all(tcollect(Any, f.(itrs...); kwargs...) .~ map_f_itr)
57+
58+
RT = Core.Compiler.return_type(f, Tuple{eltype.(itrs)...})
59+
60+
@test tmap(f, RT, itrs...; kwargs...) ~ map_f_itr
61+
@test tcollect(RT, (f(x...) for x in collect(zip(itrs...))); kwargs...) ~ map_f_itr
62+
@test tcollect(RT, f.(itrs...); kwargs...) ~ map_f_itr
63+
64+
if sched ∉ (GreedyScheduler, ChunkedGreedy)
65+
@test tmap(f, itrs...; kwargs...) ~ map_f_itr
66+
@test tcollect((f(x...) for x in collect(zip(itrs...))); kwargs...) ~ map_f_itr
67+
@test tcollect(f.(itrs...); kwargs...) ~ map_f_itr
68+
end
3969
end
40-
41-
kwargs = (; scheduler)
42-
if (split in (RoundRobin(), :roundrobin) ||
43-
sched ∈ (GreedyScheduler, ChunkedGreedy)) || op ∉ (vcat, *)
44-
# scatter and greedy only works for commutative operators!
45-
else
46-
mapreduce_f_op_itr = mapreduce(f, op, itrs...)
47-
@test tmapreduce(f, op, itrs...; init, kwargs...) ~ mapreduce_f_op_itr
48-
@test treducemap(op, f, itrs...; init, kwargs...) ~ mapreduce_f_op_itr
49-
@test treduce(op, f.(itrs...); init, kwargs...) ~ mapreduce_f_op_itr
50-
end
51-
52-
split in (RoundRobin(), :roundrobin) && continue
53-
map_f_itr = map(f, itrs...)
54-
@test all(tmap(f, Any, itrs...; kwargs...) .~ map_f_itr)
55-
@test all(tcollect(Any, (f(x...) for x in collect(zip(itrs...))); kwargs...) .~ map_f_itr)
56-
@test all(tcollect(Any, f.(itrs...); kwargs...) .~ map_f_itr)
57-
58-
RT = Core.Compiler.return_type(f, Tuple{eltype.(itrs)...})
59-
60-
@test tmap(f, RT, itrs...; kwargs...) ~ map_f_itr
61-
@test tcollect(RT, (f(x...) for x in collect(zip(itrs...))); kwargs...) ~ map_f_itr
62-
@test tcollect(RT, f.(itrs...); kwargs...) ~ map_f_itr
63-
64-
if sched ∉ (GreedyScheduler, ChunkedGreedy)
65-
@test tmap(f, itrs...; kwargs...) ~ map_f_itr
66-
@test tcollect((f(x...) for x in collect(zip(itrs...))); kwargs...) ~ map_f_itr
67-
@test tcollect(f.(itrs...); kwargs...) ~ map_f_itr
68-
end
69-
end
70+
end
7071
end
7172
end
7273
end
@@ -185,6 +186,10 @@ end;
185186
@set chunking = false
186187
i
187188
end) |> isnothing
189+
@test @tasks(for i in 1:4
190+
@set minchunksize=2
191+
i
192+
end) |> isnothing
188193
@test_throws ArgumentError @tasks(for i in 1:3
189194
@set scheduler = DynamicScheduler()
190195
@set chunking = false
@@ -386,7 +391,9 @@ end;
386391
@test tmapreduce(sin, +, 1:10000; split = RoundRobin()) ≈ res_tmr
387392
@test tmapreduce(sin, +, 1:10000; chunksize = 2) ≈ res_tmr
388393
@test tmapreduce(sin, +, 1:10000; chunking = false) ≈ res_tmr
389-
394+
@test tmapreduce(sin, +, 1:10000; minchunksize=10) ≈ res_tmr
395+
@test tmapreduce(sin, +, 1:10; minchunksize=10) == mapreduce(sin, +, 1:10)
396+
390397
# scheduler isa Scheduler
391398
@test tmapreduce(sin, +, 1:10000; scheduler = StaticScheduler()) ≈ res_tmr
392399
@test_throws ArgumentError tmapreduce(

0 commit comments

Comments
 (0)