Skip to content

Commit 3c7875d

Browse files
committed
Use .yield instead of async { ... }.wait to give tasks a chance to run
1 parent e60d22b commit 3c7875d

File tree

2 files changed

+31
-26
lines changed

2 files changed

+31
-26
lines changed

lib/graphql/dataloader/async_dataloader.rb

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,29 +19,27 @@ def run
1919
while first_pass || job_tasks.any?
2020
first_pass = false
2121

22-
root_task.async do |jobs_task|
23-
while (task = job_tasks.shift || spawn_job_task(jobs_task, jobs_condition))
24-
if task.alive?
25-
next_job_tasks << task
26-
elsif task.failed?
27-
# re-raise a raised error -
28-
# this also covers errors from sources since
29-
# these jobs wait for sources as needed.
30-
task.wait
31-
end
22+
while (task = job_tasks.shift || spawn_job_task(root_task, jobs_condition))
23+
if task.alive?
24+
next_job_tasks << task
25+
elsif task.failed?
26+
# re-raise a raised error -
27+
# this also covers errors from sources since
28+
# these jobs wait for sources as needed.
29+
task.wait
3230
end
33-
end.wait
31+
end
32+
root_task.yield # give job tasks a chance to run
3433
job_tasks.concat(next_job_tasks)
3534
next_job_tasks.clear
3635

3736
while source_tasks.any? || @source_cache.each_value.any? { |group_sources| group_sources.each_value.any?(&:pending?) }
38-
root_task.async do |sources_loop_task|
39-
while (task = source_tasks.shift || spawn_source_task(sources_loop_task, sources_condition))
40-
if task.alive?
41-
next_source_tasks << task
42-
end
37+
while (task = source_tasks.shift || spawn_source_task(root_task, sources_condition))
38+
if task.alive?
39+
next_source_tasks << task
4340
end
44-
end.wait
41+
end
42+
root_task.yield # give source tasks a chance to run
4543
sources_condition.signal
4644
source_tasks.concat(next_source_tasks)
4745
next_source_tasks.clear
@@ -58,7 +56,7 @@ def run
5856
def spawn_job_task(parent_task, condition)
5957
if @pending_jobs.any?
6058
fiber_vars = get_fiber_variables
61-
parent_task.async do |t|
59+
parent_task.async do
6260
set_fiber_variables(fiber_vars)
6361
Thread.current[:graphql_dataloader_next_tick] = condition
6462
while job = @pending_jobs.shift

spec/graphql/dataloader_spec.rb

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22
require "spec_helper"
33
require "fiber"
44

5+
if defined?(Console) && defined?(Async)
6+
Console.logger.disable(Async::Task)
7+
end
8+
59
describe GraphQL::Dataloader do
610
class BatchedCallsCounter
711
def initialize
@@ -1005,27 +1009,30 @@ def self.included(child_class)
10051009

10061010
it "works with very very large queries" do
10071011
query_str = "{".dup
1008-
1100.times do |i|
1012+
fields = 1100
1013+
fields.times do |i|
10091014
query_str << "\n field#{i}: lookaheadIngredient(input: { id: 1, batchKey: \"key-#{i}\"}) { name }"
10101015
end
10111016
query_str << "\n}"
10121017
GC.start
1018+
GC.disable
10131019
res = schema.execute(query_str)
1014-
assert_equal 1100, res["data"].keys.size
1020+
assert_equal fields, res["data"].keys.size
10151021
all_fibers = []
10161022
ObjectSpace.each_object(Fiber) do |f|
10171023
all_fibers << f
10181024
end
10191025
all_fibers.delete(Fiber.current)
1020-
if schema.dataloader_class == GraphQL::Dataloader::AsyncDataloader
1021-
skip <<~ERR
1022-
TODO: AsyncDataloader leaves orphan suspended fibers :'(
1026+
if all_fibers.any?(&:alive?)
1027+
puts <<~ERR
1028+
Alive fibers:
10231029
1024-
- #{all_fibers.select(&:alive?).join("\n -")}
1030+
- #{all_fibers.select(&:alive?).join("\n - ")}
10251031
ERR
1026-
else
1027-
assert_equal [false], all_fibers.map(&:alive?).uniq
10281032
end
1033+
assert_equal [false], all_fibers.map(&:alive?).uniq
1034+
ensure
1035+
GC.enable
10291036
end
10301037

10311038
it "doesn't perform duplicate source fetches" do

0 commit comments

Comments
 (0)