Skip to content

Commit b150514

Browse files
authored
Allow queue to be used for task finished. (#276)
1 parent 3b2ccdd commit b150514

File tree

7 files changed

+169
-116
lines changed

7 files changed

+169
-116
lines changed

Diff for: fixtures/a_condition.rb

-99
This file was deleted.

Diff for: fixtures/async/a_condition.rb

+103
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2018-2022, by Samuel Williams.
5+
6+
require 'async/variable'
7+
8+
module Async
9+
ACondition = Sus::Shared("a condition") do
10+
let(:condition) {subject.new}
11+
12+
it 'can signal waiting task' do
13+
state = nil
14+
15+
reactor.async do
16+
state = :waiting
17+
condition.wait
18+
state = :resumed
19+
end
20+
21+
expect(state).to be == :waiting
22+
23+
condition.signal
24+
25+
reactor.yield
26+
27+
expect(state).to be == :resumed
28+
end
29+
30+
it 'should be able to signal stopped task' do
31+
expect(condition).to be(:empty?)
32+
expect(condition).not.to be(:waiting?)
33+
34+
task = reactor.async do
35+
condition.wait
36+
end
37+
38+
expect(condition).not.to be(:empty?)
39+
expect(condition).to be(:waiting?)
40+
41+
task.stop
42+
43+
condition.signal
44+
end
45+
46+
it 'resumes tasks in order' do
47+
order = []
48+
49+
5.times do |i|
50+
task = reactor.async do
51+
condition.wait
52+
order << i
53+
end
54+
end
55+
56+
condition.signal
57+
58+
reactor.yield
59+
60+
expect(order).to be == [0, 1, 2, 3, 4]
61+
end
62+
63+
with "timeout" do
64+
let(:ready) {Async::Variable.new(condition)}
65+
let(:waiting) {Async::Variable.new(subject.new)}
66+
67+
def before
68+
@state = nil
69+
70+
@task = reactor.async do |task|
71+
task.with_timeout(0.01) do
72+
begin
73+
@state = :waiting
74+
waiting.resolve
75+
76+
ready.wait
77+
@state = :signalled
78+
rescue Async::TimeoutError
79+
@state = :timeout
80+
end
81+
end
82+
end
83+
84+
super
85+
end
86+
87+
it 'can timeout while waiting' do
88+
@task.wait
89+
90+
expect(@state).to be == :timeout
91+
end
92+
93+
it 'can signal while waiting' do
94+
waiting.wait
95+
ready.resolve
96+
97+
@task.wait
98+
99+
expect(@state).to be == :signalled
100+
end
101+
end
102+
end
103+
end

Diff for: lib/async/condition.rb

+6-2
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,16 @@ def wait
4040
end
4141
end
4242

43-
# Is any fiber waiting on this notification?
44-
# @returns [Boolean]
43+
# @deprecated Replaced by {#waiting?}
4544
def empty?
4645
@waiting.empty?
4746
end
4847

48+
# @returns [Boolean] Is any fiber waiting on this notification?
49+
def waiting?
50+
@waiting.size > 0
51+
end
52+
4953
# Signal to a given task that it should resume operations.
5054
# @parameter value [Object | Nil] The value to return to the waiting fibers.
5155
def signal(value = nil)

Diff for: lib/async/queue.rb

+24-11
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,19 @@
99

1010
module Async
1111
# A queue which allows items to be processed in order.
12+
#
13+
# It has a compatible interface with {Notification} and {Condition}, except that it's multi-value.
14+
#
1215
# @public Since `stable-v1`.
13-
class Queue < Notification
16+
class Queue
1417
# Create a new queue.
1518
#
1619
# @parameter parent [Interface(:async) | Nil] The parent task to use for async operations.
17-
def initialize(parent: nil)
18-
super()
19-
20+
# @parameter available [Notification] The notification to use for signaling when items are available.
21+
def initialize(parent: nil, available: Notification.new)
2022
@items = []
2123
@parent = parent
24+
@available = available
2225
end
2326

2427
# @attribute [Array] The items in the queue.
@@ -38,20 +41,20 @@ def empty?
3841
def <<(item)
3942
@items << item
4043

41-
self.signal unless self.empty?
44+
@available.signal unless self.empty?
4245
end
4346

4447
# Add multiple items to the queue.
4548
def enqueue(*items)
4649
@items.concat(items)
4750

48-
self.signal unless self.empty?
51+
@available.signal unless self.empty?
4952
end
5053

5154
# Remove and return the next item from the queue.
5255
def dequeue
5356
while @items.empty?
54-
self.wait
57+
@available.wait
5558
end
5659

5760
@items.shift
@@ -77,6 +80,16 @@ def each
7780
yield item
7881
end
7982
end
83+
84+
# Signal the queue with a value, the same as {#enqueue}.
85+
def signal(value)
86+
self.enqueue(value)
87+
end
88+
89+
# Wait for an item to be available, the same as {#dequeue}.
90+
def wait
91+
self.dequeue
92+
end
8093
end
8194

8295
# A queue which limits the number of items that can be enqueued.
@@ -85,12 +98,12 @@ class LimitedQueue < Queue
8598
# Create a new limited queue.
8699
#
87100
# @parameter limit [Integer] The maximum number of items that can be enqueued.
88-
def initialize(limit = 1, **options)
101+
# @parameter full [Notification] The notification to use for signaling when the queue is full.
102+
def initialize(limit = 1, full: Notification.new, **options)
89103
super(**options)
90104

91105
@limit = limit
92-
93-
@full = Notification.new
106+
@full = full
94107
end
95108

96109
# @attribute [Integer] The maximum number of items that can be enqueued.
@@ -128,7 +141,7 @@ def enqueue(*items)
128141
available = @limit - @items.size
129142
@items.concat(items.shift(available))
130143

131-
self.signal unless self.empty?
144+
@available.signal unless self.empty?
132145
end
133146
end
134147

Diff for: test/async/condition.rb

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
require 'sus/fixtures/async'
88
require 'async/condition'
99

10-
require 'a_condition'
10+
require 'async/a_condition'
1111

1212
describe Async::Condition do
1313
include Sus::Fixtures::Async::ReactorContext
@@ -52,5 +52,5 @@
5252
expect(consumer.status).to be == :completed
5353
end
5454

55-
it_behaves_like ACondition
55+
it_behaves_like Async::ACondition
5656
end

Diff for: test/async/notification.rb

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
require 'sus/fixtures/async'
77
require 'async/notification'
88

9-
require 'a_condition'
9+
require 'async/a_condition'
1010

1111
describe Async::Notification do
1212
include Sus::Fixtures::Async::ReactorContext
@@ -47,5 +47,5 @@
4747
]
4848
end
4949

