2
2
3
3
4
4
class QueueEmpty (Exception ):
5
+ """Raised when Queue.get_nowait() is called on an empty Queue."""
5
6
pass
6
7
7
8
8
9
class QueueFull (Exception ):
10
+ """Raised when the Queue.put_nowait() method is called on a full Queue."""
9
11
pass
10
12
11
13
12
14
class Queue :
15
+ """
16
+ A queue, useful for coordinating producer and consumer coroutines.
17
+
18
+ If maxsize is less than or equal to zero, the queue size is infinite. If it
19
+ is an integer greater than 0, then "await put()" will block when the
20
+ queue reaches maxsize, until an item is removed by get().
21
+
22
+ Unlike CPython's asyncio.Queue, this implementation is backed by a list rather
23
+ than `collections.deque` because smaller boards may not have the library
24
+ implemented.
25
+ """
26
+
13
27
def __init__ (self , maxsize = 0 ):
14
28
self .maxsize = maxsize
15
- self ._queue = list ()
29
+
30
+ self ._queue = []
16
31
17
32
self ._join_counter = 0
18
33
self ._join_event = event .Event ()
@@ -37,41 +52,86 @@ def _put(self, val):
37
52
self ._put_event .clear ()
38
53
39
54
async def get (self ):
55
+ """
56
+ Remove and return an item from the queue.
57
+
58
+ If queue is empty, wait until an item is available.
59
+ """
40
60
while self .empty ():
41
61
await self ._put_event .wait ()
42
62
return self ._get ()
43
63
44
64
def get_nowait (self ):
65
+ """
66
+ Remove and return an item from the queue.
67
+
68
+ If queue is empty, raise QueueEmpty.
69
+ """
45
70
if self .empty ():
46
71
raise QueueEmpty ()
47
72
return self ._get ()
48
73
49
74
async def put (self , val ):
75
+ """
76
+ Put an item into the queue.
77
+
78
+ If the queue is full, waits until a free
79
+ slot is available before adding item.
80
+ """
50
81
while self .full ():
51
82
await self ._get_event .wait ()
52
83
self ._put (val )
53
84
54
85
def put_nowait (self , val ):
86
+ """
87
+ Put an item into the queue.
88
+
89
+ If the queue is full, raises QueueFull.
90
+ """
55
91
if self .full ():
56
92
raise QueueFull ()
57
93
self ._put (val )
58
94
59
95
def qsize (self ):
96
+ """
97
+ Number of items in this queue.
98
+ """
60
99
return len (self ._queue )
61
100
62
101
def empty (self ):
102
+ """
103
+ Return True if the queue is empty.
104
+ """
63
105
return len (self ._queue ) == 0
64
106
65
107
def full (self ):
108
+ """
109
+ Return True if there are maxsize items in the queue.
110
+ """
66
111
return 0 < self .maxsize <= self .qsize ()
67
112
68
113
def task_done (self ):
69
- self ._join_counter -= 1
114
+ """
115
+ Indicate that a formerly enqueued task is complete.
116
+
117
+ If a join() is currently blocking, it will resume when all items have
118
+ been processed (meaning that a task_done() call was received for every
119
+ item that had been put() into the queue).
70
120
71
- if self ._join_counter <= 0 :
121
+ Raises ValueError if called more times than there were items placed in
122
+ the queue.
123
+ """
124
+ if self ._join_counter == 0 :
72
125
# Can't have less than 0
73
- self ._join_counter = 0
126
+ raise ValueError ("task_done() called too many times" )
127
+
128
+ self ._join_counter -= 1
129
+
130
+ if self ._join_counter == 0 :
74
131
self ._join_event .set ()
75
132
76
133
async def join (self ):
134
+ """
135
+ Block until all items in the queue have been gotten and processed.
136
+ """
77
137
await self ._join_event .wait ()
0 commit comments