Skip to content

Commit

Permalink
Merge pull request #632 from AArnott/testChannelCompletionOnStreamDis…
Browse files Browse the repository at this point in the history
…posal

Avoid throwing ObjectDisposedException from Channel.Completion
  • Loading branch information
AArnott authored Jun 20, 2023
2 parents 48cd9f0 + 1ce11fb commit e0dbd21
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 97 deletions.
20 changes: 20 additions & 0 deletions src/Nerdbank.Streams/MultiplexingStream.Options.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ public class Options
/// </summary>
private bool startSuspended;

/// <summary>
/// Backing field for the <see cref="FaultOpenChannelsOnStreamDisposal"/> property.
/// </summary>
private bool faultOpenChannelsOnStreamDisposal;

/// <summary>
/// Initializes a new instance of the <see cref="Options"/> class.
/// </summary>
Expand All @@ -83,6 +88,7 @@ public Options(Options copyFrom)
this.defaultChannelTraceSourceFactory = copyFrom.defaultChannelTraceSourceFactory;
this.defaultChannelTraceSourceFactoryWithQualifier = copyFrom.defaultChannelTraceSourceFactoryWithQualifier;
this.startSuspended = copyFrom.startSuspended;
this.faultOpenChannelsOnStreamDisposal = copyFrom.faultOpenChannelsOnStreamDisposal;
this.SeededChannels = copyFrom.SeededChannels.ToList();
}

Expand Down Expand Up @@ -226,6 +232,20 @@ public bool StartSuspended
/// </remarks>
public IList<ChannelOptions> SeededChannels { get; private set; }

/// <summary>
/// Gets or sets a value indicating whether any open channels should be faulted (i.e. their <see cref="Channel.Completion"/> task will be faulted)
/// when the <see cref="MultiplexingStream"/> is disposed.
/// </summary>
public bool FaultOpenChannelsOnStreamDisposal
{
get => this.faultOpenChannelsOnStreamDisposal;
set
{
this.ThrowIfFrozen();
this.faultOpenChannelsOnStreamDisposal = value;
}
}

/// <summary>
/// Gets a value indicating whether this instance is frozen.
/// </summary>
Expand Down
9 changes: 8 additions & 1 deletion src/Nerdbank.Streams/MultiplexingStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ public partial class MultiplexingStream : IDisposableObservable, System.IAsyncDi
/// </summary>
private readonly int protocolMajorVersion;

/// <summary>
/// A value indicating whether any open channels should be faulted (i.e. their <see cref="Channel.Completion"/> task will be faulted)
/// when the <see cref="MultiplexingStream"/> is disposed.
/// </summary>
private readonly bool faultOpenChannelsOnStreamDisposal;

/// <summary>
/// The last number assigned to a channel.
/// Each use of this should increment by two, if <see cref="isOdd"/> has a value.
Expand Down Expand Up @@ -131,6 +137,7 @@ private MultiplexingStream(Formatter formatter, bool? isOdd, Options options)
}

this.TraceSource = options.TraceSource;
this.faultOpenChannelsOnStreamDisposal = options.FaultOpenChannelsOnStreamDisposal;

