8
8
Exceptions and classes related to asyncio Queue implementations.
9
9
"""
10
10
11
- from . import event
11
+ from . import core
12
12
13
13
14
14
class QueueEmpty (Exception ):
@@ -19,6 +19,19 @@ class QueueFull(Exception):
19
19
"""Raised when the Queue.put_nowait() method is called on a full Queue."""
20
20
21
21
22
+ async def _wait_on_task_queue (task_queue : core .TaskQueue ):
23
+ task_queue .push (core .cur_task )
24
+ # Set calling task's data to the TaskQueue so it can be removed if needed
25
+ core .cur_task .data = task_queue
26
+ # Send control back
27
+ await core ._never ()
28
+
29
+
30
+ def _release_task_queue (task_queue : core .TaskQueue ):
31
+ while task_queue .peek ():
32
+ core ._task_queue .push (task_queue .pop ())
33
+
34
+
22
35
class Queue :
23
36
"""
24
37
A queue, useful for coordinating producer and consumer coroutines.
@@ -37,27 +50,21 @@ def __init__(self, maxsize=0):
37
50
38
51
self ._queue = []
39
52
40
- self ._join_counter = 0
41
- self ._join_event = event .Event ()
42
- self ._join_event .set ()
53
+ self ._active_tasks = 0
43
54
44
- self ._put_event = event .Event ()
45
- self ._get_event = event .Event ()
55
+ self ._waiting_for_completion = core .TaskQueue ()
56
+ self ._waiting_for_put = core .TaskQueue ()
57
+ self ._waiting_for_get = core .TaskQueue ()
46
58
47
59
def _get (self ):
48
60
value = self ._queue .pop (0 )
49
- self ._get_event .set ()
50
- self ._get_event .clear ()
61
+ _release_task_queue (self ._waiting_for_get )
51
62
return value
52
63
53
64
def _put (self , val ):
54
- self ._join_counter += 1
55
- self ._join_event .clear ()
56
-
57
65
self ._queue .append (val )
58
-
59
- self ._put_event .set ()
60
- self ._put_event .clear ()
66
+ self ._active_tasks += 1
67
+ _release_task_queue (self ._waiting_for_put )
61
68
62
69
async def get (self ):
63
70
"""
@@ -66,7 +73,7 @@ async def get(self):
66
73
If queue is empty, wait until an item is available.
67
74
"""
68
75
while self .empty ():
69
- await self ._put_event . wait ( )
76
+ await _wait_on_task_queue ( self ._waiting_for_put )
70
77
return self ._get ()
71
78
72
79
def get_nowait (self ):
@@ -87,7 +94,7 @@ async def put(self, val):
87
94
slot is available before adding item.
88
95
"""
89
96
while self .full ():
90
- await self ._get_event . wait ( )
97
+ await _wait_on_task_queue ( self ._waiting_for_get )
91
98
self ._put (val )
92
99
93
100
def put_nowait (self , val ):
@@ -129,17 +136,17 @@ def task_done(self):
129
136
Raises ValueError if called more times than there were items placed in
130
137
the queue.
131
138
"""
132
- if self ._join_counter == 0 :
139
+ if self ._active_tasks == 0 :
133
140
# Can't have less than 0
134
141
raise ValueError ("task_done() called too many times" )
135
142
136
- self ._join_counter -= 1
143
+ self ._active_tasks -= 1
137
144
138
- if self ._join_counter == 0 :
139
- self ._join_event . set ( )
145
+ if self ._active_tasks == 0 :
146
+ _release_task_queue ( self ._waiting_for_completion )
140
147
141
148
async def join (self ):
142
149
"""
143
150
Block until all items in the queue have been gotten and processed.
144
151
"""
145
- await self ._join_event . wait ( )
152
+ await _wait_on_task_queue ( self ._waiting_for_completion )
0 commit comments