50-
it_behaves_like ACondition
50+
it_behaves_like Async::ACondition
5151
end

Diff for: test/async/queue.rb

+32
Original file line numberDiff line numberDiff line change
@@ -87,12 +87,44 @@
8787
end
8888
end
8989

90+
with '#signal' do
91+
it 'can signal with an item' do
92+
queue.signal(:item)
93+
expect(queue.dequeue).to be == :item
94+
end
95+
end
96+
97+
with '#wait' do
98+
it 'can wait for an item' do
99+
reactor.async do |task|
100+
queue.enqueue(:item)
101+
end
102+
103+
expect(queue.wait).to be == :item
104+
end
105+
end
106+
90107
with 'an empty queue' do
91108
it "is expected to be empty" do
92109
expect(queue).to be(:empty?)
93110
end
94111
end
95112

113+
with 'task finishing queue' do
114+
it 'can signal task completion' do
115+
3.times do
116+
Async(finished: queue) do
117+
:result
118+
end
119+
end
120+
121+
3.times do
122+
task = queue.dequeue
123+
expect(task.wait).to be == :result
124+
end
125+
end
126+
end
127+
96128
with 'semaphore' do
97129
let(:capacity) {2}
98130
let(:semaphore) {Async::Semaphore.new(capacity)}

0 commit comments

Comments
 (0)