this.DefaultChannelTraceSourceFactory =
options.DefaultChannelTraceSourceFactoryWithQualifier
Expand Down Expand Up @@ -689,7 +696,7 @@ public async ValueTask DisposeAsync()
{
foreach (KeyValuePair<QualifiedChannelId, Channel> entry in this.openChannels)
{
entry.Value.Dispose(new ObjectDisposedException(nameof(MultiplexingStream)));
entry.Value.Dispose(this.faultOpenChannelsOnStreamDisposal ? new ObjectDisposedException(nameof(MultiplexingStream)) : null);
}

foreach (KeyValuePair<string, Queue<TaskCompletionSource<Channel>>> entry in this.acceptingChannels)
Expand Down
58 changes: 58 additions & 0 deletions src/Nerdbank.Streams/netstandard2.0/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -279,3 +279,61 @@ Nerdbank.Streams.MultiplexingStream.QualifiedChannelId.QualifiedChannelId() -> v
Nerdbank.Streams.MultiplexingStream.StartListening() -> void
static Nerdbank.Streams.PipeExtensions.AsPrebufferedStreamAsync(this System.IO.Pipelines.PipeReader! pipeReader, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<System.IO.Stream!>!
static Nerdbank.Streams.StreamExtensions.AsStream(this System.Buffers.ReadOnlySequence<byte> readOnlySequence, System.Action<object?>? disposeAction, object? disposeActionArg) -> System.IO.Stream!
Nerdbank.Streams.BufferWriterExtensions
Nerdbank.Streams.MultiplexingStream.Options.FaultOpenChannelsOnStreamDisposal.get -> bool
Nerdbank.Streams.MultiplexingStream.Options.FaultOpenChannelsOnStreamDisposal.set -> void
Nerdbank.Streams.ReadOnlySequenceExtensions
Nerdbank.Streams.StreamPipeReader
Nerdbank.Streams.StreamPipeReader.Read() -> System.IO.Pipelines.ReadResult
Nerdbank.Streams.StreamPipeReader.StreamPipeReader(System.IO.Stream! stream) -> void
Nerdbank.Streams.StreamPipeReader.StreamPipeReader(System.IO.Stream! stream, int bufferSize, bool leaveOpen) -> void
override Nerdbank.Streams.StreamPipeReader.AdvanceTo(System.SequencePosition consumed) -> void
override Nerdbank.Streams.StreamPipeReader.AdvanceTo(System.SequencePosition consumed, System.SequencePosition examined) -> void
override Nerdbank.Streams.StreamPipeReader.CancelPendingRead() -> void
override Nerdbank.Streams.StreamPipeReader.Complete(System.Exception? exception = null) -> void
override Nerdbank.Streams.StreamPipeReader.OnWriterCompleted(System.Action<System.Exception?, object?>! callback, object? state) -> void
override Nerdbank.Streams.StreamPipeReader.ReadAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask<System.IO.Pipelines.ReadResult>
override Nerdbank.Streams.StreamPipeReader.TryRead(out System.IO.Pipelines.ReadResult result) -> bool
static Nerdbank.Streams.BufferWriterExtensions.Write<T>(this System.Buffers.IBufferWriter<T>! writer, System.Buffers.ReadOnlySequence<T> sequence) -> void
static Nerdbank.Streams.ReadOnlySequenceExtensions.Clone<T>(this System.Buffers.ReadOnlySequence<T> template) -> System.Buffers.ReadOnlySequence<T>
static System.Buffers.SequenceReaderExtensions.TryReadBigEndian(this ref System.Buffers.SequenceReader<byte> reader, out int value) -> bool
static System.Buffers.SequenceReaderExtensions.TryReadBigEndian(this ref System.Buffers.SequenceReader<byte> reader, out long value) -> bool
static System.Buffers.SequenceReaderExtensions.TryReadBigEndian(this ref System.Buffers.SequenceReader<byte> reader, out short value) -> bool
static System.Buffers.SequenceReaderExtensions.TryReadLittleEndian(this ref System.Buffers.SequenceReader<byte> reader, out int value) -> bool
static System.Buffers.SequenceReaderExtensions.TryReadLittleEndian(this ref System.Buffers.SequenceReader<byte> reader, out long value) -> bool
static System.Buffers.SequenceReaderExtensions.TryReadLittleEndian(this ref System.Buffers.SequenceReader<byte> reader, out short value) -> bool
System.Buffers.SequenceReader<T>
System.Buffers.SequenceReader<T>.Advance(long count) -> void
System.Buffers.SequenceReader<T>.AdvancePast(T value) -> long
System.Buffers.SequenceReader<T>.AdvancePastAny(System.ReadOnlySpan<T> values) -> long
System.Buffers.SequenceReader<T>.AdvancePastAny(T value0, T value1) -> long
System.Buffers.SequenceReader<T>.AdvancePastAny(T value0, T value1, T value2) -> long
System.Buffers.SequenceReader<T>.AdvancePastAny(T value0, T value1, T value2, T value3) -> long
System.Buffers.SequenceReader<T>.Consumed.get -> long
System.Buffers.SequenceReader<T>.CurrentSpan.get -> System.ReadOnlySpan<T>
System.Buffers.SequenceReader<T>.CurrentSpanIndex.get -> int
System.Buffers.SequenceReader<T>.End.get -> bool
System.Buffers.SequenceReader<T>.IsNext(System.ReadOnlySpan<T> next, bool advancePast = false) -> bool
System.Buffers.SequenceReader<T>.IsNext(T next, bool advancePast = false) -> bool
System.Buffers.SequenceReader<T>.Length.get -> long
System.Buffers.SequenceReader<T>.Position.get -> System.SequencePosition
System.Buffers.SequenceReader<T>.Remaining.get -> long
System.Buffers.SequenceReader<T>.Rewind(long count) -> void
System.Buffers.SequenceReader<T>.Sequence.get -> System.Buffers.ReadOnlySequence<T>
System.Buffers.SequenceReader<T>.SequenceReader() -> void
System.Buffers.SequenceReader<T>.SequenceReader(System.Buffers.ReadOnlySequence<T> sequence) -> void
System.Buffers.SequenceReader<T>.TryAdvanceTo(T delimiter, bool advancePastDelimiter = true) -> bool
System.Buffers.SequenceReader<T>.TryAdvanceToAny(System.ReadOnlySpan<T> delimiters, bool advancePastDelimiter = true) -> bool
System.Buffers.SequenceReader<T>.TryCopyTo(System.Span<T> destination) -> bool
System.Buffers.SequenceReader<T>.TryPeek(out T value) -> bool
System.Buffers.SequenceReader<T>.TryRead(out T value) -> bool
System.Buffers.SequenceReader<T>.TryReadExact(int count, out System.Buffers.ReadOnlySequence<T> sequence) -> bool
System.Buffers.SequenceReader<T>.TryReadTo(out System.Buffers.ReadOnlySequence<T> sequence, System.ReadOnlySpan<T> delimiter, bool advancePastDelimiter = true) -> bool
System.Buffers.SequenceReader<T>.TryReadTo(out System.Buffers.ReadOnlySequence<T> sequence, T delimiter, bool advancePastDelimiter = true) -> bool
System.Buffers.SequenceReader<T>.TryReadTo(out System.Buffers.ReadOnlySequence<T> sequence, T delimiter, T delimiterEscape, bool advancePastDelimiter = true) -> bool
System.Buffers.SequenceReader<T>.TryReadTo(out System.ReadOnlySpan<T> span, T delimiter, bool advancePastDelimiter = true) -> bool
System.Buffers.SequenceReader<T>.TryReadTo(out System.ReadOnlySpan<T> span, T delimiter, T delimiterEscape, bool advancePastDelimiter = true) -> bool
System.Buffers.SequenceReader<T>.TryReadToAny(out System.Buffers.ReadOnlySequence<T> sequence, System.ReadOnlySpan<T> delimiters, bool advancePastDelimiter = true) -> bool
System.Buffers.SequenceReader<T>.TryReadToAny(out System.ReadOnlySpan<T> span, System.ReadOnlySpan<T> delimiters, bool advancePastDelimiter = true) -> bool
System.Buffers.SequenceReader<T>.UnreadSpan.get -> System.ReadOnlySpan<T>
System.Buffers.SequenceReaderExtensions
56 changes: 0 additions & 56 deletions src/Nerdbank.Streams/netstandard2.0/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
@@ -1,56 +0,0 @@
Nerdbank.Streams.BufferWriterExtensions
Nerdbank.Streams.ReadOnlySequenceExtensions
Nerdbank.Streams.StreamPipeReader
Nerdbank.Streams.StreamPipeReader.Read() -> System.IO.Pipelines.ReadResult
Nerdbank.Streams.StreamPipeReader.StreamPipeReader(System.IO.Stream! stream) -> void
Nerdbank.Streams.StreamPipeReader.StreamPipeReader(System.IO.Stream! stream, int bufferSize, bool leaveOpen) -> void
override Nerdbank.Streams.StreamPipeReader.AdvanceTo(System.SequencePosition consumed) -> void
override Nerdbank.Streams.StreamPipeReader.AdvanceTo(System.SequencePosition consumed, System.SequencePosition examined) -> void
override Nerdbank.Streams.StreamPipeReader.CancelPendingRead() -> void
override Nerdbank.Streams.StreamPipeReader.Complete(System.Exception? exception = null) -> void
override Nerdbank.Streams.StreamPipeReader.OnWriterCompleted(System.Action<System.Exception?, object?>! callback, object? state) -> void
override Nerdbank.Streams.StreamPipeReader.ReadAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask<System.IO.Pipelines.ReadResult>
override Nerdbank.Streams.StreamPipeReader.TryRead(out System.IO.Pipelines.ReadResult result) -> bool
static Nerdbank.Streams.BufferWriterExtensions.Write<T>(this System.Buffers.IBufferWriter<T>! writer, System.Buffers.ReadOnlySequence<T> sequence) -> void
static Nerdbank.Streams.ReadOnlySequenceExtensions.Clone<T>(this System.Buffers.ReadOnlySequence<T> template) -> System.Buffers.ReadOnlySequence<T>
static System.Buffers.SequenceReaderExtensions.TryReadBigEndian(this ref System.Buffers.SequenceReader<byte> reader, out int value) -> bool
static System.Buffers.SequenceReaderExtensions.TryReadBigEndian(this ref System.Buffers.SequenceReader<byte> reader, out long value) -> bool
static System.Buffers.SequenceReaderExtensions.TryReadBigEndian(this ref System.Buffers.SequenceReader<byte> reader, out short value) -> bool
static System.Buffers.SequenceReaderExtensions.TryReadLittleEndian(this ref System.Buffers.SequenceReader<byte> reader, out int value) -> bool
static System.Buffers.SequenceReaderExtensions.TryReadLittleEndian(this ref System.Buffers.SequenceReader<byte> reader, out long value) -> bool
static System.Buffers.SequenceReaderExtensions.TryReadLittleEndian(this ref System.Buffers.SequenceReader<byte> reader, out short value) -> bool
System.Buffers.SequenceReader<T>
System.Buffers.SequenceReader<T>.Advance(long count) -> void
System.Buffers.SequenceReader<T>.AdvancePast(T value) -> long
System.Buffers.SequenceReader<T>.AdvancePastAny(System.ReadOnlySpan<T> values) -> long
System.Buffers.SequenceReader<T>.AdvancePastAny(T value0, T value1) -> long
System.Buffers.SequenceReader<T>.AdvancePastAny(T value0, T value1, T value2) -> long
System.Buffers.SequenceReader<T>.AdvancePastAny(T value0, T value1, T value2, T value3) -> long
System.Buffers.SequenceReader<T>.Consumed.get -> long
System.Buffers.SequenceReader<T>.CurrentSpan.get -> System.ReadOnlySpan<T>
System.Buffers.SequenceReader<T>.CurrentSpanIndex.get -> int
System.Buffers.SequenceReader<T>.End.get -> bool
System.Buffers.SequenceReader<T>.IsNext(System.ReadOnlySpan<T> next, bool advancePast = false) -> bool
System.Buffers.SequenceReader<T>.IsNext(T next, bool advancePast = false) -> bool
System.Buffers.SequenceReader<T>.Length.get -> long
System.Buffers.SequenceReader<T>.Position.get -> System.SequencePosition
System.Buffers.SequenceReader<T>.Remaining.get -> long
System.Buffers.SequenceReader<T>.Rewind(long count) -> void
System.Buffers.SequenceReader<T>.Sequence.get -> System.Buffers.ReadOnlySequence<T>
System.Buffers.SequenceReader<T>.SequenceReader() -> void
System.Buffers.SequenceReader<T>.SequenceReader(System.Buffers.ReadOnlySequence<T> sequence) -> void
System.Buffers.SequenceReader<T>.TryAdvanceTo(T delimiter, bool advancePastDelimiter = true) -> bool
System.Buffers.SequenceReader<T>.TryAdvanceToAny(System.ReadOnlySpan<T> delimiters, bool advancePastDelimiter = true) -> bool
System.Buffers.SequenceReader<T>.TryCopyTo(System.Span<T> destination) -> bool
System.Buffers.SequenceReader<T>.TryPeek(out T value) -> bool
System.Buffers.SequenceReader<T>.TryRead(out T value) -> bool
System.Buffers.SequenceReader<T>.TryReadExact(int count, out System.Buffers.ReadOnlySequence<T> sequence) -> bool
System.Buffers.SequenceReader<T>.TryReadTo(out System.Buffers.ReadOnlySequence<T> sequence, System.ReadOnlySpan<T> delimiter, bool advancePastDelimiter = true) -> bool
System.Buffers.SequenceReader<T>.TryReadTo(out System.Buffers.ReadOnlySequence<T> sequence, T delimiter, bool advancePastDelimiter = true) -> bool
System.Buffers.SequenceReader<T>.TryReadTo(out System.Buffers.ReadOnlySequence<T> sequence, T delimiter, T delimiterEscape, bool advancePastDelimiter = true) -> bool
System.Buffers.SequenceReader<T>.TryReadTo(out System.ReadOnlySpan<T> span, T delimiter, bool advancePastDelimiter = true) -> bool
System.Buffers.SequenceReader<T>.TryReadTo(out System.ReadOnlySpan<T> span, T delimiter, T delimiterEscape, bool advancePastDelimiter = true) -> bool
System.Buffers.SequenceReader<T>.TryReadToAny(out System.Buffers.ReadOnlySequence<T> sequence, System.ReadOnlySpan<T> delimiters, bool advancePastDelimiter = true) -> bool
System.Buffers.SequenceReader<T>.TryReadToAny(out System.ReadOnlySpan<T> span, System.ReadOnlySpan<T> delimiters, bool advancePastDelimiter = true) -> bool
System.Buffers.SequenceReader<T>.UnreadSpan.get -> System.ReadOnlySpan<T>
System.Buffers.SequenceReaderExtensions
17 changes: 17 additions & 0 deletions src/Nerdbank.Streams/netstandard2.1/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -286,3 +286,20 @@ Nerdbank.Streams.MultiplexingStream.QualifiedChannelId.QualifiedChannelId() -> v
Nerdbank.Streams.MultiplexingStream.StartListening() -> void
static Nerdbank.Streams.PipeExtensions.AsPrebufferedStreamAsync(this System.IO.Pipelines.PipeReader! pipeReader, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<System.IO.Stream!>!
static Nerdbank.Streams.StreamExtensions.AsStream(this System.Buffers.ReadOnlySequence<byte> readOnlySequence, System.Action<object?>? disposeAction, object? disposeActionArg) -> System.IO.Stream!
Nerdbank.Streams.BufferWriterExtensions
Nerdbank.Streams.MultiplexingStream.Options.FaultOpenChannelsOnStreamDisposal.get -> bool
Nerdbank.Streams.MultiplexingStream.Options.FaultOpenChannelsOnStreamDisposal.set -> void
Nerdbank.Streams.ReadOnlySequenceExtensions
Nerdbank.Streams.StreamPipeReader
Nerdbank.Streams.StreamPipeReader.Read() -> System.IO.Pipelines.ReadResult
Nerdbank.Streams.StreamPipeReader.StreamPipeReader(System.IO.Stream! stream) -> void
Nerdbank.Streams.StreamPipeReader.StreamPipeReader(System.IO.Stream! stream, int bufferSize, bool leaveOpen) -> void
override Nerdbank.Streams.StreamPipeReader.AdvanceTo(System.SequencePosition consumed) -> void
override Nerdbank.Streams.StreamPipeReader.AdvanceTo(System.SequencePosition consumed, System.SequencePosition examined) -> void
override Nerdbank.Streams.StreamPipeReader.CancelPendingRead() -> void
override Nerdbank.Streams.StreamPipeReader.Complete(System.Exception? exception = null) -> void
override Nerdbank.Streams.StreamPipeReader.OnWriterCompleted(System.Action<System.Exception?, object?>! callback, object? state) -> void
override Nerdbank.Streams.StreamPipeReader.ReadAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask<System.IO.Pipelines.ReadResult>
override Nerdbank.Streams.StreamPipeReader.TryRead(out System.IO.Pipelines.ReadResult result) -> bool
static Nerdbank.Streams.BufferWriterExtensions.Write<T>(this System.Buffers.IBufferWriter<T>! writer, System.Buffers.ReadOnlySequence<T> sequence) -> void
static Nerdbank.Streams.ReadOnlySequenceExtensions.Clone<T>(this System.Buffers.ReadOnlySequence<T> template) -> System.Buffers.ReadOnlySequence<T>
15 changes: 0 additions & 15 deletions src/Nerdbank.Streams/netstandard2.1/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
@@ -1,15 +0,0 @@
Nerdbank.Streams.BufferWriterExtensions
Nerdbank.Streams.ReadOnlySequenceExtensions
Nerdbank.Streams.StreamPipeReader
Nerdbank.Streams.StreamPipeReader.Read() -> System.IO.Pipelines.ReadResult
Nerdbank.Streams.StreamPipeReader.StreamPipeReader(System.IO.Stream! stream) -> void
Nerdbank.Streams.StreamPipeReader.StreamPipeReader(System.IO.Stream! stream, int bufferSize, bool leaveOpen) -> void
override Nerdbank.Streams.StreamPipeReader.AdvanceTo(System.SequencePosition consumed) -> void
override Nerdbank.Streams.StreamPipeReader.AdvanceTo(System.SequencePosition consumed, System.SequencePosition examined) -> void
override Nerdbank.Streams.StreamPipeReader.CancelPendingRead() -> void
override Nerdbank.Streams.StreamPipeReader.Complete(System.Exception? exception = null) -> void
override Nerdbank.Streams.StreamPipeReader.OnWriterCompleted(System.Action<System.Exception?, object?>! callback, object? state) -> void
override Nerdbank.Streams.StreamPipeReader.ReadAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask<System.IO.Pipelines.ReadResult>
override Nerdbank.Streams.StreamPipeReader.TryRead(out System.IO.Pipelines.ReadResult result) -> bool
static Nerdbank.Streams.BufferWriterExtensions.Write<T>(this System.Buffers.IBufferWriter<T>! writer, System.Buffers.ReadOnlySequence<T> sequence) -> void
static Nerdbank.Streams.ReadOnlySequenceExtensions.Clone<T>(this System.Buffers.ReadOnlySequence<T> template) -> System.Buffers.ReadOnlySequence<T>
Loading

0 comments on commit e0dbd21

Please sign in to comment.