Skip to content

Commit 1bc53b3

Browse files
committed
Merge branch 'master' into beta
2 parents 7eb345f + 57a18cf commit 1bc53b3

File tree

4 files changed

+98
-35
lines changed

4 files changed

+98
-35
lines changed

Advanced.Algorithms/Distributed/AsyncQueue.cs

+23-14
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,20 @@ public class AsyncQueue<T>
1616
private readonly Queue<TaskCompletionSource<T>> consumerQueue = new Queue<TaskCompletionSource<T>>();
1717
private SemaphoreSlim consumerQueueLock = new SemaphoreSlim(1);
1818

19+
public int Count => queue.Count;
20+
1921
/// <summary>
2022
/// Supports multi-threaded producers.
2123
/// Time complexity: O(1).
2224
/// </summary>
23-
public async Task EnqueueAsync(T value, CancellationToken taskCancellationToken = default(CancellationToken))
25+
public async Task EnqueueAsync(T value, int millisecondsTimeout = int.MaxValue, CancellationToken taskCancellationToken = default(CancellationToken))
2426
{
25-
await consumerQueueLock.WaitAsync(taskCancellationToken);
27+
await consumerQueueLock.WaitAsync(millisecondsTimeout, taskCancellationToken);
2628

2729
if(consumerQueue.Count > 0)
2830
{
2931
var consumer = consumerQueue.Dequeue();
30-
consumer.SetResult(value);
32+
consumer.TrySetResult(value);
3133
}
3234
else
3335
{
@@ -41,22 +43,29 @@ public class AsyncQueue<T>
4143
/// Supports multi-threaded consumers.
4244
/// Time complexity: O(1).
4345
/// </summary>
44-
public async Task<T> DequeueAsync(CancellationToken taskCancellationToken = default(CancellationToken))
46+
public async Task<T> DequeueAsync(int millisecondsTimeout = int.MaxValue, CancellationToken taskCancellationToken = default(CancellationToken))
4547
{
46-
await consumerQueueLock.WaitAsync(taskCancellationToken);
48+
await consumerQueueLock.WaitAsync(millisecondsTimeout, taskCancellationToken);
49+
50+
TaskCompletionSource<T> consumer;
51+
52+
try
53+
{
54+
if (queue.Count > 0)
55+
{
56+
var result = queue.Dequeue();
57+
consumerQueueLock.Release();
58+
return result;
59+
}
4760

48-
if (queue.Count > 0)
61+
consumer = new TaskCompletionSource<T>();
62+
taskCancellationToken.Register(() => consumer.TrySetCanceled());
63+
consumerQueue.Enqueue(consumer);
64+
}
65+
finally
4966
{
50-
var result = queue.Dequeue();
5167
consumerQueueLock.Release();
52-
return result;
5368
}
54-
55-
var consumer = new TaskCompletionSource<T>();
56-
taskCancellationToken.Register(() => consumer.TrySetCanceled());
57-
consumerQueue.Enqueue(consumer);
58-
59-
consumerQueueLock.Release();
6069

6170
return await consumer.Task;
6271

docs/api/Advanced.Algorithms.Distributed.AsyncQueue-1.html

+41-4
Original file line numberDiff line numberDiff line change
@@ -136,19 +136,46 @@ <h5 class="typeParameters">Type Parameters</h5>
136136
</tr>
137137
</tbody>
138138
</table>
139+
<h3 id="properties">Properties
140+
</h3>
141+
142+
143+
<a id="Advanced_Algorithms_Distributed_AsyncQueue_1_Count_" data-uid="Advanced.Algorithms.Distributed.AsyncQueue`1.Count*"></a>
144+
<h4 id="Advanced_Algorithms_Distributed_AsyncQueue_1_Count" data-uid="Advanced.Algorithms.Distributed.AsyncQueue`1.Count">Count</h4>
145+
<div class="markdown level1 summary"></div>
146+
<div class="markdown level1 conceptual"></div>
147+
<h5 class="decalaration">Declaration</h5>
148+
<div class="codewrapper">
149+
<pre><code class="lang-csharp hljs">public int Count { get; }</code></pre>
150+
</div>
151+
<h5 class="propertyValue">Property Value</h5>
152+
<table class="table table-bordered table-striped table-condensed">
153+
<thead>
154+
<tr>
155+
<th>Type</th>
156+
<th>Description</th>
157+
</tr>
158+
</thead>
159+
<tbody>
160+
<tr>
161+
<td><a class="xref" href="https://docs.microsoft.com/dotnet/api/system.int32">Int32</a></td>
162+
<td></td>
163+
</tr>
164+
</tbody>
165+
</table>
139166
<h3 id="methods">Methods
140167
</h3>
141168

142169

143170
<a id="Advanced_Algorithms_Distributed_AsyncQueue_1_DequeueAsync_" data-uid="Advanced.Algorithms.Distributed.AsyncQueue`1.DequeueAsync*"></a>
144-
<h4 id="Advanced_Algorithms_Distributed_AsyncQueue_1_DequeueAsync_System_Threading_CancellationToken_" data-uid="Advanced.Algorithms.Distributed.AsyncQueue`1.DequeueAsync(System.Threading.CancellationToken)">DequeueAsync(CancellationToken)</h4>
171+
<h4 id="Advanced_Algorithms_Distributed_AsyncQueue_1_DequeueAsync_System_Int32_System_Threading_CancellationToken_" data-uid="Advanced.Algorithms.Distributed.AsyncQueue`1.DequeueAsync(System.Int32,System.Threading.CancellationToken)">DequeueAsync(Int32, CancellationToken)</h4>
145172
<div class="markdown level1 summary"><p>Supports multi-threaded consumers.
146173
Time complexity: O(1).</p>
147174
</div>
148175
<div class="markdown level1 conceptual"></div>
149176
<h5 class="decalaration">Declaration</h5>
150177
<div class="codewrapper">
151-
<pre><code class="lang-csharp hljs">public Task&lt;T&gt; DequeueAsync(CancellationToken taskCancellationToken = default(CancellationToken))</code></pre>
178+
<pre><code class="lang-csharp hljs">public Task&lt;T&gt; DequeueAsync(int millisecondsTimeout = 2147483647, CancellationToken taskCancellationToken = default(CancellationToken))</code></pre>
152179
</div>
153180
<h5 class="parameters">Parameters</h5>
154181
<table class="table table-bordered table-striped table-condensed">
@@ -160,6 +187,11 @@ <h5 class="parameters">Parameters</h5>
160187
</tr>
161188
</thead>
162189
<tbody>
190+
<tr>
191+
<td><a class="xref" href="https://docs.microsoft.com/dotnet/api/system.int32">Int32</a></td>
192+
<td><span class="parametername">millisecondsTimeout</span></td>
193+
<td></td>
194+
</tr>
163195
<tr>
164196
<td><a class="xref" href="https://docs.microsoft.com/dotnet/api/system.threading.cancellationtoken">CancellationToken</a></td>
165197
<td><span class="parametername">taskCancellationToken</span></td>
@@ -185,14 +217,14 @@ <h5 class="returns">Returns</h5>
185217

186218

187219
<a id="Advanced_Algorithms_Distributed_AsyncQueue_1_EnqueueAsync_" data-uid="Advanced.Algorithms.Distributed.AsyncQueue`1.EnqueueAsync*"></a>
188-
<h4 id="Advanced_Algorithms_Distributed_AsyncQueue_1_EnqueueAsync__0_System_Threading_CancellationToken_" data-uid="Advanced.Algorithms.Distributed.AsyncQueue`1.EnqueueAsync(`0,System.Threading.CancellationToken)">EnqueueAsync(T, CancellationToken)</h4>
220+
<h4 id="Advanced_Algorithms_Distributed_AsyncQueue_1_EnqueueAsync__0_System_Int32_System_Threading_CancellationToken_" data-uid="Advanced.Algorithms.Distributed.AsyncQueue`1.EnqueueAsync(`0,System.Int32,System.Threading.CancellationToken)">EnqueueAsync(T, Int32, CancellationToken)</h4>
189221
<div class="markdown level1 summary"><p>Supports multi-threaded producers.
190222
Time complexity: O(1).</p>
191223
</div>
192224
<div class="markdown level1 conceptual"></div>
193225
<h5 class="decalaration">Declaration</h5>
194226
<div class="codewrapper">
195-
<pre><code class="lang-csharp hljs">public Task EnqueueAsync(T value, CancellationToken taskCancellationToken = default(CancellationToken))</code></pre>
227+
<pre><code class="lang-csharp hljs">public Task EnqueueAsync(T value, int millisecondsTimeout = 2147483647, CancellationToken taskCancellationToken = default(CancellationToken))</code></pre>
196228
</div>
197229
<h5 class="parameters">Parameters</h5>
198230
<table class="table table-bordered table-striped table-condensed">
@@ -209,6 +241,11 @@ <h5 class="parameters">Parameters</h5>
209241
<td><span class="parametername">value</span></td>
210242
<td></td>
211243
</tr>
244+
<tr>
245+
<td><a class="xref" href="https://docs.microsoft.com/dotnet/api/system.int32">Int32</a></td>
246+
<td><span class="parametername">millisecondsTimeout</span></td>
247+
<td></td>
248+
</tr>
212249
<tr>
213250
<td><a class="xref" href="https://docs.microsoft.com/dotnet/api/system.threading.cancellationtoken">CancellationToken</a></td>
214251
<td><span class="parametername">taskCancellationToken</span></td>

docs/index.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -397,7 +397,7 @@
397397
"api/Advanced.Algorithms.Distributed.AsyncQueue-1.html": {
398398
"href": "api/Advanced.Algorithms.Distributed.AsyncQueue-1.html",
399399
"title": "Class AsyncQueue<T> | Advanced Algorithms",
400-
"keywords": "Class AsyncQueue<T> A simple asynchronous multi-thread supporting producer/consumer FIFO queue with minimal locking. Inheritance Object AsyncQueue<T> Inherited Members Object.ToString() Object.Equals(Object) Object.Equals(Object, Object) Object.ReferenceEquals(Object, Object) Object.GetHashCode() Object.GetType() Object.MemberwiseClone() Namespace : Advanced.Algorithms.Distributed Assembly : Advanced.Algorithms.dll Syntax public class AsyncQueue<T> Type Parameters Name Description T Methods DequeueAsync(CancellationToken) Supports multi-threaded consumers. Time complexity: O(1). Declaration public Task<T> DequeueAsync(CancellationToken taskCancellationToken = default(CancellationToken)) Parameters Type Name Description CancellationToken taskCancellationToken Returns Type Description Task <T> EnqueueAsync(T, CancellationToken) Supports multi-threaded producers. Time complexity: O(1). Declaration public Task EnqueueAsync(T value, CancellationToken taskCancellationToken = default(CancellationToken)) Parameters Type Name Description T value CancellationToken taskCancellationToken Returns Type Description Task"
400+
"keywords": "Class AsyncQueue<T> A simple asynchronous multi-thread supporting producer/consumer FIFO queue with minimal locking. Inheritance Object AsyncQueue<T> Inherited Members Object.ToString() Object.Equals(Object) Object.Equals(Object, Object) Object.ReferenceEquals(Object, Object) Object.GetHashCode() Object.GetType() Object.MemberwiseClone() Namespace : Advanced.Algorithms.Distributed Assembly : Advanced.Algorithms.dll Syntax public class AsyncQueue<T> Type Parameters Name Description T Properties Count Declaration public int Count { get; } Property Value Type Description Int32 Methods DequeueAsync(Int32, CancellationToken) Supports multi-threaded consumers. Time complexity: O(1). Declaration public Task<T> DequeueAsync(int millisecondsTimeout = 2147483647, CancellationToken taskCancellationToken = default(CancellationToken)) Parameters Type Name Description Int32 millisecondsTimeout CancellationToken taskCancellationToken Returns Type Description Task <T> EnqueueAsync(T, Int32, CancellationToken) Supports multi-threaded producers. Time complexity: O(1). Declaration public Task EnqueueAsync(T value, int millisecondsTimeout = 2147483647, CancellationToken taskCancellationToken = default(CancellationToken)) Parameters Type Name Description T value Int32 millisecondsTimeout CancellationToken taskCancellationToken Returns Type Description Task"
401401
},
402402
"api/Advanced.Algorithms.Distributed.CircularQueue-1.html": {
403403
"href": "api/Advanced.Algorithms.Distributed.CircularQueue-1.html",

docs/xrefmap.yml

+33-16
Original file line numberDiff line numberDiff line change
@@ -9290,14 +9290,31 @@ references:
92909290
fullName.vb: Advanced.Algorithms.Distributed.AsyncQueue(Of T)
92919291
nameWithType: AsyncQueue<T>
92929292
nameWithType.vb: AsyncQueue(Of T)
9293-
- uid: Advanced.Algorithms.Distributed.AsyncQueue`1.DequeueAsync(System.Threading.CancellationToken)
9294-
name: DequeueAsync(CancellationToken)
9295-
href: api/Advanced.Algorithms.Distributed.AsyncQueue-1.html#Advanced_Algorithms_Distributed_AsyncQueue_1_DequeueAsync_System_Threading_CancellationToken_
9296-
commentId: M:Advanced.Algorithms.Distributed.AsyncQueue`1.DequeueAsync(System.Threading.CancellationToken)
9297-
fullName: Advanced.Algorithms.Distributed.AsyncQueue<T>.DequeueAsync(System.Threading.CancellationToken)
9298-
fullName.vb: Advanced.Algorithms.Distributed.AsyncQueue(Of T).DequeueAsync(System.Threading.CancellationToken)
9299-
nameWithType: AsyncQueue<T>.DequeueAsync(CancellationToken)
9300-
nameWithType.vb: AsyncQueue(Of T).DequeueAsync(CancellationToken)
9293+
- uid: Advanced.Algorithms.Distributed.AsyncQueue`1.Count
9294+
name: Count
9295+
href: api/Advanced.Algorithms.Distributed.AsyncQueue-1.html#Advanced_Algorithms_Distributed_AsyncQueue_1_Count
9296+
commentId: P:Advanced.Algorithms.Distributed.AsyncQueue`1.Count
9297+
fullName: Advanced.Algorithms.Distributed.AsyncQueue<T>.Count
9298+
fullName.vb: Advanced.Algorithms.Distributed.AsyncQueue(Of T).Count
9299+
nameWithType: AsyncQueue<T>.Count
9300+
nameWithType.vb: AsyncQueue(Of T).Count
9301+
- uid: Advanced.Algorithms.Distributed.AsyncQueue`1.Count*
9302+
name: Count
9303+
href: api/Advanced.Algorithms.Distributed.AsyncQueue-1.html#Advanced_Algorithms_Distributed_AsyncQueue_1_Count_
9304+
commentId: Overload:Advanced.Algorithms.Distributed.AsyncQueue`1.Count
9305+
isSpec: "True"
9306+
fullName: Advanced.Algorithms.Distributed.AsyncQueue<T>.Count
9307+
fullName.vb: Advanced.Algorithms.Distributed.AsyncQueue(Of T).Count
9308+
nameWithType: AsyncQueue<T>.Count
9309+
nameWithType.vb: AsyncQueue(Of T).Count
9310+
- uid: Advanced.Algorithms.Distributed.AsyncQueue`1.DequeueAsync(System.Int32,System.Threading.CancellationToken)
9311+
name: DequeueAsync(Int32, CancellationToken)
9312+
href: api/Advanced.Algorithms.Distributed.AsyncQueue-1.html#Advanced_Algorithms_Distributed_AsyncQueue_1_DequeueAsync_System_Int32_System_Threading_CancellationToken_
9313+
commentId: M:Advanced.Algorithms.Distributed.AsyncQueue`1.DequeueAsync(System.Int32,System.Threading.CancellationToken)
9314+
fullName: Advanced.Algorithms.Distributed.AsyncQueue<T>.DequeueAsync(System.Int32, System.Threading.CancellationToken)
9315+
fullName.vb: Advanced.Algorithms.Distributed.AsyncQueue(Of T).DequeueAsync(System.Int32, System.Threading.CancellationToken)
9316+
nameWithType: AsyncQueue<T>.DequeueAsync(Int32, CancellationToken)
9317+
nameWithType.vb: AsyncQueue(Of T).DequeueAsync(Int32, CancellationToken)
93019318
- uid: Advanced.Algorithms.Distributed.AsyncQueue`1.DequeueAsync*
93029319
name: DequeueAsync
93039320
href: api/Advanced.Algorithms.Distributed.AsyncQueue-1.html#Advanced_Algorithms_Distributed_AsyncQueue_1_DequeueAsync_
@@ -9307,14 +9324,14 @@ references:
93079324
fullName.vb: Advanced.Algorithms.Distributed.AsyncQueue(Of T).DequeueAsync
93089325
nameWithType: AsyncQueue<T>.DequeueAsync
93099326
nameWithType.vb: AsyncQueue(Of T).DequeueAsync
9310-
- uid: Advanced.Algorithms.Distributed.AsyncQueue`1.EnqueueAsync(`0,System.Threading.CancellationToken)
9311-
name: EnqueueAsync(T, CancellationToken)
9312-
href: api/Advanced.Algorithms.Distributed.AsyncQueue-1.html#Advanced_Algorithms_Distributed_AsyncQueue_1_EnqueueAsync__0_System_Threading_CancellationToken_
9313-
commentId: M:Advanced.Algorithms.Distributed.AsyncQueue`1.EnqueueAsync(`0,System.Threading.CancellationToken)
9314-
fullName: Advanced.Algorithms.Distributed.AsyncQueue<T>.EnqueueAsync(T, System.Threading.CancellationToken)
9315-
fullName.vb: Advanced.Algorithms.Distributed.AsyncQueue(Of T).EnqueueAsync(T, System.Threading.CancellationToken)
9316-
nameWithType: AsyncQueue<T>.EnqueueAsync(T, CancellationToken)
9317-
nameWithType.vb: AsyncQueue(Of T).EnqueueAsync(T, CancellationToken)
9327+
- uid: Advanced.Algorithms.Distributed.AsyncQueue`1.EnqueueAsync(`0,System.Int32,System.Threading.CancellationToken)
9328+
name: EnqueueAsync(T, Int32, CancellationToken)
9329+
href: api/Advanced.Algorithms.Distributed.AsyncQueue-1.html#Advanced_Algorithms_Distributed_AsyncQueue_1_EnqueueAsync__0_System_Int32_System_Threading_CancellationToken_
9330+
commentId: M:Advanced.Algorithms.Distributed.AsyncQueue`1.EnqueueAsync(`0,System.Int32,System.Threading.CancellationToken)
9331+
fullName: Advanced.Algorithms.Distributed.AsyncQueue<T>.EnqueueAsync(T, System.Int32, System.Threading.CancellationToken)
9332+
fullName.vb: Advanced.Algorithms.Distributed.AsyncQueue(Of T).EnqueueAsync(T, System.Int32, System.Threading.CancellationToken)
9333+
nameWithType: AsyncQueue<T>.EnqueueAsync(T, Int32, CancellationToken)
9334+
nameWithType.vb: AsyncQueue(Of T).EnqueueAsync(T, Int32, CancellationToken)
93189335
- uid: Advanced.Algorithms.Distributed.AsyncQueue`1.EnqueueAsync*
93199336
name: EnqueueAsync
93209337
href: api/Advanced.Algorithms.Distributed.AsyncQueue-1.html#Advanced_Algorithms_Distributed_AsyncQueue_1_EnqueueAsync_

0 commit comments

Comments
 (0)