Skip to content

Commit 41f2c3b

Browse files
committed
Merge pull request #136 from PowerShell/daviwil/output-buffering
Add intervallic buffering of OutputEvent messages
2 parents d4aec5f + 9aee741 commit 41f2c3b

File tree

15 files changed

+672
-53
lines changed

15 files changed

+672
-53
lines changed

src/PowerShellEditorServices.Protocol/PowerShellEditorServices.Protocol.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@
124124
<Compile Include="Properties\AssemblyInfo.cs" />
125125
<Compile Include="LanguageServer\References.cs" />
126126
<Compile Include="Server\LanguageServerSettings.cs" />
127+
<Compile Include="Server\OutputDebouncer.cs" />
127128
<Compile Include="Server\PromptHandlers.cs" />
128129
</ItemGroup>
129130
<ItemGroup>

src/PowerShellEditorServices.Protocol/Properties/AssemblyInfo.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,4 @@
4141
[assembly: AssemblyFileVersion("0.0.0.0")]
4242
[assembly: AssemblyInformationalVersion("0.0.0.0")]
4343

44+
[assembly: InternalsVisibleTo("Microsoft.PowerShell.EditorServices.Test.Protocol")]

src/PowerShellEditorServices.Protocol/Server/DebugAdapter.cs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ namespace Microsoft.PowerShell.EditorServices.Protocol.Server
1919
public class DebugAdapter : DebugAdapterBase
2020
{
2121
private EditorSession editorSession;
22+
private OutputDebouncer outputDebouncer;
2223

2324
public DebugAdapter() : this(new StdioServerChannel())
2425
{
@@ -30,6 +31,9 @@ public DebugAdapter(ChannelBase serverChannel) : base(serverChannel)
3031
this.editorSession.StartSession();
3132
this.editorSession.DebugService.DebuggerStopped += this.DebugService_DebuggerStopped;
3233
this.editorSession.ConsoleService.OutputWritten += this.powerShellContext_OutputWritten;
34+
35+
// Set up the output debouncer to throttle output event writes
36+
this.outputDebouncer = new OutputDebouncer(this);
3337
}
3438

3539
protected override void Initialize()
@@ -59,6 +63,9 @@ protected override void Initialize()
5963

6064
protected override void Shutdown()
6165
{
66+
// Make sure remaining output is flushed before exiting
67+
this.outputDebouncer.Flush().Wait();
68+
6269
Logger.Write(LogLevel.Normal, "Debug adapter is shutting down...");
6370

6471
if (this.editorSession != null)
@@ -383,6 +390,9 @@ await requestContext.SendResult(
383390

384391
async void DebugService_DebuggerStopped(object sender, DebuggerStopEventArgs e)
385392
{
393+
// Flush pending output before sending the event
394+
await this.outputDebouncer.Flush();
395+
386396
await this.SendEvent(
387397
StoppedEvent.Type,
388398
new StoppedEventBody
@@ -400,13 +410,8 @@ await this.SendEvent(
400410

401411
async void powerShellContext_OutputWritten(object sender, OutputWrittenEventArgs e)
402412
{
403-
await this.SendEvent(
404-
OutputEvent.Type,
405-
new OutputEventBody
406-
{
407-
Output = e.OutputText + (e.IncludeNewLine ? "\r\n" : string.Empty),
408-
Category = (e.OutputType == OutputType.Error) ? "stderr" : "stdout"
409-
});
413+
// Queue the output for writing
414+
await this.outputDebouncer.Invoke(e);
410415
}
411416

412417
#endregion

src/PowerShellEditorServices.Protocol/Server/LanguageServer.cs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ public class LanguageServer : LanguageServerBase
2525
private static CancellationTokenSource existingRequestCancellation;
2626

2727
private EditorSession editorSession;
28+
private OutputDebouncer outputDebouncer;
2829
private LanguageServerSettings currentSettings = new LanguageServerSettings();
2930

3031
public LanguageServer() : this(new StdioServerChannel())
@@ -44,6 +45,9 @@ public LanguageServer(ChannelBase serverChannel) : base(serverChannel)
4445
new ProtocolPromptHandlerContext(
4546
this,
4647
this.editorSession.ConsoleService));
48+
49+
// Set up the output debouncer to throttle output event writes
50+
this.outputDebouncer = new OutputDebouncer(this);
4751
}
4852

4953
protected override void Initialize()
@@ -78,6 +82,9 @@ protected override void Initialize()
7882

7983
protected override void Shutdown()
8084
{
85+
// Make sure remaining output is flushed before exiting
86+
this.outputDebouncer.Flush().Wait();
87+
8188
Logger.Write(LogLevel.Normal, "Language service is shutting down...");
8289

8390
if (this.editorSession != null)
@@ -757,13 +764,8 @@ protected Task HandleEvaluateRequest(
757764

758765
async void powerShellContext_OutputWritten(object sender, OutputWrittenEventArgs e)
759766
{
760-
await this.SendEvent(
761-
DebugAdapterMessages.OutputEvent.Type,
762-
new DebugAdapterMessages.OutputEventBody
763-
{
764-
Output = e.OutputText + (e.IncludeNewLine ? "\r\n" : string.Empty),
765-
Category = (e.OutputType == OutputType.Error) ? "stderr" : "stdout"
766-
});
767+
// Queue the output for writing
768+
await this.outputDebouncer.Invoke(e);
767769
}
768770

769771
#endregion
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
//
2+
// Copyright (c) Microsoft. All rights reserved.
3+
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
4+
//
5+
6+
using Microsoft.PowerShell.EditorServices.Protocol.DebugAdapter;
7+
using Microsoft.PowerShell.EditorServices.Protocol.MessageProtocol;
8+
using Microsoft.PowerShell.EditorServices.Utility;
9+
using System.Threading.Tasks;
10+
11+
namespace Microsoft.PowerShell.EditorServices.Protocol.Server
12+
{
13+
/// <summary>
14+
/// Throttles output written via OutputEvents by batching all output
15+
/// written within a short time window and writing it all out at once.
16+
/// </summary>
17+
internal class OutputDebouncer : AsyncDebouncer<OutputWrittenEventArgs>
18+
{
19+
#region Private Fields
20+
21+
private IMessageSender messageSender;
22+
private bool currentOutputIsError = false;
23+
private string currentOutputString = null;
24+
25+
#endregion
26+
27+
#region Constants
28+
29+
// Set a really short window for output flushes. This
30+
// gives the appearance of fast output without the crushing
31+
// overhead of sending an OutputEvent for every single line
32+
// written. At this point it seems that around 10-20 lines get
33+
// batched for each flush when Get-Process is called.
34+
public const int OutputFlushInterval = 200;
35+
36+
#endregion
37+
38+
#region Constructors
39+
40+
public OutputDebouncer(IMessageSender messageSender)
41+
: base(OutputFlushInterval, false)
42+
{
43+
this.messageSender = messageSender;
44+
}
45+
46+
#endregion
47+
48+
#region Private Methods
49+
50+
protected override async Task OnInvoke(OutputWrittenEventArgs output)
51+
{
52+
bool outputIsError = output.OutputType == OutputType.Error;
53+
54+
if (this.currentOutputIsError != outputIsError)
55+
{
56+
if (this.currentOutputString != null)
57+
{
58+
// Flush the output
59+
await this.OnFlush();
60+
}
61+
62+
this.currentOutputString = string.Empty;
63+
this.currentOutputIsError = outputIsError;
64+
}
65+
66+
// Output string could be null if the last output was already flushed
67+
if (this.currentOutputString == null)
68+
{
69+
this.currentOutputString = string.Empty;
70+
}
71+
72+
// Add to string (and include newline)
73+
this.currentOutputString +=
74+
output.OutputText +
75+
(output.IncludeNewLine ?
76+
System.Environment.NewLine :
77+
string.Empty);
78+
}
79+
80+
protected override async Task OnFlush()
81+
{
82+
// Only flush output if there is some to flush
83+
if (this.currentOutputString != null)
84+
{
85+
// Send an event for the current output
86+
await this.messageSender.SendEvent(
87+
OutputEvent.Type,
88+
new OutputEventBody
89+
{
90+
Output = this.currentOutputString,
91+
Category = (this.currentOutputIsError) ? "stderr" : "stdout"
92+
});
93+
94+
// Clear the output string for the next batch
95+
this.currentOutputString = null;
96+
}
97+
}
98+
99+
#endregion
100+
}
101+
}
102+

src/PowerShellEditorServices/PowerShellEditorServices.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@
101101
<Compile Include="Session\SessionPSHostUserInterface.cs" />
102102
<Compile Include="Session\SessionStateChangedEventArgs.cs" />
103103
<Compile Include="Utility\AsyncContextThread.cs" />
104+
<Compile Include="Utility\AsyncDebouncer.cs" />
104105
<Compile Include="Utility\AsyncLock.cs" />
105106
<Compile Include="Utility\AsyncQueue.cs" />
106107
<Compile Include="Utility\AsyncContext.cs" />
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
//
2+
// Copyright (c) Microsoft. All rights reserved.
3+
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
4+
//
5+
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
9+
namespace Microsoft.PowerShell.EditorServices.Utility
10+
{
11+
/// <summary>
12+
/// Restricts the invocation of an operation to a specified time
13+
/// interval. Can also cause previous requests to be cancelled
14+
/// by new requests within that time window. Typically used for
15+
/// buffering information for an operation or ensuring that an
16+
/// operation only runs after some interval.
17+
/// </summary>
18+
/// <typeparam name="TInvokeArgs">The argument type for the Invoke method.</typeparam>
19+
public abstract class AsyncDebouncer<TInvokeArgs>
20+
{
21+
#region Private Fields
22+
23+
private int flushInterval;
24+
private bool restartOnInvoke;
25+
26+
private Task currentTimerTask;
27+
private CancellationTokenSource timerCancellationSource;
28+
29+
private AsyncLock asyncLock = new AsyncLock();
30+
31+
#endregion
32+
33+
#region Public Methods
34+
35+
/// <summary>
36+
/// Creates a new instance of the AsyncDebouncer class with the
37+
/// specified flush interval. If restartOnInvoke is true, any
38+
/// calls to Invoke will cancel previous calls which have not yet
39+
/// passed the flush interval.
40+
/// </summary>
41+
/// <param name="flushInterval">
42+
/// A millisecond interval to use for flushing prior Invoke calls.
43+
/// </param>
44+
/// <param name="restartOnInvoke">
45+
/// If true, Invoke calls will reset prior calls which haven't passed the flush interval.
46+
/// </param>
47+
public AsyncDebouncer(int flushInterval, bool restartOnInvoke)
48+
{
49+
this.flushInterval = flushInterval;
50+
this.restartOnInvoke = restartOnInvoke;
51+
}
52+
53+
/// <summary>
54+
/// Invokes the debouncer with the given input. The debouncer will
55+
/// wait for the specified interval before calling the Flush method
56+
/// to complete the operation.
57+
/// </summary>
58+
/// <param name="invokeArgument">
59+
/// The argument for this implementation's Invoke method.
60+
/// </param>
61+
/// <returns>A Task to be awaited until the Invoke is queued.</returns>
62+
public async Task Invoke(TInvokeArgs invokeArgument)
63+
{
64+
using (await this.asyncLock.LockAsync())
65+
{
66+
// Invoke the implementor
67+
await this.OnInvoke(invokeArgument);
68+
69+
// If there's no timer, start one
70+
if (this.currentTimerTask == null)
71+
{
72+
this.StartTimer();
73+
}
74+
else if (this.currentTimerTask != null && this.restartOnInvoke)
75+
{
76+
// Restart the existing timer
77+
if (this.CancelTimer())
78+
{
79+
this.StartTimer();
80+
}
81+
}
82+
}
83+
}
84+
85+
/// <summary>
86+
/// Flushes the latest state regardless of the current interval.
87+
/// An AsyncDebouncer MUST NOT invoke its own Flush method otherwise
88+
/// deadlocks could occur.
89+
/// </summary>
90+
/// <returns>A Task to be awaited until Flush completes.</returns>
91+
public async Task Flush()
92+
{
93+
using (await this.asyncLock.LockAsync())
94+
{
95+
// Cancel the current timer
96+
this.CancelTimer();
97+
98+
// Flush the current output
99+
await this.OnFlush();
100+
}
101+
}
102+
103+
#endregion
104+
105+
#region Abstract Methods
106+
107+
/// <summary>
108+
/// Implemented by the subclass to take the argument for the
109+
/// future operation that will be performed by OnFlush.
110+
/// </summary>
111+
/// <param name="invokeArgument">
112+
/// The argument for this implementation's OnInvoke method.
113+
/// </param>
114+
/// <returns>A Task to be awaited for the invoke to complete.</returns>
115+
protected abstract Task OnInvoke(TInvokeArgs invokeArgument);
116+
117+
/// <summary>
118+
/// Implemented by the subclass to complete the current operation.
119+
/// </summary>
120+
/// <returns>A Task to be awaited for the operation to complete.</returns>
121+
protected abstract Task OnFlush();
122+
123+
#endregion
124+
125+
#region Private Methods
126+
127+
private void StartTimer()
128+
{
129+
this.timerCancellationSource = new CancellationTokenSource();
130+
131+
this.currentTimerTask =
132+
Task.Delay(this.flushInterval, this.timerCancellationSource.Token)
133+
.ContinueWith(
134+
t =>
135+
{
136+
if (!t.IsCanceled)
137+
{
138+
return this.Flush();
139+
}
140+
else
141+
{
142+
return Task.FromResult(true);
143+
}
144+
});
145+
}
146+
147+
private bool CancelTimer()
148+
{
149+
if (this.timerCancellationSource != null)
150+
{
151+
// Attempt to cancel the timer task
152+
this.timerCancellationSource.Cancel();
153+
}
154+
155+
// Was the task cancelled?
156+
bool wasCancelled =
157+
this.currentTimerTask == null ||
158+
this.currentTimerTask.IsCanceled;
159+
160+
// Clear the current task so that another may be created
161+
this.currentTimerTask = null;
162+
163+
return wasCancelled;
164+
}
165+
166+
#endregion
167+
}
168+
}
169+

0 commit comments

Comments
 (0)