Skip to content

Commit 8f62b3d

Browse files
committed
Pipelines
1 parent 675cf37 commit 8f62b3d

File tree

5 files changed

+227
-3
lines changed

5 files changed

+227
-3
lines changed

Diff for: Async.Netcore.csproj

+4
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,8 @@
1212
</None>
1313
</ItemGroup>
1414

15+
<ItemGroup>
16+
<PackageReference Include="System.IO.Pipelines" Version="4.6.0-preview5.19224.8" />
17+
</ItemGroup>
18+
1519
</Project>

Diff for: PipelineExtensions.cs

+99
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
using System;
2+
using System.Buffers;
3+
using System.IO;
4+
using System.IO.Pipelines;
5+
using System.Net;
6+
using System.Net.Sockets;
7+
using System.Text;
8+
using System.Threading;
9+
using System.Threading.Tasks;
10+
11+
static class PipelinesExtensions
12+
{
13+
public static void Explain(this Pipelines runnable, TextWriter writer)
14+
{
15+
writer.WriteLine(@"
16+
- All buffer management is delegated to the `PipeReader`/`PipeWriter` implementations.`
17+
- Besides handling the memory management, the other core pipelines feature is the ability to peek at data in the Pipe without actually consuming it.
18+
- `FlushAsync` provides back pressure and flow control. PipeWriter.FlushAsync “blocks” when the amount of data in the Pipe crosses PauseWriterThreshold and “unblocks” when it becomes lower than ResumeWriterThreshold.
19+
- `PipeScheduler` gives fine grained control over scheduling the IO.
20+
");
21+
}
22+
23+
public static void ProcessLine(this Pipelines runnable, Socket socket, in ReadOnlySequence<byte> buffer)
24+
{
25+
Console.Write($"[{socket.RemoteEndPoint}]: ");
26+
foreach (var segment in buffer)
27+
{
28+
Console.Write(Encoding.UTF8.GetString(segment.Span));
29+
}
30+
Console.WriteLine();
31+
}
32+
33+
public static void ReadUntilEOLAndOutputToConsole(this Pipelines runnable, Socket socket, ReadOnlySequence<byte> buffer)
34+
{
35+
SequencePosition? position = null;
36+
do
37+
{
38+
// Find the EOL
39+
position = buffer.PositionOf((byte)'\n');
40+
41+
if (position != null)
42+
{
43+
var line = buffer.Slice(0, position.Value);
44+
runnable.ProcessLine(socket, line);
45+
46+
// This is equivalent to position + 1
47+
var next = buffer.GetPosition(1, position.Value);
48+
49+
// Skip what we've already processed including \n
50+
buffer = buffer.Slice(next);
51+
}
52+
}
53+
while (position != null);
54+
}
55+
56+
public static async Task Send(this Pipelines runnable, CancellationTokenSource tokenSource)
57+
{
58+
var clientSocket = new Socket(SocketType.Stream, ProtocolType.Tcp);
59+
await clientSocket.ConnectAsync(new IPEndPoint(IPAddress.Loopback, 8087));
60+
Console.WriteLine("Connecting to port 8087");
61+
62+
string line;
63+
while ((line = Console.ReadLine()) != "exit")
64+
{
65+
var buffer = Encoding.ASCII.GetBytes(line.Replace(Environment.NewLine, string.Empty) + Environment.NewLine);
66+
67+
await clientSocket.SendAsync(new ArraySegment<byte>(buffer), SocketFlags.None);
68+
}
69+
70+
tokenSource.Cancel();
71+
72+
Console.WriteLine("Send done");
73+
}
74+
75+
public static async Task Receive(this Pipelines runnable, Pipe pipe, CancellationToken token)
76+
{
77+
var listenSocket = new Socket(SocketType.Stream, ProtocolType.Tcp);
78+
listenSocket.Bind(new IPEndPoint(IPAddress.Loopback, 8087));
79+
Console.WriteLine("Listening on port 8087");
80+
listenSocket.Listen(120);
81+
82+
using (token.Register(() => listenSocket.Close()))
83+
{
84+
while (!token.IsCancellationRequested)
85+
{
86+
Socket socket;
87+
try
88+
{
89+
socket = await listenSocket.AcceptAsync();
90+
}
91+
catch (SocketException)
92+
{
93+
return;
94+
}
95+
_ = runnable.ProcessLinesAsync(socket, pipe, token);
96+
}
97+
}
98+
}
99+
}

Diff for: Pipelines.cs

+98
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
using System;
2+
using System.Buffers;
3+
using System.IO.Pipelines;
4+
using System.Net;
5+
using System.Net.Sockets;
6+
using System.Text;
7+
using System.Threading;
8+
using System.Threading.Tasks;
9+
10+
class Pipelines : IRunnable
11+
{
12+
public async Task Run()
13+
{
14+
var tokenSource = new CancellationTokenSource();
15+
var token = tokenSource.Token;
16+
17+
var pipe = new Pipe(new PipeOptions(useSynchronizationContext: false));
18+
19+
var receiveTask = this.Receive(pipe, token);
20+
var sendTask = this.Send(tokenSource);
21+
22+
await Task.WhenAll(receiveTask, sendTask);
23+
}
24+
25+
public async Task ProcessLinesAsync(Socket socket, Pipe pipe, CancellationToken token)
26+
{
27+
Console.WriteLine($"[{socket.RemoteEndPoint}]: connected");
28+
29+
Task writing = FillPipeAsync(socket, pipe.Writer, token);
30+
Task reading = ReadPipeAsync(socket, pipe.Reader, token);
31+
32+
await Task.WhenAll(reading, writing).IgnoreCancellation();
33+
34+
Console.WriteLine($"[{socket.RemoteEndPoint}]: disconnected");
35+
}
36+
37+
private static async Task FillPipeAsync(Socket socket, PipeWriter writer, CancellationToken token)
38+
{
39+
const int minimumBufferSize = 512;
40+
41+
while (!token.IsCancellationRequested)
42+
{
43+
try
44+
{
45+
// Request a minimum of 512 bytes from the PipeWriter
46+
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
47+
int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None, token);
48+
if (bytesRead == 0)
49+
{
50+
break;
51+
}
52+
53+
// Tell the PipeWriter how much was read
54+
writer.Advance(bytesRead);
55+
}
56+
catch
57+
{
58+
break;
59+
}
60+
61+
// Make the data available to the PipeReader
62+
FlushResult result = await writer.FlushAsync(token);
63+
64+
if (result.IsCompleted)
65+
{
66+
break;
67+
}
68+
}
69+
70+
// Signal to the reader that we're done writing
71+
writer.Complete();
72+
}
73+
74+
async Task ReadPipeAsync(Socket socket, PipeReader reader, CancellationToken token)
75+
{
76+
while (!token.IsCancellationRequested)
77+
{
78+
ReadResult result = await reader.ReadAsync(token);
79+
80+
ReadOnlySequence<byte> buffer = result.Buffer;
81+
82+
this.ReadUntilEOLAndOutputToConsole(socket, buffer);
83+
84+
// We sliced the buffer until no more data could be processed
85+
// Tell the PipeReader how much we consumed and how much we left to process
86+
reader.AdvanceTo(buffer.Start, buffer.End);
87+
88+
if (result.IsCompleted)
89+
{
90+
break;
91+
}
92+
}
93+
94+
reader.Complete();
95+
}
96+
97+
98+
}

Diff for: README.md

+10-3
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,14 @@
66

77
## AsyncEnumerable
88

9-
- `Stream`, `Utf8JsonWriter`, `System.Threading.Timer`, `CancellationTokenRegistration`, `BinaryWriter`, `TextWriter` and `IAsyncEnumerator<T>` implement `IAsyncDisposable`
10-
- `Stream`, `BinaryWriter`, `TextWriter` calls `.Dispose` synchronously
11-
- `Stream` `FlushAsync` calls `Flush` on another thread which is bad behavior that should be overwritten
9+
- `IAsyncEnumerable<T>` allows to write asynchronous pull based streams, similar to regular enumerables with `yield return` and `yield break`
10+
- `WithCancellation` only adds the token to the enumerator but doesn't influence the state machine
11+
- `WithCancellation` in combination with `[EnumeratorCancellation]` can be used to ceate a combined token
12+
13+
## Pipelines
14+
15+
- All buffer management is delegated to the `PipeReader`/`PipeWriter` implementations.`
16+
- Besides handling the memory management, the other core pipelines feature is the ability to peek at data in the Pipe without actually consuming it.
17+
- `FlushAsync` provides back pressure and flow control. PipeWriter.FlushAsync “blocks” when the amount of data in the Pipe crosses PauseWriterThreshold and “unblocks” when it becomes lower than ResumeWriterThreshold.
18+
- `PipeScheduler` gives fine grained control over scheduling the IO.
1219

Diff for: TaskExtensions.cs

+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
4+
static class TaskExtensions
5+
{
6+
public static async Task IgnoreCancellation(this Task task)
7+
{
8+
try
9+
{
10+
await task.ConfigureAwait(false);
11+
}
12+
catch (OperationCanceledException)
13+
{
14+
}
15+
}
16+
}

0 commit comments

Comments
 (0)