Skip to content

Commit af36835

Browse files
Merge pull request #5 from Lercher/master
Fix for Issue #3 and #4
2 parents 9aad539 + d27984a commit af36835

File tree

3 files changed

+12
-20
lines changed

3 files changed

+12
-20
lines changed

Diff for: src/JsonRpc/InputHandler.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ private async void ProcessInputStream()
8787
current += await _input.ReadBlockAsync(buffer, current, 1);
8888
}
8989

90-
var headersContent = Encoding.ASCII.GetString(Encoding.ASCII.GetBytes(buffer, 0, current));
90+
var headersContent = new string(buffer, 0, current);
9191
var headers = headersContent.Split(HeaderKeys, StringSplitOptions.RemoveEmptyEntries);
9292
long length = 0;
9393
for (var i = 0; i < headers.Length; i += 2)
@@ -104,7 +104,7 @@ private async void ProcessInputStream()
104104

105105
await _input.ReadBlockAsync(requestBuffer, 0, requestBuffer.Length);
106106

107-
var payload = Encoding.ASCII.GetString(Encoding.ASCII.GetBytes(requestBuffer));
107+
var payload = new string(requestBuffer);
108108

109109
HandleRequest(payload);
110110
}

Diff for: src/JsonRpc/OutputHandler.cs

+9-17
Original file line numberDiff line numberDiff line change
@@ -9,42 +9,38 @@ namespace JsonRpc
99
{
1010
public class OutputHandler : IOutputHandler
1111
{
12-
private readonly TimeSpan _sleepTime = TimeSpan.FromMilliseconds(50);
1312
private readonly TextWriter _output;
1413
private Thread _thread;
15-
private readonly ConcurrentQueue<object> _queue;
14+
private readonly BlockingCollection<object> _queue;
15+
private readonly CancellationTokenSource _cancel;
1616

1717
public OutputHandler(TextWriter output)
1818
{
1919
_output = output;
20-
_queue = new ConcurrentQueue<object>();
20+
_queue = new BlockingCollection<object>();
21+
_cancel = new CancellationTokenSource();
2122
_thread = new Thread(ProcessOutputQueue) {
2223
IsBackground = true
2324
};
2425
}
2526

26-
internal OutputHandler(TextWriter output, TimeSpan sleepTime)
27-
: this(output)
28-
{
29-
_sleepTime = sleepTime;
30-
}
31-
3227
public void Start()
3328
{
3429
_thread.Start();
3530
}
3631

3732
public void Send(object value)
3833
{
39-
_queue.Enqueue(value);
34+
_queue.Add(value);
4035
}
36+
4137
private void ProcessOutputQueue()
4238
{
39+
var token = _cancel.Token;
4340
while (true)
4441
{
4542
if (_thread == null) return;
46-
47-
if (_queue.TryDequeue(out var value))
43+
if (_queue.TryTake(out var value, -1, token))
4844
{
4945
var content = JsonConvert.SerializeObject(value);
5046

@@ -56,18 +52,14 @@ private void ProcessOutputQueue()
5652

5753
_output.Write(sb.ToString());
5854
}
59-
60-
if (_queue.IsEmpty)
61-
{
62-
Thread.Sleep(_sleepTime);
63-
}
6455
}
6556
}
6657

6758
public void Dispose()
6859
{
6960
_output?.Dispose();
7061
_thread = null;
62+
_cancel.Cancel();
7163
}
7264
}
7365
}

Diff for: test/JsonRpc.Tests/OutputHandlerTests.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ private static (OutputHandler handler, Action wait) NewHandler(TextWriter textWr
1515
cts.CancelAfter(TimeSpan.FromSeconds(5));
1616
action(cts);
1717

18-
var handler = new OutputHandler(textWriter, TimeSpan.Zero);
18+
var handler = new OutputHandler(textWriter);
1919
handler.Start();
2020
return (handler, () => cts.Wait());
2121
}

0 commit comments

Comments
 (0)