Skip to content

Commit 77bfffd

Browse files
committed
fix thread starvation in ManualExecutor.*Wait
Threads may fail to wake for some time since the method waits on a `ManualResetEventSlim` which is reset before the next spin. To prevent such cases the waiting condition has been modified to additionally check an integer which is incremented after every spin to detected if the spin which was waited upon has ended.
1 parent 2e5bdd9 commit 77bfffd

File tree

2 files changed

+119
-16
lines changed

2 files changed

+119
-16
lines changed

src/ros2cs/ros2cs_core/executors/ManualExecutor.cs

Lines changed: 116 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -44,21 +44,29 @@ public IContext Context
4444
/// </summary>
4545
public bool IsSpinning
4646
{
47-
get { return !this.IsIdle.IsSet; }
47+
get => this._IsSpinning;
48+
private set => this._IsSpinning = value;
4849
}
4950

51+
private volatile bool _IsSpinning = false;
52+
5053
/// <summary>
5154
/// Whether a rescan is scheduled.
5255
/// </summary>
5356
public bool RescanScheduled
5457
{
55-
get { return this._RescanScheduled; }
56-
private set { this._RescanScheduled = value; }
58+
get => this._RescanScheduled;
59+
private set => this._RescanScheduled = value;
5760
}
5861

5962
// volatile since it may be changed by multiple threads
6063
private volatile bool _RescanScheduled = false;
6164

