10
10
using System . Collections . Generic ;
11
11
using System . Linq ;
12
12
using System . Text ;
13
+ using System . Threading ;
13
14
using System . Threading . Tasks ;
14
15
using Xunit ;
15
16
@@ -20,7 +21,7 @@ internal class OutputReader
20
21
private AsyncQueue < OutputEventBody > outputQueue = new AsyncQueue < OutputEventBody > ( ) ;
21
22
22
23
private string currentOutputCategory ;
23
- private Queue < string > bufferedOutput = new Queue < string > ( ) ;
24
+ private Queue < Tuple < string , bool > > bufferedOutput = new Queue < Tuple < string , bool > > ( ) ;
24
25
25
26
public OutputReader ( ProtocolEndpoint protocolClient )
26
27
{
@@ -31,34 +32,80 @@ public OutputReader(ProtocolEndpoint protocolClient)
31
32
32
33
public async Task < string > ReadLine ( string expectedOutputCategory = "stdout" )
33
34
{
34
- if ( this . bufferedOutput . Count > 0 )
35
+ try
35
36
{
36
- Assert . Equal ( expectedOutputCategory , this . currentOutputCategory ) ;
37
+ bool lineHasNewLine = false ;
38
+ string [ ] outputLines = null ;
39
+ string nextOutputString = string . Empty ;
37
40
38
- return this . bufferedOutput . Dequeue ( ) ;
39
- }
40
-
41
- // Execution reaches this point if a buffered line wasn't available
42
- OutputEventBody nextOutputEvent = await this . outputQueue . DequeueAsync ( ) ;
41
+ // Wait no longer than 7 seconds for output to come back
42
+ CancellationTokenSource cancellationSource = new CancellationTokenSource ( 7000 ) ;
43
43
44
- Assert . Equal ( expectedOutputCategory , nextOutputEvent . Category ) ;
45
- this . currentOutputCategory = nextOutputEvent . Category ;
44
+ // Any lines in the buffer?
45
+ if ( this . bufferedOutput . Count > 0 )
46
+ {
47
+ Assert . Equal ( expectedOutputCategory , this . currentOutputCategory ) ;
46
48
47
- string [ ] outputLines =
48
- nextOutputEvent . Output . Split (
49
- new string [ ] { "\n " , "\r \n " } ,
50
- StringSplitOptions . None ) ;
49
+ // Return the first buffered line
50
+ var lineTuple = this . bufferedOutput . Dequeue ( ) ;
51
+ nextOutputString = lineTuple . Item1 ;
52
+ lineHasNewLine = lineTuple . Item2 ;
53
+ }
51
54
52
- // Buffer remaining lines
53
- if ( outputLines . Length > 1 )
54
- {
55
- for ( int i = 1 ; i < outputLines . Length ; i ++ )
55
+ // Loop until we get a full line of output
56
+ while ( ! lineHasNewLine )
56
57
{
57
- this . bufferedOutput . Enqueue ( outputLines [ i ] ) ;
58
+ // Execution reaches this point if a buffered line wasn't available
59
+ Task < OutputEventBody > outputTask =
60
+ this . outputQueue . DequeueAsync (
61
+ cancellationSource . Token ) ;
62
+ OutputEventBody nextOutputEvent = await outputTask ;
63
+
64
+ // Verify that the output is of the expected type
65
+ Assert . Equal ( expectedOutputCategory , nextOutputEvent . Category ) ;
66
+ this . currentOutputCategory = nextOutputEvent . Category ;
67
+
68
+ // Split up the output into multiple lines
69
+ outputLines =
70
+ nextOutputEvent . Output . Split (
71
+ new string [ ] { "\n " , "\r \n " } ,
72
+ StringSplitOptions . None ) ;
73
+
74
+ // Add the first bit of output to the existing string
75
+ nextOutputString += outputLines [ 0 ] ;
76
+
77
+ // Have we found a newline now?
78
+ lineHasNewLine =
79
+ outputLines . Length > 1 ||
80
+ nextOutputEvent . Output . EndsWith ( "\n " ) ;
81
+
82
+ // Buffer any remaining lines for future reads
83
+ if ( outputLines . Length > 1 )
84
+ {
85
+ for ( int i = 1 ; i < outputLines . Length ; i ++ )
86
+ {
87
+ this . bufferedOutput . Enqueue (
88
+ new Tuple < string , bool > (
89
+ outputLines [ i ] ,
90
+
91
+ // The line has a newline if it's not the last segment or
92
+ // if the current output string ends with a newline
93
+ i < outputLines . Length - 1 ||
94
+ nextOutputEvent . Output . EndsWith ( "\n " ) ) ) ;
95
+ }
96
+ }
97
+
98
+ // At this point, the state of lineHasNewLine will determine
99
+ // whether the loop continues to wait for another output
100
+ // event that completes the current line.
58
101
}
59
- }
60
102
61
- return outputLines [ 0 ] ;
103
+ return nextOutputString ;
104
+ }
105
+ catch ( TaskCanceledException )
106
+ {
107
+ throw new TimeoutException ( "Timed out waiting for an input line." ) ;
108
+ }
62
109
}
63
110
64
111
public async Task < string [ ] > ReadLines ( int lineCount , string expectedOutputCategory = "stdout" )
0 commit comments