65+
/// <summary>
66+
/// To prevent <see cref="TryWait"/> from being starved by multiple spins.
67+
/// </summary>
68+
private long SpinId = 0;
69+
6270
/// <inheritdoc/>
6371
public bool IsDisposed
6472
{
@@ -92,6 +100,9 @@ public bool IsReadOnly
92100
/// <summary>
93101
/// Wait set used while spinning.
94102
/// </summary>
103+
/// <remarks>
104+
/// Is also used to notify <see cref="TryWait"/> when the executor finished spinning.
105+
/// </remarks>
95106
private readonly WaitSet WaitSet;
96107

97108
/// <summary>
@@ -104,11 +115,6 @@ public bool IsReadOnly
104115
/// </summary>
105116
private readonly HashSet<INode> Nodes = new HashSet<INode>();
106117

107-
/// <summary>
108-
/// Event signaling whether the executor is not spinning.
109-
/// </summary>
110-
private readonly ManualResetEventSlim IsIdle = new ManualResetEventSlim(true);
111-
112118
/// <summary>
113119
/// Create a new instance.
114120
/// </summary>
@@ -286,21 +292,92 @@ public bool TryScheduleRescan(INode node)
286292
/// <inheritdoc/>
287293
public void Wait()
288294
{
289-
bool success = this.TryWait(TimeSpan.FromMilliseconds(-1));
290-
Debug.Assert(success, "infinite wait timed out");
295+
if (this.RescanScheduled)
296+
{
297+
lock (this.WaitSet)
298+
{
299+
this.WaitUntilDone(this.SpinId);
300+
}
301+
}
291302
}
292303

293304
/// <remarks>
294305
/// This method is thread safe.
295306
/// </remarks>
296307
/// <exception cref="ObjectDisposedException"> If the executor was disposed. </exception>
308+
/// <exception cref="ArgumentOutOfRangeException"> If the timeout is negative or too big. </exception>
297309
/// <inheritdoc/>
298310
public bool TryWait(TimeSpan timeout)
299311
{
300-
if (this.RescanScheduled && this.IsSpinning)
312+
if (timeout.Ticks < 0)
313+
{
314+
throw new ArgumentOutOfRangeException("timeout is negative");
315+
}
316+
if (this.RescanScheduled)
317+
{
318+
lock (this.WaitSet)
319+
{
320+
// read id inside the lock to prevent an outdated id from being copied
321+
return this.WaitUntilDone(this.SpinId, timeout);
322+
}
323+
}
324+
return true;
325+
}
326+
327+
/// <summary>
328+
/// Utility method to wait until the current spin has finished.
329+
/// </summary>
330+
/// <remarks>
331+
/// This replaces a <see cref="ManualResetEventSlim"/> which did starve waiters
332+
/// when spinning multiple times.
333+
/// </remarks>
334+
/// <param name="spinId"> Current spin id. </param>
335+
private void WaitUntilDone(long spinId)
336+
{
337+
// the condition is checked with the lock held to prevent
338+
// a the spin from pulsing before the wait can be started
339+
while (this.IsSpinning && this.SpinId == spinId)
340+
{
341+
try
342+
{
343+
// stop a possible current spin
344+
this.Interrupt();
345+
}
346+
catch (ObjectDisposedException)
347+
{
348+
// if the context is shut down then the
349+
// guard condition might be disposed but
350+
// nodes still have to be removed
351+
}
352+
Monitor.Wait(this.WaitSet);
353+
}
354+
}
355+
356+
/// <summary>
357+
/// Utility method to wait until the current spin has finished.
358+
/// </summary>
359+
/// <param name="spinId"> Current spin id. </param>
360+
/// <param name="timeout"> Timeout when waiting </param>
361+
/// <returns> Whether the wait did not time out. </returns>
362+
/// <exception cref="ArgumentOutOfRangeException"> Timeout is too big. </exception>
363+
private bool WaitUntilDone(long spinId, TimeSpan timeout)
364+
{
365+
int milliSeconds;
366+
try
367+
{
368+
milliSeconds = Convert.ToInt32(timeout.TotalMilliseconds);
369+
}
370+
catch (OverflowException e)
371+
{
372+
throw new ArgumentOutOfRangeException("timeout too big", e);
373+
}
374+
int remainingTimeout = milliSeconds;
375+
uint startTime = (uint)Environment.TickCount;
376+
while (this.IsSpinning && this.SpinId == spinId)
301377
{
302378
try
303379
{
380+
// stop a possible current spin
304381
this.Interrupt();
305382
}
306383
catch (ObjectDisposedException)
@@ -309,7 +386,22 @@ public bool TryWait(TimeSpan timeout)
309386
// guard condition might be disposed but
310387
// nodes still have to be removed
311388
}
312-
return this.IsIdle.Wait(timeout);
389+
if (!Monitor.Wait(this.WaitSet, remainingTimeout))
390+
{
391+
// if the wait timed out return immediately
392+
return false;
393+
}
394+
// update the timeout for the next wait
395+
uint elapsed = (uint)Environment.TickCount - startTime;
396+
if (elapsed > int.MaxValue)
397+
{
398+
return false;
399+
}
400+
remainingTimeout = milliSeconds - (int)elapsed;
401+
if (remainingTimeout <= 0)
402+
{
403+
return false;
404+
}
313405
}
314406
return true;
315407
}
@@ -338,10 +430,10 @@ public void Interrupt()
338430
/// <returns> Whether work could be processed since no rescan was scheduled. </returns>
339431
public bool TrySpin(TimeSpan timeout)
340432
{
341-
this.IsIdle.Reset();
433+
this.IsSpinning = true;
342434
try
343435
{
344-
// check after resetting IsIdle to
436+
// check after setting IsSpinning to
345437
// prevent race condition
346438
if (this.RescanScheduled)
347439
{
@@ -357,7 +449,16 @@ public bool TrySpin(TimeSpan timeout)
357449
}
358450
finally
359451
{
360-
this.IsIdle.Set();
452+
// update flag before waking threads
453+
this.IsSpinning = false;
454+
lock (this.WaitSet)
455+
{
456+
// prevent other threads from reading stale result
457+
// overflow is acceptable
458+
unchecked { this.SpinId++; }
459+
// notify other threads that we finished spinning
460+
Monitor.PulseAll(this.WaitSet);
461+
}
361462
}
362463
return true;
363464
}
@@ -477,7 +578,6 @@ public void Dispose()
477578
}
478579
this.WaitSet.Dispose();
479580
this.InterruptCondition.Dispose();
480-
this.IsIdle.Dispose();
481581
}
482582
}
483583
}

src/ros2cs/ros2cs_tests/src/ManualExecutorTest.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public void DisposedExecutorHandling()
5858

5959
this.Context.TryCreateNode("test_node", out var node);
6060
this.Executor.Add(node);
61+
this.Executor.Rescan();
6162
this.Executor.Dispose();
6263

6364
Assert.That(this.Executor.IsDisposed, Is.True);
@@ -125,13 +126,15 @@ public void TryWaitScheduled()
125126
this.Executor.ScheduleRescan();
126127

127128
Assert.That(this.Executor.TryWait(TimeSpan.Zero), Is.True);
129+
this.Executor.Wait();
128130
Assert.That(this.Executor.RescanScheduled, Is.True);
129131
}
130132

131133
[Test]
132134
public void TryWaitUnscheduled()
133135
{
134136
Assert.That(this.Executor.TryWait(TimeSpan.Zero), Is.True);
137+
this.Executor.Wait();
135138
Assert.That(this.Executor.RescanScheduled, Is.False);
136139
}
137140

0 commit comments

Comments
 (0)