diff --git a/Examples/SimpleRtspClient/Program.cs b/Examples/SimpleRtspClient/Program.cs index bd0b1a9..41d88b9 100644 --- a/Examples/SimpleRtspClient/Program.cs +++ b/Examples/SimpleRtspClient/Program.cs @@ -24,7 +24,7 @@ static void Main() cancellationTokenSource.Cancel(); - Console.WriteLine("Cancelling"); + Console.WriteLine("Canceling"); connectTask.Wait(CancellationToken.None); } @@ -47,6 +47,10 @@ private static async Task ConnectAsync(ConnectionParameters connectionParameters { await rtspClient.ConnectAsync(token); } + catch (OperationCanceledException) + { + return; + } catch (RtspClientException e) { Console.WriteLine(e.ToString()); @@ -60,6 +64,10 @@ private static async Task ConnectAsync(ConnectionParameters connectionParameters { await rtspClient.ReceiveAsync(token); } + catch (OperationCanceledException) + { + return; + } catch (RtspClientException e) { Console.WriteLine(e.ToString()); diff --git a/Examples/SimpleRtspPlayer/App.xaml b/Examples/SimpleRtspPlayer/App.xaml index d861037..1f8146a 100644 --- a/Examples/SimpleRtspPlayer/App.xaml +++ b/Examples/SimpleRtspPlayer/App.xaml @@ -1,4 +1,4 @@ - _mainWindowModel.VideoSource; - private readonly CommandHandler _startClickCommand; - public ICommand StartClickCommand => _startClickCommand; - - private readonly CommandHandler _stopClickCommand; - public ICommand StopClickCommand => _stopClickCommand; + public RelayCommand StartClickCommand { get; } + public RelayCommand StopClickCommand { get; } + public RelayCommand ClosingCommand { get; } public string Status { @@ -46,9 +45,10 @@ public string Status public MainWindowViewModel(IMainWindowModel mainWindowModel) { _mainWindowModel = mainWindowModel ?? throw new ArgumentNullException(nameof(mainWindowModel)); - - _startClickCommand = new CommandHandler(OnStartButtonClick, true); - _stopClickCommand = new CommandHandler(OnStopButtonClick, false); + + StartClickCommand = new RelayCommand(OnStartButtonClick, () => _startButtonEnabled); + StopClickCommand = new RelayCommand(OnStopButtonClick, () => _stopButtonEnabled); + ClosingCommand = new RelayCommand(OnClosing); } protected virtual void OnPropertyChanged([CallerMemberName] string propertyName = null) @@ -71,12 +71,14 @@ private void OnStartButtonClick() var credential = new NetworkCredential(Login, Password); var connectionParameters = new ConnectionParameters(deviceUri, credential); - + connectionParameters.RtpTransport = RtpTransportProtocol.UDP; _mainWindowModel.Start(connectionParameters); _mainWindowModel.StatusChanged += MainWindowModelOnStatusChanged; - _startClickCommand.SetCanExecute(false); - _stopClickCommand.SetCanExecute(true); + _startButtonEnabled = false; + StartClickCommand.RaiseCanExecuteChanged(); + _stopButtonEnabled = true; + StopClickCommand.RaiseCanExecuteChanged(); } private void OnStopButtonClick() @@ -84,8 +86,10 @@ private void OnStopButtonClick() _mainWindowModel.Stop(); _mainWindowModel.StatusChanged -= MainWindowModelOnStatusChanged; - _stopClickCommand.SetCanExecute(false); - _startClickCommand.SetCanExecute(true); + _stopButtonEnabled = false; + StopClickCommand.RaiseCanExecuteChanged(); + _startButtonEnabled = true; + StartClickCommand.RaiseCanExecuteChanged(); Status = string.Empty; } @@ -93,5 +97,10 @@ private void MainWindowModelOnStatusChanged(object sender, string s) { Application.Current.Dispatcher.Invoke(() => Status = s); } + + private void OnClosing(CancelEventArgs args) + { + _mainWindowModel.Stop(); + } } } \ No newline at end of file diff --git a/Examples/SimpleRtspPlayer/GUI/Views/MainWindow.xaml b/Examples/SimpleRtspPlayer/GUI/Views/MainWindow.xaml index 4a5a284..c1fd29f 100644 --- a/Examples/SimpleRtspPlayer/GUI/Views/MainWindow.xaml +++ b/Examples/SimpleRtspPlayer/GUI/Views/MainWindow.xaml @@ -5,6 +5,8 @@ xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006" xmlns:gui="clr-namespace:SimpleRtspPlayer.GUI" xmlns:views="clr-namespace:SimpleRtspPlayer.GUI.Views" + xmlns:i="http://schemas.microsoft.com/expression/2010/interactivity" + xmlns:command="http://www.galasoft.ch/mvvmlight" mc:Ignorable="d" Title="SimpleRtspPlayer" Width="1200" Height="675" MinWidth="16" MinHeight="16" WindowStartupLocation="CenterScreen" @@ -26,6 +28,11 @@ + + + + + diff --git a/Examples/SimpleRtspPlayer/GUI/Views/VideoView.xaml.cs b/Examples/SimpleRtspPlayer/GUI/Views/VideoView.xaml.cs index d94f430..583ee94 100644 --- a/Examples/SimpleRtspPlayer/GUI/Views/VideoView.xaml.cs +++ b/Examples/SimpleRtspPlayer/GUI/Views/VideoView.xaml.cs @@ -26,6 +26,7 @@ public partial class VideoView private int _height; private Int32Rect _dirtyRect; private readonly Action _invalidateAction; + private DispatcherOperation _invalidateOperation; private Task _handleSizeChangedTask = Task.CompletedTask; private CancellationTokenSource _resizeCancellationTokenSource = new CancellationTokenSource(); @@ -141,7 +142,10 @@ private static void OnVideoSourceChanged(DependencyObject d, DependencyPropertyC private void OnFrameReceived(object sender, IDecodedVideoFrame decodedFrame) { - Application.Current.Dispatcher.Invoke(_invalidateAction, DispatcherPriority.Send, decodedFrame); + if(_invalidateOperation != null && _invalidateOperation.Status != DispatcherOperationStatus.Completed) + return; + + _invalidateOperation = Application.Current.Dispatcher.BeginInvoke(_invalidateAction, DispatcherPriority.Send, decodedFrame); } private void Invalidate(IDecodedVideoFrame decodedVideoFrame) diff --git a/Examples/SimpleRtspPlayer/SimpleRtspPlayer.csproj b/Examples/SimpleRtspPlayer/SimpleRtspPlayer.csproj index 3f4f2d0..ca2a917 100644 --- a/Examples/SimpleRtspPlayer/SimpleRtspPlayer.csproj +++ b/Examples/SimpleRtspPlayer/SimpleRtspPlayer.csproj @@ -36,9 +36,24 @@ true + + ..\..\packages\CommonServiceLocator.2.0.2\lib\net45\CommonServiceLocator.dll + + + ..\..\packages\MvvmLightLibs.5.4.1\lib\net45\GalaSoft.MvvmLight.dll + + + ..\..\packages\MvvmLightLibs.5.4.1\lib\net45\GalaSoft.MvvmLight.Extras.dll + + + ..\..\packages\MvvmLightLibs.5.4.1\lib\net45\GalaSoft.MvvmLight.Platform.dll + + + ..\..\packages\MvvmLightLibs.5.4.1\lib\net45\System.Windows.Interactivity.dll + @@ -58,7 +73,6 @@ Designer - @@ -118,6 +132,7 @@ ResXFileCodeGenerator Resources.Designer.cs + SettingsSingleFileGenerator Settings.Designer.cs diff --git a/Examples/SimpleRtspPlayer/packages.config b/Examples/SimpleRtspPlayer/packages.config new file mode 100644 index 0000000..a1888b5 --- /dev/null +++ b/Examples/SimpleRtspPlayer/packages.config @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/Examples/libffmpeghelper/videodecoding.cpp b/Examples/libffmpeghelper/videodecoding.cpp index fc25b61..9214bfe 100644 --- a/Examples/libffmpeghelper/videodecoding.cpp +++ b/Examples/libffmpeghelper/videodecoding.cpp @@ -25,12 +25,12 @@ int create_video_decoder(int codec_id, void **handle) if (!handle) return -1; - VideoDecoderContext *context = (VideoDecoderContext *)av_mallocz(sizeof(VideoDecoderContext)); + auto context = static_cast(av_mallocz(sizeof(VideoDecoderContext))); if (!context) return -2; - context->Codec = avcodec_find_decoder((AVCodecID)codec_id); + context->Codec = avcodec_find_decoder(static_cast(codec_id)); if (!context->Codec) { remove_video_decoder(context); @@ -44,7 +44,7 @@ int create_video_decoder(int codec_id, void **handle) return -4; } - if (avcodec_open2(context->AvCodecContext, context->Codec, NULL) < 0) + if (avcodec_open2(context->AvCodecContext, context->Codec, nullptr) < 0) { remove_video_decoder(context); return -5; @@ -70,12 +70,12 @@ int set_video_decoder_extradata(void *handle, void *extradata, int extradataLeng return -1; #endif - VideoDecoderContext *context = (VideoDecoderContext *)handle; + auto context = static_cast(handle); if (!context->AvCodecContext->extradata || context->AvCodecContext->extradata_size < extradataLength) { av_free(context->AvCodecContext->extradata); - context->AvCodecContext->extradata = (uint8_t*)av_malloc(extradataLength + AV_INPUT_BUFFER_PADDING_SIZE); + context->AvCodecContext->extradata = static_cast(av_malloc(extradataLength + AV_INPUT_BUFFER_PADDING_SIZE)); if (!context->AvCodecContext->extradata) return -2; @@ -87,7 +87,7 @@ int set_video_decoder_extradata(void *handle, void *extradata, int extradataLeng memset(context->AvCodecContext->extradata + extradataLength, 0, AV_INPUT_BUFFER_PADDING_SIZE); avcodec_close(context->AvCodecContext); - if (avcodec_open2(context->AvCodecContext, context->Codec, NULL) < 0) + if (avcodec_open2(context->AvCodecContext, context->Codec, nullptr) < 0) return -3; return 0; @@ -99,18 +99,18 @@ int decode_video_frame(void *handle, void *rawBuffer, int rawBufferLength, int * if (!handle || !rawBuffer || !rawBufferLength || !frameWidth || !frameHeight || !framePixelFormat) return -1; - if ((uintptr_t)rawBuffer % 4 != 0) + if (reinterpret_cast(rawBuffer) % 4 != 0) return -2; #endif - VideoDecoderContext *context = (VideoDecoderContext *)handle; + auto context = static_cast(handle); - context->AvRawPacket.data = (uint8_t *)rawBuffer; + context->AvRawPacket.data = static_cast(rawBuffer); context->AvRawPacket.size = rawBufferLength; int got_frame; - int len = avcodec_decode_video2(context->AvCodecContext, context->Frame, &got_frame, &context->AvRawPacket); + const int len = avcodec_decode_video2(context->AvCodecContext, context->Frame, &got_frame, &context->AvRawPacket); if (len != rawBufferLength) return -3; @@ -133,8 +133,8 @@ int scale_decoded_video_frame(void *handle, void *scalerHandle, void *scaledBuff return -1; #endif - VideoDecoderContext *context = (VideoDecoderContext *)handle; - ScalerContext *scalerContext = (ScalerContext *)scalerHandle; + auto context = static_cast(handle); + auto scalerContext = static_cast(scalerHandle); uint8_t *srcData[8]; @@ -145,22 +145,22 @@ int scale_decoded_video_frame(void *handle, void *scalerHandle, void *scaledBuff if (!sourceFmtDesc) return -4; - int x_shift = sourceFmtDesc->log2_chroma_w; - int y_shift = sourceFmtDesc->log2_chroma_h; + const int x_shift = sourceFmtDesc->log2_chroma_w; + const int y_shift = sourceFmtDesc->log2_chroma_h; srcData[0] = context->Frame->data[0] + scalerContext->SourceTop * context->Frame->linesize[0] + scalerContext->SourceLeft; srcData[1] = context->Frame->data[1] + (scalerContext->SourceTop >> y_shift) * context->Frame->linesize[1] + (scalerContext->SourceLeft >> x_shift); srcData[2] = context->Frame->data[2] + (scalerContext->SourceTop >> y_shift) * context->Frame->linesize[2] + (scalerContext->SourceLeft >> x_shift); - srcData[3] = 0; - srcData[4] = 0; - srcData[5] = 0; - srcData[6] = 0; - srcData[7] = 0; + srcData[3] = nullptr; + srcData[4] = nullptr; + srcData[5] = nullptr; + srcData[6] = nullptr; + srcData[7] = nullptr; } else memcpy(srcData, context->Frame->data, sizeof(srcData)); - sws_scale(scalerContext->SwsContext, srcData, context->Frame->linesize, 0, scalerContext->SourceHeight, (uint8_t **)&scaledBuffer, &scaledBufferStride); + sws_scale(scalerContext->SwsContext, srcData, context->Frame->linesize, 0, scalerContext->SourceHeight, reinterpret_cast(&scaledBuffer), &scaledBufferStride); return 0; } @@ -169,7 +169,7 @@ void remove_video_decoder(void *handle) if (!handle) return; - VideoDecoderContext *context = (VideoDecoderContext *)handle; + auto context = static_cast(handle); if (context->AvCodecContext) { @@ -188,16 +188,16 @@ int create_video_scaler(int sourceLeft, int sourceTop, int sourceWidth, int sour if (!handle) return -1; - ScalerContext *context = (ScalerContext *)av_mallocz(sizeof(ScalerContext)); + auto context = static_cast(av_mallocz(sizeof(ScalerContext))); if (!context) return -2; - AVPixelFormat sourceAvPixelFormat = static_cast(sourcePixelFormat); - AVPixelFormat scaledAvPixelFormat = static_cast(scaledPixelFormat); + const auto sourceAvPixelFormat = static_cast(sourcePixelFormat); + const auto scaledAvPixelFormat = static_cast(scaledPixelFormat); SwsContext *swsContext = sws_getContext(sourceWidth, sourceHeight, sourceAvPixelFormat, scaledWidth, scaledHeight, - scaledAvPixelFormat, quality, NULL, NULL, NULL); + scaledAvPixelFormat, quality, nullptr, nullptr, nullptr); if (!swsContext) { @@ -223,7 +223,7 @@ void remove_video_scaler(void *handle) if (!handle) return; - ScalerContext *context = (ScalerContext *)av_mallocz(sizeof(ScalerContext)); + auto context = static_cast(av_mallocz(sizeof(ScalerContext))); sws_freeContext(context->SwsContext); av_free(context); diff --git a/README.md b/README.md index 001bcf2..8607e06 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ This repo contains RTSP client implementation (called "RtspClientSharp") for .NET Standard 2.0 ## Features -- Supported transport protocols: TCP and HTTP (UDP will be added) +- Supported transport protocols: TCP/HTTP/UDP - Supported media codecs: H.264/MJPEG/AAC/G711A/G711U/PCM/G726 - No external dependencies, pure C# code - Asynchronous nature with cancellation tokens support @@ -39,6 +39,7 @@ Something like this: var serverUri = new Uri("rtsp://192.168.1.77:554/ucast/11"); var credentials = new NetworkCredential("admin", "123456"); var connectionParameters = new ConnectionParameters(serverUri, credentials); +connectionParameters.RtpTransport == RtpTransportProtocol.TCP; using(var rtspClient = new RtspClient(connectionParameters)) { rtspClient.FrameReceived += (sender, frame) => @@ -55,8 +56,10 @@ using(var rtspClient = new RtspClient(connectionParameters)) case RawG711UFrame g711UFrame: case RawPCMFrame pcmFrame: case RawG726Frame g726Frame: + break; } } + await rtspClient.ConnectAsync(token); await rtspClient.ReceiveAsync(token); } diff --git a/RtspClientSharp.UnitTests/Rtcp/RtcpReceiverReportsProviderTests.cs b/RtspClientSharp.UnitTests/Rtcp/RtcpReceiverReportsProviderTests.cs index 40fe2ce..b3b54b2 100644 --- a/RtspClientSharp.UnitTests/Rtcp/RtcpReceiverReportsProviderTests.cs +++ b/RtspClientSharp.UnitTests/Rtcp/RtcpReceiverReportsProviderTests.cs @@ -20,7 +20,7 @@ public void GetReportPackets_FakeDataProviders_ResetStateOfRtpStatisticsProvider var rtcpReportsProvider = new RtcpReceiverReportsProvider(rtpStatisticsProviderMock.Object, rtcpSenderStatisticsProviderFake.Object, 1); - rtcpReportsProvider.GetReportPackets(); + rtcpReportsProvider.GetReportPackets().ToList(); rtpStatisticsProviderMock.Verify(x => x.ResetState()); } @@ -45,7 +45,7 @@ public void GetReportPackets_TestDataProviders_ReturnsPacketWithValidReceiverRep var rtcpReportsProvider = new RtcpReceiverReportsProvider(rtpStatisticsProviderFake.Object, rtcpSenderStatisticsProviderFake.Object, 1112234); - IReadOnlyList packets = rtcpReportsProvider.GetReportPackets(); + IReadOnlyList packets = rtcpReportsProvider.GetReportPackets().ToList(); var receiverReportPacket = (RtcpReceiverReportPacket) packets.First(p => p is RtcpReceiverReportPacket); Assert.IsFalse(receiverReportPacket.PaddingFlag); @@ -71,7 +71,7 @@ public void GetReportPackets_TestDataProviders_ReturnsPacketWithValidSdesReport( var rtcpReportsProvider = new RtcpReceiverReportsProvider(rtpStatisticsProviderFake.Object, rtcpSenderStatisticsProviderFake.Object, 1112234); - IReadOnlyList packets = rtcpReportsProvider.GetReportPackets(); + IReadOnlyList packets = rtcpReportsProvider.GetReportPackets().ToList(); var sdesReportPacket = (RtcpSdesReportPacket) packets.First(p => p is RtcpSdesReportPacket); diff --git a/RtspClientSharp.UnitTests/Rtp/RtpSequenceAssemblerTests.cs b/RtspClientSharp.UnitTests/Rtp/RtpSequenceAssemblerTests.cs new file mode 100644 index 0000000..904516c --- /dev/null +++ b/RtspClientSharp.UnitTests/Rtp/RtpSequenceAssemblerTests.cs @@ -0,0 +1,166 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using RtspClientSharp.Rtp; + +namespace RtspClientSharp.UnitTests.Rtp +{ + [TestClass] + public class RtpSequenceAssemblerTests + { + private sealed class RtpPacketEqualityComparer : IEqualityComparer + { + public bool Equals(RtpPacket x, RtpPacket y) + { + return x.ProtocolVersion == y.ProtocolVersion && + x.PaddingFlag == y.PaddingFlag && + x.ExtensionFlag == y.ExtensionFlag && + x.CsrcCount == y.CsrcCount && + x.MarkerBit == y.MarkerBit && + x.PayloadType == y.PayloadType && + x.SeqNumber == y.SeqNumber && + x.Timestamp == y.Timestamp && + x.SyncSourceId == y.SyncSourceId && + x.ExtensionHeaderId == y.ExtensionHeaderId && + x.PayloadSegment.SequenceEqual(y.PayloadSegment); + } + + public int GetHashCode(RtpPacket obj) + { + unchecked + { + var hashCode = obj.ProtocolVersion; + hashCode = (hashCode * 397) ^ obj.PaddingFlag.GetHashCode(); + hashCode = (hashCode * 397) ^ obj.ExtensionFlag.GetHashCode(); + hashCode = (hashCode * 397) ^ obj.CsrcCount; + hashCode = (hashCode * 397) ^ obj.MarkerBit.GetHashCode(); + hashCode = (hashCode * 397) ^ obj.PayloadType; + hashCode = (hashCode * 397) ^ obj.SeqNumber.GetHashCode(); + hashCode = (hashCode * 397) ^ (int)obj.Timestamp; + hashCode = (hashCode * 397) ^ (int)obj.SyncSourceId; + hashCode = (hashCode * 397) ^ obj.ExtensionHeaderId; + hashCode = (hashCode * 397) ^ obj.PayloadSegment.GetHashCode(); + return hashCode; + } + } + } + + private static readonly Random Random = new Random(); + + [TestMethod] + public void ProcessPacket_SeveralPacketsWithValidOrder_PacketPassedImmediatelyCalled() + { + var testPayloadSegment = new ArraySegment(new byte[1024]); + int testPacketsCount = 10; + int countPassed = 0; + + var assembler = new RtpSequenceAssembler(1500, testPacketsCount); + assembler.PacketPassed += (ref RtpPacket packet) => ++countPassed; + for (ushort i = 0; i < testPacketsCount; i++) + { + var testPacket = new RtpPacket(i, testPayloadSegment); + assembler.ProcessPacket(ref testPacket); + } + + Assert.AreEqual(testPacketsCount, countPassed); + } + + [TestMethod] + [DataRow(2)] + [DataRow(4)] + [DataRow(8)] + [DataRow(16)] + [DataRow(32)] + [DataRow(1024)] + public void ProcessPacket_SeveralPacketsWithWrongOrder_SequenceIsRestored(int testPacketsCount) + { + RtpPacket packet; + List testPacketsList = CreateTestPacketsList(testPacketsCount); + var shuffledList = testPacketsList.ToList(); + ShuffleList(shuffledList); + var resultList = new List(); + + var assembler = new RtpSequenceAssembler(1500, testPacketsCount); + packet = new RtpPacket(ushort.MaxValue, Array.Empty()); + assembler.ProcessPacket(ref packet); + assembler.PacketPassed += (ref RtpPacket p) => resultList.Add(new RtpPacket(p.SeqNumber, p.PayloadSegment.ToArray())); + for (int i = 0; i < shuffledList.Count; i++) + { + packet = shuffledList[i]; + assembler.ProcessPacket(ref packet); + } + + Assert.IsTrue(testPacketsList.SequenceEqual(resultList, new RtpPacketEqualityComparer())); + } + + [TestMethod] + [DataRow(8, 1)] + [DataRow(16, 1)] + [DataRow(32, 1)] + [DataRow(1024, 1)] + [DataRow(8, 2)] + [DataRow(16, 2)] + [DataRow(32, 2)] + [DataRow(1024, 2)] + [DataRow(8, 4)] + [DataRow(16, 4)] + [DataRow(32, 4)] + [DataRow(1024, 4)] + public void ProcessPacket_OnePacketIsLost_SequenceIsOrdered(int testPacketsCount, int maxCorrectionLength) + { + RtpPacket packet; + List testPacketsList = CreateTestPacketsList(testPacketsCount); + int removeIndex = testPacketsCount / 2; + //remove one packet + testPacketsList.RemoveAt(removeIndex / 2); + List testPacketsLisWithShuffle = testPacketsList.ToList(); + //additionally swap next two packets + packet = testPacketsLisWithShuffle[removeIndex]; + testPacketsLisWithShuffle[removeIndex] = testPacketsLisWithShuffle[removeIndex + 1]; + testPacketsLisWithShuffle[removeIndex + 1] = packet; + + var resultList = new List(); + + var assembler = new RtpSequenceAssembler(1500, 8); + packet = new RtpPacket(ushort.MaxValue, Array.Empty()); + assembler.ProcessPacket(ref packet); + assembler.PacketPassed += (ref RtpPacket p) => resultList.Add(new RtpPacket(p.SeqNumber, p.PayloadSegment.ToArray())); + for (int i = 0; i < testPacketsLisWithShuffle.Count; i++) + { + packet = testPacketsLisWithShuffle[i]; + assembler.ProcessPacket(ref packet); + } + + Assert.IsTrue(testPacketsList.Take(resultList.Count).SequenceEqual(resultList, new RtpPacketEqualityComparer())); + } + + private static List CreateTestPacketsList(int testPacketsCount) + { + var testPacketsList = new List(); + + for (ushort i = 0; i < testPacketsCount; i++) + { + var randomBytes = new byte[Random.Next(100, 1500)]; + Random.NextBytes(randomBytes); + + testPacketsList.Add(new RtpPacket(i, randomBytes)); + } + + return testPacketsList; + } + + private static void ShuffleList(IList list) + { + int n = list.Count; + while (n > 1) + { + n--; + int k = Random.Next(n + 1); + T value = list[k]; + list[k] = list[n]; + list[n] = value; + } + } + } +} diff --git a/RtspClientSharp.UnitTests/Rtsp/RtspClientInternalTests.cs b/RtspClientSharp.UnitTests/Rtsp/RtspClientInternalTests.cs index b8811e0..3fd37f9 100644 --- a/RtspClientSharp.UnitTests/Rtsp/RtspClientInternalTests.cs +++ b/RtspClientSharp.UnitTests/Rtsp/RtspClientInternalTests.cs @@ -17,8 +17,8 @@ public async Task ConnectAsync_TestTransportClientThatEmulatesRtspServer_Connect { var transportClient = new RtspTransportClientEmulator(); - var rtspClient = new RtspClientInternal(_fakeConnectionParameters); - await rtspClient.ConnectAsync(transportClient, CancellationToken.None); + var rtspClient = new RtspClientInternal(_fakeConnectionParameters, () => transportClient); + await rtspClient.ConnectAsync(CancellationToken.None); } [TestMethod] @@ -29,31 +29,31 @@ public async Task ConnectAsync_CancellationRequested_ThrowsException() var cancellationTokenSource = new CancellationTokenSource(); cancellationTokenSource.Cancel(); - var rtspClient = new RtspClientInternal(_fakeConnectionParameters); + var rtspClient = new RtspClientInternal(_fakeConnectionParameters, () => transportClient); - await rtspClient.ConnectAsync(transportClient, cancellationTokenSource.Token); + await rtspClient.ConnectAsync(cancellationTokenSource.Token); } [TestMethod] public async Task ReceiveAsync_InterleavedModeAndOneRtcpByePacketInStream_SuccessfullyFinished() { var transportClient = new RtspTransportClientEmulator(); - var rtspClient = new RtspClientInternal(_fakeConnectionParameters); + var rtspClient = new RtspClientInternal(_fakeConnectionParameters, () => transportClient); - await rtspClient.ConnectAsync(transportClient, CancellationToken.None); - await rtspClient.ReceiveAsync(transportClient, CancellationToken.None); + await rtspClient.ConnectAsync(CancellationToken.None); + await rtspClient.ReceiveAsync(CancellationToken.None); } [TestMethod] public async Task ReceiveAsync_CancellationRequested_ImmediateReturn() { var transportClient = new RtspTransportClientEmulator(); - var rtspClient = new RtspClientInternal(_fakeConnectionParameters); + var rtspClient = new RtspClientInternal(_fakeConnectionParameters, () => transportClient); var cancellationTokenSource = new CancellationTokenSource(); cancellationTokenSource.Cancel(); - await rtspClient.ConnectAsync(transportClient, CancellationToken.None); - await rtspClient.ReceiveAsync(transportClient, cancellationTokenSource.Token); + await rtspClient.ConnectAsync(CancellationToken.None); + await rtspClient.ReceiveAsync(cancellationTokenSource.Token); } } } \ No newline at end of file diff --git a/RtspClientSharp.UnitTests/Rtsp/RtspRequestMessageFactoryTests.cs b/RtspClientSharp.UnitTests/Rtsp/RtspRequestMessageFactoryTests.cs index 052f645..363015e 100644 --- a/RtspClientSharp.UnitTests/Rtsp/RtspRequestMessageFactoryTests.cs +++ b/RtspClientSharp.UnitTests/Rtsp/RtspRequestMessageFactoryTests.cs @@ -47,7 +47,7 @@ public void CreateDescribeRequest_ValidProperties() } [TestMethod] - public void CreateSetupInterleavedRequest_ValidProperties() + public void CreateSetupTcpInterleavedRequest_ValidProperties() { const string testTrackName = "testTrack"; var factory = new RtspRequestMessageFactory(FakeUri, UserAgent); @@ -57,7 +57,25 @@ public void CreateSetupInterleavedRequest_ValidProperties() Assert.AreEqual(RtspMethod.SETUP, request.Method); Assert.AreEqual(FakeUri + "testTrack", request.ConnectionUri.ToString()); Assert.AreEqual(UserAgent, request.UserAgent); - Assert.IsTrue(request.Headers.Get("Transport").Contains($"{1}-{2}")); + string transportHeaderValue = request.Headers.Get("Transport"); + Assert.IsTrue(transportHeaderValue.Contains("TCP")); + Assert.IsTrue(transportHeaderValue.Contains($"{1}-{2}")); + } + + [TestMethod] + public void CreateSetupUdpUnicastRequest_ValidProperties() + { + const string testTrackName = "testTrack"; + var factory = new RtspRequestMessageFactory(FakeUri, UserAgent); + + RtspRequestMessage request = factory.CreateSetupUdpUnicastRequest(testTrackName, 1, 2); + + Assert.AreEqual(RtspMethod.SETUP, request.Method); + Assert.AreEqual(FakeUri + "testTrack", request.ConnectionUri.ToString()); + Assert.AreEqual(UserAgent, request.UserAgent); + string transportHeaderValue = request.Headers.Get("Transport"); + Assert.IsTrue(transportHeaderValue.Contains("unicast")); + Assert.IsTrue(transportHeaderValue.Contains($"{1}-{2}")); } [TestMethod] diff --git a/RtspClientSharp.UnitTests/Rtsp/RtspTransportClientEmulator.cs b/RtspClientSharp.UnitTests/Rtsp/RtspTransportClientEmulator.cs index 338e79d..08a695e 100644 --- a/RtspClientSharp.UnitTests/Rtsp/RtspTransportClientEmulator.cs +++ b/RtspClientSharp.UnitTests/Rtsp/RtspTransportClientEmulator.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Specialized; using System.IO; +using System.Net; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -54,6 +55,8 @@ class RtspTransportClientEmulator : IRtspTransportClient private static readonly byte[] RtcpInterleavedByePacketsBytes = {TpktHeader.Id, 0x02, 0x00, 0x04, 0x80, 0xCB, 0x00, 0x00}; + public EndPoint RemoteEndPoint => new IPEndPoint(IPAddress.Loopback, 11080); + public virtual Task ConnectAsync(CancellationToken token) { return Task.CompletedTask; @@ -98,7 +101,7 @@ public Task ExecuteRequest(RtspRequestMessage requestMessag } } - public Task SendRequestAsync(RtspRequestMessage requestMessage) + public Task SendRequestAsync(RtspRequestMessage requestMessage, CancellationToken token) { return Task.CompletedTask; } diff --git a/RtspClientSharp.UnitTests/Rtsp/RtspTransportClientTests.cs b/RtspClientSharp.UnitTests/Rtsp/RtspTransportClientTests.cs index a01d8ca..8095a7b 100644 --- a/RtspClientSharp.UnitTests/Rtsp/RtspTransportClientTests.cs +++ b/RtspClientSharp.UnitTests/Rtsp/RtspTransportClientTests.cs @@ -1,5 +1,6 @@ using System; using System.IO; +using System.Net; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -17,6 +18,8 @@ private class RtspTransportClientFake : RtspTransportClient private readonly MemoryStream _requestStream = new MemoryStream(); private MemoryStream _responseStream; + public override EndPoint RemoteEndPoint => new IPEndPoint(0, 0); + public RtspTransportClientFake(ConnectionParameters connectionParameters, Func responseProvider) : base(connectionParameters) diff --git a/RtspClientSharp.UnitTests/Tpkt/TpktStreamTests.cs b/RtspClientSharp.UnitTests/Tpkt/TpktStreamTests.cs index 91b2196..18be56c 100644 --- a/RtspClientSharp.UnitTests/Tpkt/TpktStreamTests.cs +++ b/RtspClientSharp.UnitTests/Tpkt/TpktStreamTests.cs @@ -4,6 +4,7 @@ using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; using RtspClientSharp.Tpkt; +using RtspClientSharp.Utils; namespace RtspClientSharp.UnitTests.Tpkt { diff --git a/RtspClientSharp.UnitTests/Utils/ArrayUtilsTests.cs b/RtspClientSharp.UnitTests/Utils/ArrayUtilsTests.cs index a744277..8faee9e 100644 --- a/RtspClientSharp.UnitTests/Utils/ArrayUtilsTests.cs +++ b/RtspClientSharp.UnitTests/Utils/ArrayUtilsTests.cs @@ -105,5 +105,27 @@ public void IndexOfBytes_PatternNotExists_ReturnsMinusOne() Assert.AreEqual(-1, index); } + + [TestMethod] + public void LastIndexOfBytes_PatternExists_ReturnsActualIndex() + { + var pattern = new Byte[] { 1, 2, 3 }; + var bytes = new byte[] { 0, 5, 6, 7, 1, 2, 3 }; + + int index = ArrayUtils.LastIndexOfBytes(bytes, pattern, 1, bytes.Length - 1); + + Assert.AreEqual(4, index); + } + + [TestMethod] + public void LastIndexOfBytes_PatternNotExists_ReturnsMinusOne() + { + var pattern = new Byte[] { 1, 2, 3 }; + var bytes = new byte[] { 0, 5, 6, 7, 1, 2 }; + + int index = ArrayUtils.LastIndexOfBytes(bytes, pattern, 1, bytes.Length - 1); + + Assert.AreEqual(-1, index); + } } } \ No newline at end of file diff --git a/RtspClientSharp.UnitTests/Utils/ChunksArrayTests.cs b/RtspClientSharp.UnitTests/Utils/ChunksArrayTests.cs new file mode 100644 index 0000000..7c713bb --- /dev/null +++ b/RtspClientSharp.UnitTests/Utils/ChunksArrayTests.cs @@ -0,0 +1,84 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using RtspClientSharp.Utils; + +namespace RtspClientSharp.UnitTests.Utils +{ + [TestClass] + public class ChunksArrayTests + { + [TestMethod] + public void Add_SeveralChunks_CanBeReadProperly() + { + const int testChunksCount = 100; + var chunkList = new List>(); + + for (int i = 0; i < testChunksCount; i++) + { + var chunkBytes = new byte[] {1, 2, 3, 4, 5, (byte) i}; + var chunkSegment = new ArraySegment(chunkBytes); + chunkList.Add(chunkSegment); + } + + var chunkArray = new ChunksArray(1500, testChunksCount); + + for (int i = 0; i < testChunksCount; i++) + chunkArray.Add(chunkList[i]); + + for (int i = 0; i < testChunksCount; i++) + Assert.IsTrue(chunkArray[i].SequenceEqual(chunkArray[i])); + } + + [TestMethod] + public void Add_NewChunk_CountShouldBeSetToOne() + { + var chunkBytes = new byte[] { 1, 2, 3, 4, 5 }; + var chunkSegment = new ArraySegment(chunkBytes); + var chunkArray = new ChunksArray(1500, 1); + + chunkArray.Add(chunkSegment); + + Assert.AreEqual(1, chunkArray.Count); + } + + [TestMethod] + public void Clear_SeveralTestChunks_EmptyArray() + { + int count = 10; + var chunkBytes = new byte[] { 1, 2, 3, 4, 5 }; + var chunkSegment = new ArraySegment(chunkBytes); + var chunkArray = new ChunksArray(1500, 10); + + for(int i = 0; i < count; i++) + chunkArray.Add(chunkSegment); + chunkArray.Clear(); + + Assert.AreEqual(0, chunkArray.Count); + } + + [TestMethod] + public void RemoveAt_RemoveMidpoint_ValidArray() + { + int testChunksCount = 100; + var chunkList = new List(); + var chunkArray = new ChunksArray(1500, testChunksCount); + for (int i = 0; i < testChunksCount; i++) + { + var chunkBytes = new byte[] { 1, 2, 3, 4, 5, (byte)i }; + var chunkSegment = new ArraySegment(chunkBytes); + chunkArray.Add(chunkSegment); + chunkList.Add(chunkBytes); + } + + int removeIndex = testChunksCount / 2; + chunkArray.RemoveAt(removeIndex); + chunkList.RemoveAt(removeIndex); + + Assert.AreEqual(--testChunksCount, chunkArray.Count); + for (int i = 0; i < testChunksCount; i++) + Assert.IsTrue(chunkList[i].SequenceEqual(chunkArray[i])); + } + } +} diff --git a/RtspClientSharp/ConnectionParameters.cs b/RtspClientSharp/ConnectionParameters.cs index 964817b..677f3db 100644 --- a/RtspClientSharp/ConnectionParameters.cs +++ b/RtspClientSharp/ConnectionParameters.cs @@ -5,8 +5,6 @@ namespace RtspClientSharp { public class ConnectionParameters { - private static readonly TimeSpan DefaultConnectTimeout = TimeSpan.FromSeconds(30); - private static readonly TimeSpan DefaultReceiveTimeout = TimeSpan.FromSeconds(10); private const string DefaultUserAgent = "RtspClientSharp"; /// @@ -14,15 +12,19 @@ public class ConnectionParameters /// and from "http://" for RTSP over HTTP tunneling /// public Uri ConnectionUri { get; } + /// /// Should be used to get only one video/audio track from device. /// Important notes: some devices won't connect in that case (SETUP request could not be processed) /// public RequiredTracks RequiredTracks { get; set; } = RequiredTracks.All; + public NetworkCredential Credentials { get; } - public TimeSpan ConnectTimeout { get; set; } = DefaultConnectTimeout; - public TimeSpan ReceiveTimeout { get; set; } = DefaultReceiveTimeout; + public TimeSpan ConnectTimeout { get; set; } = TimeSpan.FromSeconds(30); + public TimeSpan ReceiveTimeout { get; set; } = TimeSpan.FromSeconds(10); + public TimeSpan CancelTimeout { get; set; } = TimeSpan.FromSeconds(5); public string UserAgent { get; set; } = DefaultUserAgent; + public RtpTransportProtocol RtpTransport { get; set; } = RtpTransportProtocol.TCP; public ConnectionParameters(Uri connectionUri) { diff --git a/RtspClientSharp/MediaParsers/H264VideoPayloadParser.cs b/RtspClientSharp/MediaParsers/H264VideoPayloadParser.cs index bb1e453..1623f15 100644 --- a/RtspClientSharp/MediaParsers/H264VideoPayloadParser.cs +++ b/RtspClientSharp/MediaParsers/H264VideoPayloadParser.cs @@ -83,6 +83,7 @@ public override void ResetState() { _nalBuffer.ResetState(); _h264Parser.ResetState(); + _waitForStartFu = true; } private void ParseFU(TimeSpan timeOffset, ArraySegment byteSegment, int donFieldSize, bool markerBit) @@ -119,6 +120,8 @@ private void ParseFU(TimeSpan timeOffset, ArraySegment byteSegment, int do DateTime timestamp = GetFrameTimestamp(timeOffset); _h264Parser.GenerateFrame(timestamp); } + + _waitForStartFu = true; } else _waitForStartFu = false; diff --git a/RtspClientSharp/Rtcp/RtcpPacket.cs b/RtspClientSharp/Rtcp/RtcpPacket.cs index 89c8f51..a0fb3fc 100644 --- a/RtspClientSharp/Rtcp/RtcpPacket.cs +++ b/RtspClientSharp/Rtcp/RtcpPacket.cs @@ -48,6 +48,9 @@ public static IEnumerable Parse(ArraySegment byteSegment) int payloadLength = dwordLength * 4; + if(payloadLength > totalLength - 4) + throw new ArgumentException("Invalid RTCP packet size. It seems that data segment contains bad data", nameof(byteSegment)); + RtcpPacket packet; if (payloadType == 200) @@ -74,7 +77,7 @@ public static IEnumerable Parse(ArraySegment byteSegment) yield return packet; offset += payloadLength; - totalLength -= payloadLength; + totalLength -= 4 + payloadLength; } } } diff --git a/RtspClientSharp/Rtcp/RtcpReceiverReportsProvider.cs b/RtspClientSharp/Rtcp/RtcpReceiverReportsProvider.cs index b1fb24a..de97737 100644 --- a/RtspClientSharp/Rtcp/RtcpReceiverReportsProvider.cs +++ b/RtspClientSharp/Rtcp/RtcpReceiverReportsProvider.cs @@ -9,7 +9,6 @@ class RtcpReceiverReportsProvider private readonly IRtpStatisticsProvider _rtpStatisticsProvider; private readonly IRtcpSenderStatisticsProvider _rtcpSenderStatisticsProvider; private readonly uint _senderSyncSourceId; - private readonly List _reportPacketsList = new List(); public RtcpReceiverReportsProvider(IRtpStatisticsProvider rtpStatisticsProvider, IRtcpSenderStatisticsProvider rtcpSenderStatisticsProvider, uint senderSyncSourceId) @@ -22,16 +21,15 @@ public RtcpReceiverReportsProvider(IRtpStatisticsProvider rtpStatisticsProvider, _senderSyncSourceId = senderSyncSourceId; } - public IReadOnlyList GetReportPackets() + public IEnumerable GetReportPackets() { - _reportPacketsList.Clear(); - RtcpReceiverReportPacket receiverReport = CreateReceiverReport(); - _reportPacketsList.Add(receiverReport); + + yield return receiverReport; + RtcpSdesReportPacket sdesReport = CreateSdesReport(); - _reportPacketsList.Add(sdesReport); - return _reportPacketsList; + yield return sdesReport; } private RtcpReceiverReportPacket CreateReceiverReport() diff --git a/RtspClientSharp/Rtp/IRtpSequenceAssembler.cs b/RtspClientSharp/Rtp/IRtpSequenceAssembler.cs new file mode 100644 index 0000000..f1283dc --- /dev/null +++ b/RtspClientSharp/Rtp/IRtpSequenceAssembler.cs @@ -0,0 +1,11 @@ +using RtspClientSharp.Utils; + +namespace RtspClientSharp.Rtp +{ + internal interface IRtpSequenceAssembler + { + RefAction PacketPassed { get; set; } + + void ProcessPacket(ref RtpPacket rtpPacket); + } +} \ No newline at end of file diff --git a/RtspClientSharp/Rtp/RtpPacket.cs b/RtspClientSharp/Rtp/RtpPacket.cs index 77e60ed..b639090 100644 --- a/RtspClientSharp/Rtp/RtpPacket.cs +++ b/RtspClientSharp/Rtp/RtpPacket.cs @@ -20,8 +20,23 @@ struct RtpPacket public uint SyncSourceId { get; private set; } public int ExtensionHeaderId { get; private set; } - public ArraySegment ExtensionHeaderSegment { get; private set; } - public ArraySegment PayloadSegment { get; private set; } + public ArraySegment PayloadSegment { get; set; } + + internal RtpPacket(ushort seqNumber, ArraySegment payloadSegment) + { + ProtocolVersion = 1; + PaddingFlag = false; + ExtensionFlag = false; + CsrcCount = 0; + MarkerBit = false; + PayloadType = 0; + SeqNumber = 0; + Timestamp = 0; + SyncSourceId = 0; + ExtensionHeaderId = 0; + SeqNumber = seqNumber; + PayloadSegment = payloadSegment; + } public static bool TryParse(ArraySegment byteSegment, out RtpPacket rtpPacket) { @@ -60,11 +75,7 @@ public static bool TryParse(ArraySegment byteSegment, out RtpPacket rtpPac offset += 2; int extensionHeaderLength = BigEndianConverter.ReadUInt16(byteSegment.Array, offset) * 4; - offset += 2; - - rtpPacket.ExtensionHeaderSegment = - new ArraySegment(byteSegment.Array, offset, extensionHeaderLength); - offset += extensionHeaderLength; + offset += 2 + extensionHeaderLength; } int payloadSize = byteSegment.Offset + byteSegment.Count - offset; diff --git a/RtspClientSharp/Rtp/RtpSequenceAssembler.cs b/RtspClientSharp/Rtp/RtpSequenceAssembler.cs new file mode 100644 index 0000000..408890b --- /dev/null +++ b/RtspClientSharp/Rtp/RtpSequenceAssembler.cs @@ -0,0 +1,169 @@ +using System; +using System.Collections.Generic; +using RtspClientSharp.Utils; + +namespace RtspClientSharp.Rtp +{ + class RtpSequenceAssembler : IRtpSequenceAssembler + { + private readonly ChunksArray _chunksArray; + private readonly int _maxCorrectionLength; + private ushort _previousCorrectSeqNumber; + private readonly List _bufferedRtpPackets; + private readonly List _removeList; + private bool _isFirstPacket = true; + + public RefAction PacketPassed { get; set; } + + public RtpSequenceAssembler(int maxRtpPacketSize, int maxCorrectionLength) + { + if (maxRtpPacketSize <= 0) + throw new ArgumentOutOfRangeException(nameof(maxRtpPacketSize)); + if (maxCorrectionLength < 1) + throw new ArgumentOutOfRangeException(nameof(maxCorrectionLength)); + + _maxCorrectionLength = maxCorrectionLength; + _chunksArray = new ChunksArray(maxRtpPacketSize, maxCorrectionLength); + + _bufferedRtpPackets = new List(maxCorrectionLength); + _removeList = new List(maxCorrectionLength); + } + + public void ProcessPacket(ref RtpPacket rtpPacket) + { + if (_isFirstPacket) + { + _previousCorrectSeqNumber = rtpPacket.SeqNumber; + PacketPassed?.Invoke(ref rtpPacket); + _isFirstPacket = false; + return; + } + + int delta = (ushort) (rtpPacket.SeqNumber - _previousCorrectSeqNumber); + + if (delta == 1) + { + _previousCorrectSeqNumber = rtpPacket.SeqNumber; + PacketPassed?.Invoke(ref rtpPacket); + + if (_bufferedRtpPackets.Count == 0) + return; + + ushort nextSeqNumber = (ushort) (_previousCorrectSeqNumber + 1); + ProcessBufferedPackets(nextSeqNumber); + return; + } + + if (delta > _maxCorrectionLength) + { + while (_bufferedRtpPackets.Count != 0) + PassNearestBufferedPacket(); + + _previousCorrectSeqNumber = rtpPacket.SeqNumber; + PacketPassed?.Invoke(ref rtpPacket); + } + else + { + if (rtpPacket.SeqNumber == _previousCorrectSeqNumber) + return; + + _bufferedRtpPackets.Add(rtpPacket); + _chunksArray.Add(rtpPacket.PayloadSegment); + + if (_bufferedRtpPackets.Count != _maxCorrectionLength) + return; + + PassNearestBufferedPacket(); + + ushort nextSeqNumber = (ushort) (_previousCorrectSeqNumber + 1); + ProcessBufferedPackets(nextSeqNumber); + } + } + + private void PassNearestBufferedPacket() + { + int nearestIndex = 0; + int nearestDelta = (ushort) (_bufferedRtpPackets[0].SeqNumber - _previousCorrectSeqNumber); + + for (int i = 1; i < _bufferedRtpPackets.Count; i++) + { + int delta = (ushort) (_bufferedRtpPackets[i].SeqNumber - _previousCorrectSeqNumber); + + if (delta < nearestDelta) + { + nearestIndex = i; + nearestDelta = delta; + } + } + + RtpPacket nearestRtpPacket = _bufferedRtpPackets[nearestIndex]; + nearestRtpPacket.PayloadSegment = _chunksArray[nearestIndex]; + + _previousCorrectSeqNumber = _bufferedRtpPackets[nearestIndex].SeqNumber; + PacketPassed?.Invoke(ref nearestRtpPacket); + + _bufferedRtpPackets.RemoveAt(nearestIndex); + _chunksArray.RemoveAt(nearestIndex); + } + + private void ProcessBufferedPackets(ushort nextSeqNumber) + { + while (true) + { + bool anythingFound = false; + + for (int i = 0; i < _bufferedRtpPackets.Count; i++) + { + if (_bufferedRtpPackets[i].SeqNumber != nextSeqNumber) + continue; + + RtpPacket nextRtpPacket = _bufferedRtpPackets[i]; + nextRtpPacket.PayloadSegment = _chunksArray[i]; + + _previousCorrectSeqNumber = nextRtpPacket.SeqNumber; + PacketPassed?.Invoke(ref nextRtpPacket); + + ++nextSeqNumber; + anythingFound = true; + _removeList.Add(i); + } + + if (!anythingFound) + break; + } + + if (_removeList.Count == 0) + return; + + if (_removeList.Count == _bufferedRtpPackets.Count) + { + _bufferedRtpPackets.Clear(); + _chunksArray.Clear(); + } + else + { + if (_removeList.Count == 1) + { + int removeIndex = _removeList[0]; + + _bufferedRtpPackets.RemoveAt(removeIndex); + _chunksArray.RemoveAt(removeIndex); + } + else + { + _removeList.Sort(); + + for (int i = _removeList.Count - 1; i > -1; i--) + { + int removeIndex = _removeList[i]; + + _bufferedRtpPackets.RemoveAt(removeIndex); + _chunksArray.RemoveAt(removeIndex); + } + } + } + + _removeList.Clear(); + } + } +} \ No newline at end of file diff --git a/RtspClientSharp/Rtp/RtpStream.cs b/RtspClientSharp/Rtp/RtpStream.cs index 2f4fd58..675464c 100644 --- a/RtspClientSharp/Rtp/RtpStream.cs +++ b/RtspClientSharp/Rtp/RtpStream.cs @@ -1,10 +1,12 @@ using System; +using System.Diagnostics; using RtspClientSharp.MediaParsers; namespace RtspClientSharp.Rtp { class RtpStream : ITransportStream, IRtpStatisticsProvider { + private readonly IRtpSequenceAssembler _rtpSequenceAssembler; private readonly IMediaPayloadParser _mediaPayloadParser; private readonly int _samplesFrequency; @@ -20,10 +22,17 @@ class RtpStream : ITransportStream, IRtpStatisticsProvider public uint CumulativePacketLost { get; private set; } public ushort SequenceCycles { get; private set; } - public RtpStream(IMediaPayloadParser mediaPayloadParser, int samplesFrequency) + public RtpStream(IMediaPayloadParser mediaPayloadParser, int samplesFrequency, + IRtpSequenceAssembler rtpSequenceAssembler = null) { _mediaPayloadParser = mediaPayloadParser ?? throw new ArgumentNullException(nameof(mediaPayloadParser)); _samplesFrequency = samplesFrequency; + + if (rtpSequenceAssembler != null) + { + _rtpSequenceAssembler = rtpSequenceAssembler; + _rtpSequenceAssembler.PacketPassed += ProcessImmediately; + } } public void Process(ArraySegment payloadSegment) @@ -31,6 +40,14 @@ public void Process(ArraySegment payloadSegment) if (!RtpPacket.TryParse(payloadSegment, out RtpPacket rtpPacket)) return; + if (_rtpSequenceAssembler != null) + _rtpSequenceAssembler.ProcessPacket(ref rtpPacket); + else + ProcessImmediately(ref rtpPacket); + } + + private void ProcessImmediately(ref RtpPacket rtpPacket) + { SyncSourceId = rtpPacket.SyncSourceId; if (!_isFirstPacket) @@ -67,7 +84,7 @@ public void Process(ArraySegment payloadSegment) _previousSeqNumber = rtpPacket.SeqNumber; _previousTimestamp = rtpPacket.Timestamp; - if (payloadSegment.Count == 0) + if (rtpPacket.PayloadSegment.Count == 0) return; TimeSpan timeOffset = _samplesFrequency != 0 diff --git a/RtspClientSharp/RtpTransportProtocol.cs b/RtspClientSharp/RtpTransportProtocol.cs new file mode 100644 index 0000000..c947f68 --- /dev/null +++ b/RtspClientSharp/RtpTransportProtocol.cs @@ -0,0 +1,8 @@ +namespace RtspClientSharp +{ + public enum RtpTransportProtocol + { + TCP, + UDP + } +} \ No newline at end of file diff --git a/RtspClientSharp/Rtsp/Constants.cs b/RtspClientSharp/Rtsp/Constants.cs index 6ac22c4..4fc288d 100644 --- a/RtspClientSharp/Rtsp/Constants.cs +++ b/RtspClientSharp/Rtsp/Constants.cs @@ -7,5 +7,6 @@ static class Constants public static readonly byte[] RtspProtocolNameBytes = {(byte) 'R', (byte) 'T', (byte) 'S', (byte) 'P'}; public const int MaxResponseHeadersSize = 8 * 1024; public static readonly byte[] DoubleCrlfBytes = {(byte) '\r', (byte) '\n', (byte) '\r', (byte) '\n'}; + public const int UdpReceiveBufferSize = 2048; } } \ No newline at end of file diff --git a/RtspClientSharp/Rtsp/IRtspTransportClient.cs b/RtspClientSharp/Rtsp/IRtspTransportClient.cs index 3d927c0..b461f8b 100644 --- a/RtspClientSharp/Rtsp/IRtspTransportClient.cs +++ b/RtspClientSharp/Rtsp/IRtspTransportClient.cs @@ -1,5 +1,6 @@ using System; using System.IO; +using System.Net; using System.Threading; using System.Threading.Tasks; @@ -7,6 +8,8 @@ namespace RtspClientSharp.Rtsp { internal interface IRtspTransportClient : IDisposable { + EndPoint RemoteEndPoint { get; } + Task ConnectAsync(CancellationToken token); Stream GetStream(); @@ -17,6 +20,6 @@ Task EnsureExecuteRequest(RtspRequestMessage requestMessage Task ExecuteRequest(RtspRequestMessage requestMessage, CancellationToken token, int responseReadPortionSize = 0); - Task SendRequestAsync(RtspRequestMessage requestMessage); + Task SendRequestAsync(RtspRequestMessage requestMessage, CancellationToken token); } } \ No newline at end of file diff --git a/RtspClientSharp/Rtsp/RtspClientInternal.cs b/RtspClientSharp/Rtsp/RtspClientInternal.cs index 9361012..0829027 100644 --- a/RtspClientSharp/Rtsp/RtspClientInternal.cs +++ b/RtspClientSharp/Rtsp/RtspClientInternal.cs @@ -1,7 +1,10 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; using System.Linq; +using System.Net; +using System.Net.Sockets; using System.Threading; using System.Threading.Tasks; using RtspClientSharp.Codecs.Audio; @@ -16,51 +19,63 @@ namespace RtspClientSharp.Rtsp { - class RtspClientInternal + sealed class RtspClientInternal : IDisposable { private const int RtcpReportIntervalBaseMs = 3000; private readonly ConnectionParameters _connectionParameters; - + private readonly Func _transportClientProvider; private readonly RtspRequestMessageFactory _requestMessageFactory; private readonly Dictionary _streamsMap = new Dictionary(); + private readonly ConcurrentDictionary _udpClientsMap = new ConcurrentDictionary(); private readonly Dictionary _reportProvidersMap = new Dictionary(); + private TpktStream _tpktStream; + + private readonly SimpleHybridLock _hybridLock = new SimpleHybridLock(); + private readonly MemoryStream _rtcpPacketsStream = new MemoryStream(); private readonly Random _random = RandomGeneratorFactory.CreateGenerator(); + private IRtspTransportClient _rtspTransportClient; private int _rtspKeepAliveTimeoutMs; - private bool _isConnectionClosedByServer; + private readonly CancellationTokenSource _serverCancellationTokenSource = new CancellationTokenSource(); private bool _isServerSupportsGetParameterRequest; + private int _disposed; - public Action ReadingContinues; public Action FrameReceived; - public RtspClientInternal(ConnectionParameters connectionParameters) + public RtspClientInternal(ConnectionParameters connectionParameters, + Func transportClientProvider = null) { - _connectionParameters = connectionParameters; + _connectionParameters = + connectionParameters ?? throw new ArgumentNullException(nameof(connectionParameters)); + _transportClientProvider = transportClientProvider ?? CreateTransportClient; Uri fixedRtspUri = connectionParameters.GetFixedRtspUri(); _requestMessageFactory = new RtspRequestMessageFactory(fixedRtspUri, connectionParameters.UserAgent); } - public async Task ConnectAsync(IRtspTransportClient rtspTransportClient, CancellationToken token) + public async Task ConnectAsync(CancellationToken token) { - ResetState(); + IRtspTransportClient rtspTransportClient = _transportClientProvider(); + Volatile.Write(ref _rtspTransportClient, rtspTransportClient); + + await _rtspTransportClient.ConnectAsync(token); RtspRequestMessage optionsRequest = _requestMessageFactory.CreateOptionsRequest(); - RtspResponseMessage optionsResponse = await rtspTransportClient.ExecuteRequest(optionsRequest, token); + RtspResponseMessage optionsResponse = await _rtspTransportClient.ExecuteRequest(optionsRequest, token); if (optionsResponse.StatusCode == RtspStatusCode.Ok) ParsePublicHeader(optionsResponse.Headers[WellKnownHeaders.Public]); RtspRequestMessage describeRequest = _requestMessageFactory.CreateDescribeRequest(); RtspResponseMessage describeResponse = - await rtspTransportClient.EnsureExecuteRequest(describeRequest, token); + await _rtspTransportClient.EnsureExecuteRequest(describeRequest, token); string contentBaseHeader = describeResponse.Headers[WellKnownHeaders.ContentBase]; @@ -70,113 +85,266 @@ public async Task ConnectAsync(IRtspTransportClient rtspTransportClient, Cancell var parser = new SdpParser(); IEnumerable tracks = parser.Parse(describeResponse.ResponseBody); - int channelCounter = 0; - uint senderSyncSourceId = (uint) _random.Next(); - bool anyTrackRequested = false; foreach (RtspMediaTrackInfo track in GetTracksToSetup(tracks)) { - int rtpChannelNumber = ++channelCounter; - int rtcpChannelNumber = ++channelCounter; + await SetupTrackAsync(track, token); + anyTrackRequested = true; + } + + if (!anyTrackRequested) + throw new RtspClientException("Any suitable track is not found"); - RtspRequestMessage setupRequest = _requestMessageFactory.CreateSetupTcpInterleavedRequest( - track.TrackName, rtpChannelNumber, rtcpChannelNumber); + RtspRequestMessage playRequest = _requestMessageFactory.CreatePlayRequest(); + await _rtspTransportClient.EnsureExecuteRequest(playRequest, token, 1); + } - RtspResponseMessage setupResponse = await rtspTransportClient.EnsureExecuteRequest(setupRequest, token); + public async Task ReceiveAsync(CancellationToken token) + { + if (_rtspTransportClient == null) + throw new InvalidOperationException("Client should be connected first"); - ParseSessionHeader(setupResponse.Headers[WellKnownHeaders.Session]); + TimeSpan nextRtspKeepAliveInterval = GetNextRtspKeepAliveInterval(); + TimeSpan nextRtcpReportInterval = GetNextRtcpReportInterval(); - IMediaPayloadParser mediaPayloadParser = MediaPayloadParser.CreateFrom(track.Codec); - mediaPayloadParser.FrameGenerated = FrameReceived; + using (var linkedTokenSource = + CancellationTokenSource.CreateLinkedTokenSource(_serverCancellationTokenSource.Token, token)) + { + CancellationToken linkedToken = linkedTokenSource.Token; - var rtpStream = new RtpStream(mediaPayloadParser, track.SamplesFrequency); - _streamsMap.Add(rtpChannelNumber, rtpStream); + Task receiveTask = _connectionParameters.RtpTransport == RtpTransportProtocol.TCP + ? ReceiveOverTcpAsync(_rtspTransportClient.GetStream(), linkedToken) + : ReceiveOverUdpAsync(linkedToken); - var rtcpStream = new RtcpStream(); - rtcpStream.SessionShutdown += (sender, args) => _isConnectionClosedByServer = true; - _streamsMap.Add(rtcpChannelNumber, rtcpStream); + Task rtcpReportDelayTask = Task.Delay(nextRtcpReportInterval, linkedToken); + Task rtspKeepAliveDelayTask = null; - var rtcpReportsProvider = new RtcpReceiverReportsProvider(rtpStream, rtcpStream, senderSyncSourceId); - _reportProvidersMap.Add(rtcpChannelNumber, rtcpReportsProvider); + if (_isServerSupportsGetParameterRequest) + rtspKeepAliveDelayTask = Task.Delay(nextRtspKeepAliveInterval, linkedToken); - anyTrackRequested = true; + while (true) + { + Task result; + + if (_isServerSupportsGetParameterRequest) + result = await Task.WhenAny(receiveTask, rtcpReportDelayTask, rtspKeepAliveDelayTask); + else + result = await Task.WhenAny(receiveTask, rtcpReportDelayTask); + + if (result == receiveTask) + { + await receiveTask; + break; + } + + if (result.IsCanceled) + break; + + if (result == rtcpReportDelayTask) + { + nextRtcpReportInterval = GetNextRtcpReportInterval(); + rtcpReportDelayTask = Task.Delay(nextRtcpReportInterval, linkedToken); + + await SendRtcpReportsAsync(linkedToken); + } + else + { + nextRtspKeepAliveInterval = GetNextRtspKeepAliveInterval(); + rtspKeepAliveDelayTask = Task.Delay(nextRtspKeepAliveInterval, linkedToken); + + await SendRtspKeepAliveAsync(linkedToken); + } + } + + if (!receiveTask.IsCompleted) + await receiveTask; + + if (linkedToken.IsCancellationRequested) + await CloseRtspSessionAsync(CancellationToken.None); } - if (!anyTrackRequested) - throw new RtspClientException("Any suitable track is not found"); + } - RtspRequestMessage playRequest = _requestMessageFactory.CreatePlayRequest(); - await rtspTransportClient.EnsureExecuteRequest(playRequest, token, 1); + public void Dispose() + { + if (Interlocked.CompareExchange(ref _disposed, 1, 0) != 0) + return; + + if (_udpClientsMap.Count != 0) + foreach (Socket client in _udpClientsMap.Values) + client.Close(); + + IRtspTransportClient rtspTransportClient = Volatile.Read(ref _rtspTransportClient); + + if (rtspTransportClient != null) + _rtspTransportClient.Dispose(); } - public async Task ReceiveAsync(IRtspTransportClient rtspTransportClient, CancellationToken token) + private IRtspTransportClient CreateTransportClient() { - var tpktStream = new TpktStream(rtspTransportClient.GetStream()); + if (_connectionParameters.ConnectionUri.Scheme.Equals(Uri.UriSchemeHttp, + StringComparison.InvariantCultureIgnoreCase)) + return new RtspHttpTransportClient(_connectionParameters); - int nextKeepAliveIntervalMs = GetNextRtspKeepAliveInterval(); - int rtcpReportIntervalMs = GetNextRtcpReportInterval(); + return new RtspTcpTransportClient(_connectionParameters); + } - int ticksNow = Environment.TickCount; - int lastTimeRtspKeepAliveSent = ticksNow; - int lastTimeRtcpReportsPrepared = ticksNow; + private TimeSpan GetNextRtspKeepAliveInterval() + { + return TimeSpan.FromMilliseconds(_random.Next(_rtspKeepAliveTimeoutMs / 2, + _rtspKeepAliveTimeoutMs * 3 / 4)); + } - while (!_isConnectionClosedByServer && !token.IsCancellationRequested) - { - TpktPayload payload = await tpktStream.ReadAsync(); + private TimeSpan GetNextRtcpReportInterval() + { + return TimeSpan.FromMilliseconds(RtcpReportIntervalBaseMs + _random.Next(0, 11) * 100); + } - if (_streamsMap.TryGetValue(payload.Channel, out ITransportStream channel)) - channel.Process(payload.PayloadSegment); + private async Task SetupTrackAsync(RtspMediaTrackInfo track, CancellationToken token) + { + RtspRequestMessage setupRequest; + RtspResponseMessage setupResponse; - ReadingContinues?.Invoke(); + int rtpChannelNumber; + int rtcpChannelNumber; - ticksNow = Environment.TickCount; + if (_connectionParameters.RtpTransport == RtpTransportProtocol.UDP) + { + Socket rtpClient = NetworkClientFactory.CreateUdpClient(); + Socket rtcpClient = NetworkClientFactory.CreateUdpClient(); - if (TimeUtils.IsTimeOver(ticksNow, lastTimeRtcpReportsPrepared, rtcpReportIntervalMs)) + try { - lastTimeRtcpReportsPrepared = ticksNow; - rtcpReportIntervalMs = GetNextRtcpReportInterval(); + IPEndPoint endPoint = new IPEndPoint(IPAddress.Any, 0); + rtpClient.Bind(endPoint); + + rtpChannelNumber = ((IPEndPoint)rtpClient.LocalEndPoint).Port; + + endPoint = new IPEndPoint(IPAddress.Any, rtpChannelNumber + 1); - foreach (KeyValuePair pair in _reportProvidersMap) + try { - IReadOnlyList packets = pair.Value.GetReportPackets(); + rtcpClient.Bind(endPoint); + } + catch (SocketException e) when (e.SocketErrorCode == SocketError.AddressAlreadyInUse) + { + endPoint = new IPEndPoint(IPAddress.Any, 0); + rtcpClient.Bind(endPoint); + } - _rtcpPacketsStream.Position = 0; + rtcpChannelNumber = ((IPEndPoint)rtcpClient.LocalEndPoint).Port; - foreach (ISerializablePacket report in packets.Cast()) - report.Serialize(_rtcpPacketsStream); + _udpClientsMap[rtpChannelNumber] = rtpClient; + _udpClientsMap[rtcpChannelNumber] = rtcpClient; - var byteSegment = new ArraySegment(_rtcpPacketsStream.GetBuffer(), 0, - (int) _rtcpPacketsStream.Position); + setupRequest = + _requestMessageFactory.CreateSetupUdpUnicastRequest(track.TrackName, rtpChannelNumber, + rtcpChannelNumber); + setupResponse = await _rtspTransportClient.EnsureExecuteRequest(setupRequest, token); - await tpktStream.WriteAsync(pair.Key, byteSegment); - } - } + string transportHeader = setupResponse.Headers[WellKnownHeaders.Transport]; - if (_isServerSupportsGetParameterRequest && - TimeUtils.IsTimeOver(ticksNow, lastTimeRtspKeepAliveSent, nextKeepAliveIntervalMs)) - { - lastTimeRtspKeepAliveSent = ticksNow; - nextKeepAliveIntervalMs = GetNextRtspKeepAliveInterval(); + if (string.IsNullOrEmpty(transportHeader)) + throw new RtspBadResponseException("Transport header is not found"); + + if (!ParseSeverPorts(transportHeader, out var serverRtpPort, out var serverRtcpPort)) + throw new RtspBadResponseException("Server ports are not found"); - RtspRequestMessage getParameterRequest = _requestMessageFactory.CreateGetParameterRequest(); - await rtspTransportClient.SendRequestAsync(getParameterRequest); + IPEndPoint remoteEndPoint = (IPEndPoint)_rtspTransportClient.RemoteEndPoint; + + rtpClient.Connect(new IPEndPoint(remoteEndPoint.Address, serverRtpPort)); + rtcpClient.Connect(new IPEndPoint(remoteEndPoint.Address, serverRtcpPort)); } + catch + { + rtpClient.Close(); + rtcpClient.Close(); + throw; + } + } + else + { + int channelCounter = _streamsMap.Count; + rtpChannelNumber = ++channelCounter; + rtcpChannelNumber = ++channelCounter; + + setupRequest = + _requestMessageFactory.CreateSetupTcpInterleavedRequest(track.TrackName, rtpChannelNumber, + rtcpChannelNumber); + setupResponse = await _rtspTransportClient.EnsureExecuteRequest(setupRequest, token); } - if (token.IsCancellationRequested) + ParseSessionHeader(setupResponse.Headers[WellKnownHeaders.Session]); + + IMediaPayloadParser mediaPayloadParser = MediaPayloadParser.CreateFrom(track.Codec); + + IRtpSequenceAssembler rtpSequenceAssembler; + + if (_connectionParameters.RtpTransport == RtpTransportProtocol.TCP) { - RtspRequestMessage teardownRequest = _requestMessageFactory.CreateTeardownRequest(); - await rtspTransportClient.SendRequestAsync(teardownRequest); + rtpSequenceAssembler = null; + mediaPayloadParser.FrameGenerated = OnFrameGeneratedLockfree; } + else + { + rtpSequenceAssembler = new RtpSequenceAssembler(Constants.UdpReceiveBufferSize, 8); + mediaPayloadParser.FrameGenerated = OnFrameGeneratedThreadSafe; + } + + var rtpStream = new RtpStream(mediaPayloadParser, track.SamplesFrequency, rtpSequenceAssembler); + _streamsMap.Add(rtpChannelNumber, rtpStream); + + var rtcpStream = new RtcpStream(); + rtcpStream.SessionShutdown += (sender, args) => _serverCancellationTokenSource.Cancel(); + _streamsMap.Add(rtcpChannelNumber, rtcpStream); + + uint senderSyncSourceId = (uint)_random.Next(); + + var rtcpReportsProvider = new RtcpReceiverReportsProvider(rtpStream, rtcpStream, senderSyncSourceId); + _reportProvidersMap.Add(rtcpChannelNumber, rtcpReportsProvider); } - private void ResetState() + private async Task SendRtcpReportsAsync(CancellationToken token) { - _requestMessageFactory.ResetState(); - _isConnectionClosedByServer = false; - _isServerSupportsGetParameterRequest = false; - _streamsMap.Clear(); - _reportProvidersMap.Clear(); + foreach (KeyValuePair pair in _reportProvidersMap) + { + token.ThrowIfCancellationRequested(); + + IEnumerable packets = pair.Value.GetReportPackets(); + + _rtcpPacketsStream.Position = 0; + + foreach (ISerializablePacket report in packets.Cast()) + report.Serialize(_rtcpPacketsStream); + + byte[] streamBuffer = _rtcpPacketsStream.GetBuffer(); + var byteSegment = new ArraySegment(streamBuffer, 0, (int)_rtcpPacketsStream.Position); + + if (_connectionParameters.RtpTransport == RtpTransportProtocol.TCP) + await _tpktStream.WriteAsync(pair.Key, byteSegment); + else + await _udpClientsMap[pair.Key].SendAsync(byteSegment, SocketFlags.None); + } + } + + private async Task SendRtspKeepAliveAsync(CancellationToken token) + { + RtspRequestMessage getParameterRequest = _requestMessageFactory.CreateGetParameterRequest(); + + if (_connectionParameters.RtpTransport == RtpTransportProtocol.TCP) + await _rtspTransportClient.SendRequestAsync(getParameterRequest, token); + else + await _rtspTransportClient.EnsureExecuteRequest(getParameterRequest, token); + } + + private async Task CloseRtspSessionAsync(CancellationToken token) + { + RtspRequestMessage teardownRequest = _requestMessageFactory.CreateTeardownRequest(); + + if (_connectionParameters.RtpTransport == RtpTransportProtocol.TCP) + await _rtspTransportClient.SendRequestAsync(teardownRequest, token); + else + await _rtspTransportClient.EnsureExecuteRequest(teardownRequest, token); } private IEnumerable GetTracksToSetup(IEnumerable tracks) @@ -190,7 +358,6 @@ private IEnumerable GetTracksToSetup(IEnumerable(); + + foreach (KeyValuePair pair in _udpClientsMap) + { + int channelNumber = pair.Key; + ITransportStream transportStream = _streamsMap[channelNumber]; + + Task receiveTask = ReceiveFromUdpChannelAsync(pair.Value, transportStream, token); + + if (transportStream is RtpStream) + waitList.Add(receiveTask); + } + + return Task.WhenAll(waitList); + } + + private async Task ReceiveFromUdpChannelAsync(Socket client, ITransportStream transportStream, + CancellationToken token) + { + var readBuffer = new byte[Constants.UdpReceiveBufferSize]; + var bufferSegment = new ArraySegment(readBuffer); + + while (!token.IsCancellationRequested) + { + int read = await client.ReceiveAsync(bufferSegment, SocketFlags.None); + + var payloadSegment = new ArraySegment(readBuffer, 0, read); + transportStream.Process(payloadSegment); + } + } } } \ No newline at end of file diff --git a/RtspClientSharp/Rtsp/RtspHttpTransportClient.cs b/RtspClientSharp/Rtsp/RtspHttpTransportClient.cs index 24b7693..88417c8 100644 --- a/RtspClientSharp/Rtsp/RtspHttpTransportClient.cs +++ b/RtspClientSharp/Rtsp/RtspHttpTransportClient.cs @@ -15,12 +15,16 @@ namespace RtspClientSharp.Rtsp { class RtspHttpTransportClient : RtspTransportClient { - private TcpClient _streamDataClient; - private TcpClient _commandsClient; + private Socket _streamDataClient; + private Socket _commandsClient; private string _sessionCookie; private Authenticator _authenticator; private Stream _dataNetworkStream; private uint _commandCounter; + private EndPoint _remoteEndPoint = new IPEndPoint(IPAddress.None, 0); + private int _disposed; + + public override EndPoint RemoteEndPoint => _remoteEndPoint; public RtspHttpTransportClient(ConnectionParameters connectionParameters) : base(connectionParameters) @@ -32,7 +36,7 @@ public override async Task ConnectAsync(CancellationToken token) _commandCounter = 0; _sessionCookie = Guid.NewGuid().ToString("N").Substring(0, 10); - _streamDataClient = TcpClientFactory.Create(); + _streamDataClient = NetworkClientFactory.CreateTcpClient(); Uri connectionUri = ConnectionParameters.ConnectionUri; @@ -40,7 +44,8 @@ public override async Task ConnectAsync(CancellationToken token) await _streamDataClient.ConnectAsync(connectionUri.Host, httpPort); - _dataNetworkStream = _streamDataClient.GetStream(); + _remoteEndPoint = _streamDataClient.RemoteEndPoint; + _dataNetworkStream = new NetworkStream(_streamDataClient, false); string request = ComposeGetRequest(); byte[] requestBytes = Encoding.ASCII.GetBytes(request); @@ -100,24 +105,16 @@ public override Stream GetStream() public override void Dispose() { - if (_streamDataClient != null) - { - _streamDataClient.Client.Close(); - _streamDataClient.Dispose(); - _streamDataClient = null; - } + if (Interlocked.CompareExchange(ref _disposed, 1, 0) != 0) + return; - if (_commandsClient != null) - { - _commandsClient.Client.Close(); - _commandsClient.Dispose(); - _commandsClient = null; - } + _streamDataClient?.Close(); + _commandsClient?.Close(); } protected override async Task WriteAsync(byte[] buffer, int offset, int count) { - using (_commandsClient = TcpClientFactory.Create()) + using (_commandsClient = NetworkClientFactory.CreateTcpClient()) { Uri connectionUri = ConnectionParameters.ConnectionUri; @@ -125,16 +122,19 @@ protected override async Task WriteAsync(byte[] buffer, int offset, int count) await _commandsClient.ConnectAsync(connectionUri.Host, httpPort); - NetworkStream commandsStream = _commandsClient.GetStream(); - string base64CodedCommandString = Convert.ToBase64String(buffer, offset, count); byte[] base64CommandBytes = Encoding.ASCII.GetBytes(base64CodedCommandString); string request = ComposePostRequest(base64CommandBytes); byte[] requestBytes = Encoding.ASCII.GetBytes(request); - await commandsStream.WriteAsync(requestBytes, 0, requestBytes.Length); - await commandsStream.WriteAsync(base64CommandBytes, 0, base64CommandBytes.Length); + ArraySegment[] sendList = + { + new ArraySegment(requestBytes), + new ArraySegment(base64CommandBytes) + }; + + await _commandsClient.SendAsync(sendList, SocketFlags.None); } } @@ -200,17 +200,24 @@ private async Task ReadUntilEndOfHeadersAsync(Stream stream, byte[] buffer) int count = buffer.Length - totalRead; if (count == 0) - throw new RtspBadResponseException($"Response is too large (> {buffer.Length / 1024} KB)"); + throw new HttpBadResponseException($"Response is too large (> {buffer.Length / 1024} KB)"); int read = await stream.ReadAsync(buffer, offset, count); if (read == 0) throw new EndOfStreamException("End of http stream"); - offset += read; totalRead += read; - endOfHeaders = ArrayUtils.IndexOfBytes(buffer, Constants.DoubleCrlfBytes, 0, totalRead); + int startIndex = offset - (Constants.DoubleCrlfBytes.Length - 1); + + if (startIndex < 0) + startIndex = 0; + + endOfHeaders = ArrayUtils.LastIndexOfBytes(buffer, Constants.DoubleCrlfBytes, startIndex, + totalRead - startIndex); + + offset += read; } while (endOfHeaders == -1); return totalRead; diff --git a/RtspClientSharp/Rtsp/RtspRequestMessageFactory.cs b/RtspClientSharp/Rtsp/RtspRequestMessageFactory.cs index e2291c4..9831604 100644 --- a/RtspClientSharp/Rtsp/RtspRequestMessageFactory.cs +++ b/RtspClientSharp/Rtsp/RtspRequestMessageFactory.cs @@ -46,6 +46,18 @@ public RtspRequestMessage CreateSetupTcpInterleavedRequest(string trackName, int return rtspRequestMessage; } + public RtspRequestMessage CreateSetupUdpUnicastRequest(string trackName, int rtpPort, int rtcpPort) + { + Uri trackUri = !Uri.IsWellFormedUriString(trackName, UriKind.Absolute) + ? new Uri(GetContentBasedUri(), trackName) + : new Uri(trackName, UriKind.Absolute); + + var rtspRequestMessage = new RtspRequestMessage(RtspMethod.SETUP, trackUri, ProtocolVersion, ++_cSeq, + _userAgent, SessionId); + rtspRequestMessage.Headers.Add("Transport", $"RTP/AVP;unicast;client_port={rtpPort}-{rtcpPort}"); + return rtspRequestMessage; + } + public RtspRequestMessage CreatePlayRequest() { Uri uri = GetContentBasedUri(); @@ -70,13 +82,6 @@ public RtspRequestMessage CreateGetParameterRequest() return rtspRequestMessage; } - public void ResetState() - { - _cSeq = 0; - ContentBase = null; - SessionId = null; - } - private Uri GetContentBasedUri() { if (ContentBase != null) diff --git a/RtspClientSharp/Rtsp/RtspResponseMessage.cs b/RtspClientSharp/Rtsp/RtspResponseMessage.cs index b9e8435..4f082b9 100644 --- a/RtspClientSharp/Rtsp/RtspResponseMessage.cs +++ b/RtspClientSharp/Rtsp/RtspResponseMessage.cs @@ -25,7 +25,7 @@ public static RtspResponseMessage Parse(ArraySegment byteSegment) { Debug.Assert(byteSegment.Array != null, "byteSegment.Array != null"); - var headersStream = new MemoryStream(byteSegment.Array, byteSegment.Offset, byteSegment.Count); + var headersStream = new MemoryStream(byteSegment.Array, byteSegment.Offset, byteSegment.Count, false); var headersReader = new StreamReader(headersStream); string startLine = headersReader.ReadLine(); @@ -64,8 +64,9 @@ public override string ToString() { sb.AppendLine(); - string bodyString = - Encoding.ASCII.GetString(ResponseBody.Array, ResponseBody.Offset, ResponseBody.Count); + string bodyString = Encoding.ASCII.GetString(ResponseBody.Array, + ResponseBody.Offset, ResponseBody.Count); + sb.Append(bodyString); } diff --git a/RtspClientSharp/Rtsp/RtspTcpTransportClient.cs b/RtspClientSharp/Rtsp/RtspTcpTransportClient.cs index 952163d..d225e1a 100644 --- a/RtspClientSharp/Rtsp/RtspTcpTransportClient.cs +++ b/RtspClientSharp/Rtsp/RtspTcpTransportClient.cs @@ -1,6 +1,7 @@ using System; using System.Diagnostics; using System.IO; +using System.Net; using System.Net.Sockets; using System.Threading; using System.Threading.Tasks; @@ -10,8 +11,12 @@ namespace RtspClientSharp.Rtsp { class RtspTcpTransportClient : RtspTransportClient { - private TcpClient _tcpClient; + private Socket _tcpClient; private Stream _networkStream; + private EndPoint _remoteEndPoint = new IPEndPoint(IPAddress.None, 0); + private int _disposed; + + public override EndPoint RemoteEndPoint => _remoteEndPoint; public RtspTcpTransportClient(ConnectionParameters connectionParameters) : base(connectionParameters) @@ -20,7 +25,7 @@ public RtspTcpTransportClient(ConnectionParameters connectionParameters) public override async Task ConnectAsync(CancellationToken token) { - _tcpClient = TcpClientFactory.Create(); + _tcpClient = NetworkClientFactory.CreateTcpClient(); Uri connectionUri = ConnectionParameters.ConnectionUri; @@ -28,7 +33,8 @@ public override async Task ConnectAsync(CancellationToken token) await _tcpClient.ConnectAsync(connectionUri.Host, rtspPort); - _networkStream = _tcpClient.GetStream(); + _remoteEndPoint = _tcpClient.RemoteEndPoint; + _networkStream = new NetworkStream(_tcpClient, false); } public override Stream GetStream() @@ -41,12 +47,10 @@ public override Stream GetStream() public override void Dispose() { - if (_tcpClient == null) + if (Interlocked.CompareExchange(ref _disposed, 1, 0) != 0) return; - _tcpClient.Client.Close(); - _tcpClient.Dispose(); - _tcpClient = null; + _tcpClient?.Close(); } protected override Task WriteAsync(byte[] buffer, int offset, int count) diff --git a/RtspClientSharp/Rtsp/RtspTransportClient.cs b/RtspClientSharp/Rtsp/RtspTransportClient.cs index 5c52987..b74a303 100644 --- a/RtspClientSharp/Rtsp/RtspTransportClient.cs +++ b/RtspClientSharp/Rtsp/RtspTransportClient.cs @@ -1,5 +1,6 @@ using System; using System.IO; +using System.Net; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -11,10 +12,12 @@ namespace RtspClientSharp.Rtsp abstract class RtspTransportClient : IRtspTransportClient { protected readonly ConnectionParameters ConnectionParameters; - private readonly byte[] _readBuffer = new byte[Constants.MaxResponseHeadersSize]; + private readonly byte[] _buffer = new byte[Constants.MaxResponseHeadersSize]; private Authenticator _authenticator; + public abstract EndPoint RemoteEndPoint { get; } + protected RtspTransportClient(ConnectionParameters connectionParameters) { ConnectionParameters = @@ -41,7 +44,7 @@ public async Task ExecuteRequest(RtspRequestMessage request { token.ThrowIfCancellationRequested(); - await SendRequestAsync(requestMessage); + await SendRequestAsync(requestMessage, token); RtspResponseMessage responseMessage = await GetResponseAsync(responseReadPortionSize); @@ -51,7 +54,7 @@ public async Task ExecuteRequest(RtspRequestMessage request if (ConnectionParameters.Credentials.IsEmpty() || _authenticator != null) throw new RtspBadResponseCodeException(responseMessage.StatusCode); - string authenticateHeader = responseMessage.Headers.Get(WellKnownHeaders.WwwAuthenticate); + string authenticateHeader = responseMessage.Headers[WellKnownHeaders.WwwAuthenticate]; if (string.IsNullOrEmpty(authenticateHeader)) throw new RtspBadResponseCodeException(responseMessage.StatusCode); @@ -59,7 +62,7 @@ public async Task ExecuteRequest(RtspRequestMessage request _authenticator = Authenticator.Create(ConnectionParameters.Credentials, authenticateHeader); requestMessage.CSeq++; - await SendRequestAsync(requestMessage); + await SendRequestAsync(requestMessage, token); responseMessage = await GetResponseAsync(); if (responseMessage.StatusCode == RtspStatusCode.Unauthorized) @@ -68,14 +71,17 @@ public async Task ExecuteRequest(RtspRequestMessage request return responseMessage; } - public Task SendRequestAsync(RtspRequestMessage requestMessage) + public Task SendRequestAsync(RtspRequestMessage requestMessage, CancellationToken token) { + token.ThrowIfCancellationRequested(); + if (_authenticator != null) AddAuthorizationHeader(requestMessage); - byte[] requestBytes = Encoding.ASCII.GetBytes(requestMessage.ToString()); - - return WriteAsync(requestBytes, 0, requestBytes.Length); + string requestMessageString = requestMessage.ToString(); + + int written = Encoding.ASCII.GetBytes(requestMessageString, 0, requestMessageString.Length, _buffer, 0); + return WriteAsync(_buffer, 0, written); } public abstract void Dispose(); @@ -88,23 +94,22 @@ private async Task GetResponseAsync(int responseReadPortion { int totalRead = await ReadUntilEndOfHeadersAsync(responseReadPortionSize); - int startOfResponse = ArrayUtils.IndexOfBytes(_readBuffer, Constants.RtspProtocolNameBytes, 0, totalRead); + int startOfResponse = ArrayUtils.IndexOfBytes(_buffer, Constants.RtspProtocolNameBytes, 0, totalRead); if (startOfResponse == -1) throw new RtspBadResponseException("\"RTSP\" start signature is not found"); - int endOfResponseHeaders = ArrayUtils.IndexOfBytes(_readBuffer, Constants.DoubleCrlfBytes, - startOfResponse, totalRead - startOfResponse) + + int endOfResponseHeaders = ArrayUtils.LastIndexOfBytes(_buffer, Constants.DoubleCrlfBytes, 0, totalRead) + Constants.DoubleCrlfBytes.Length; if (endOfResponseHeaders == -1) throw new RtspBadResponseException("End of response headers is not found"); var headersByteSegment = - new ArraySegment(_readBuffer, startOfResponse, endOfResponseHeaders - startOfResponse); + new ArraySegment(_buffer, startOfResponse, endOfResponseHeaders - startOfResponse); RtspResponseMessage rtspResponseMessage = RtspResponseMessage.Parse(headersByteSegment); - string contentLengthString = rtspResponseMessage.Headers.Get(WellKnownHeaders.ContentLength); + string contentLengthString = rtspResponseMessage.Headers[WellKnownHeaders.ContentLength]; if (string.IsNullOrEmpty(contentLengthString)) return rtspResponseMessage; @@ -120,10 +125,10 @@ private async Task GetResponseAsync(int responseReadPortion int dataPartSize = totalRead - headersByteSegment.Count; - Buffer.BlockCopy(_readBuffer, endOfResponseHeaders, _readBuffer, 0, dataPartSize); - await ReadExactAsync(_readBuffer, dataPartSize, (int) (contentLength - dataPartSize)); + Buffer.BlockCopy(_buffer, endOfResponseHeaders, _buffer, 0, dataPartSize); + await ReadExactAsync(_buffer, dataPartSize, (int) (contentLength - dataPartSize)); - rtspResponseMessage.ResponseBody = new ArraySegment(_readBuffer, 0, (int) contentLength); + rtspResponseMessage.ResponseBody = new ArraySegment(_buffer, 0, (int) contentLength); return rtspResponseMessage; } @@ -136,7 +141,7 @@ private async Task ReadUntilEndOfHeadersAsync(int readPortionSize = 0) do { - int count = _readBuffer.Length - totalRead; + int count = _buffer.Length - totalRead; if (readPortionSize != 0 && count > readPortionSize) count = readPortionSize; @@ -145,15 +150,22 @@ private async Task ReadUntilEndOfHeadersAsync(int readPortionSize = 0) throw new RtspBadResponseException( $"Response is too large (> {Constants.MaxResponseHeadersSize / 1024} KB)"); - int read = await ReadAsync(_readBuffer, offset, count); + int read = await ReadAsync(_buffer, offset, count); if (read == 0) throw new EndOfStreamException("End of rtsp stream"); - offset += read; totalRead += read; - endOfHeaders = ArrayUtils.IndexOfBytes(_readBuffer, Constants.DoubleCrlfBytes, 0, totalRead); + int startIndex = offset - (Constants.DoubleCrlfBytes.Length - 1); + + if (startIndex < 0) + startIndex = 0; + + endOfHeaders = ArrayUtils.LastIndexOfBytes(_buffer, Constants.DoubleCrlfBytes, startIndex, + totalRead - startIndex); + + offset += read; } while (endOfHeaders == -1); return totalRead; diff --git a/RtspClientSharp/Rtsp/WellKnownHeaders.cs b/RtspClientSharp/Rtsp/WellKnownHeaders.cs index 487130a..4b53943 100644 --- a/RtspClientSharp/Rtsp/WellKnownHeaders.cs +++ b/RtspClientSharp/Rtsp/WellKnownHeaders.cs @@ -7,5 +7,6 @@ static class WellKnownHeaders public static readonly string ContentBase = "CONTENT-BASE"; public static readonly string Public = "PUBLIC"; public static readonly string Session = "SESSION"; + public static readonly string Transport = "TRANSPORT"; } } \ No newline at end of file diff --git a/RtspClientSharp/RtspClient.cs b/RtspClientSharp/RtspClient.cs index 05030de..fba5737 100644 --- a/RtspClientSharp/RtspClient.cs +++ b/RtspClientSharp/RtspClient.cs @@ -12,10 +12,9 @@ namespace RtspClientSharp public sealed class RtspClient : IRtspClient { private readonly Func _transportClientProvider; - private IRtspTransportClient _rtspTransportClient; - private bool _anyTpktPacketReceived; + private bool _anyFrameReceived; + private RtspClientInternal _rtspClientInternal; private int _disposed; - private readonly RtspClientInternal _rtspClientInternal; public ConnectionParameters ConnectionParameters { get; } @@ -25,9 +24,6 @@ public RtspClient(ConnectionParameters connectionParameters) { ConnectionParameters = connectionParameters ?? throw new ArgumentNullException(nameof(connectionParameters)); - - _transportClientProvider = CreateTransportClient; - _rtspClientInternal = CreateRtspClientInternal(connectionParameters); } internal RtspClient(ConnectionParameters connectionParameters, @@ -35,11 +31,8 @@ internal RtspClient(ConnectionParameters connectionParameters, { ConnectionParameters = connectionParameters ?? throw new ArgumentNullException(nameof(connectionParameters)); - _transportClientProvider = transportClientProvider ?? throw new ArgumentNullException(nameof(transportClientProvider)); - - _rtspClientInternal = CreateRtspClientInternal(connectionParameters); } ~RtspClient() @@ -57,11 +50,11 @@ public async Task ConnectAsync(CancellationToken token) { await Task.Run(async () => { - _rtspTransportClient = _transportClientProvider(); + _rtspClientInternal = CreateRtspClientInternal(ConnectionParameters, _transportClientProvider); try { - Task connectionTask = ConnectInternalAsync(token); + Task connectionTask = _rtspClientInternal.ConnectAsync(token); if (connectionTask.IsCompleted) { @@ -81,7 +74,7 @@ await Task.Run(async () => { connectionTask.IgnoreExceptions(); - if (delayTask.Status == TaskStatus.Canceled) + if (delayTask.IsCanceled) throw new OperationCanceledException(); throw new TimeoutException(); @@ -93,8 +86,8 @@ await Task.Run(async () => } catch (Exception e) { - _rtspTransportClient.Dispose(); - _rtspTransportClient = null; + _rtspClientInternal.Dispose(); + Volatile.Write(ref _rtspClientInternal, null); if (e is TimeoutException) throw new RtspClientException("Connection timeout", e); @@ -125,12 +118,12 @@ e is HttpBadResponseCodeException httpBadResponseCodeException && /// public async Task ReceiveAsync(CancellationToken token) { - if (_rtspTransportClient == null) + if (_rtspClientInternal == null) throw new InvalidOperationException("Client should be connected first"); try { - Task receiveInternalTask = _rtspClientInternal.ReceiveAsync(_rtspTransportClient, token); + Task receiveInternalTask = _rtspClientInternal.ReceiveAsync(token); if (receiveInternalTask.IsCompleted) { @@ -146,10 +139,10 @@ public async Task ReceiveAsync(CancellationToken token) while (true) { - _anyTpktPacketReceived = false; + _anyFrameReceived = false; Task result = await Task.WhenAny(receiveInternalTask, - Task.Delay(ConnectionParameters.ReceiveTimeout, delayTaskToken)); + Task.Delay(ConnectionParameters.ReceiveTimeout, delayTaskToken)).ConfigureAwait(false); if (result == receiveInternalTask) { @@ -160,14 +153,15 @@ public async Task ReceiveAsync(CancellationToken token) if (result.IsCanceled) { - await Task.WhenAny(receiveInternalTask, - Task.Delay(TimeSpan.FromSeconds(1), CancellationToken.None)); + if(ConnectionParameters.CancelTimeout != TimeSpan.Zero) + await Task.WhenAny(receiveInternalTask, + Task.Delay(ConnectionParameters.CancelTimeout, CancellationToken.None)); receiveInternalTask.IgnoreExceptions(); throw new OperationCanceledException(); } - if (!Volatile.Read(ref _anyTpktPacketReceived)) + if (!Volatile.Read(ref _anyFrameReceived)) { receiveInternalTask.IgnoreExceptions(); throw new RtspClientException("Receive timeout", new TimeoutException()); @@ -175,6 +169,10 @@ await Task.WhenAny(receiveInternalTask, } } } + catch (InvalidOperationException) + { + throw; + } catch (OperationCanceledException) { throw; @@ -189,8 +187,8 @@ await Task.WhenAny(receiveInternalTask, } finally { - _rtspTransportClient.Dispose(); - _rtspTransportClient = null; + _rtspClientInternal.Dispose(); + Volatile.Write(ref _rtspClientInternal, null); } } @@ -199,39 +197,27 @@ await Task.WhenAny(receiveInternalTask, /// public void Dispose() { - if (Interlocked.CompareExchange(ref _disposed, 1, 0) == 0) + if (Interlocked.CompareExchange(ref _disposed, 1, 0) != 0) return; - IRtspTransportClient client = _rtspTransportClient; + RtspClientInternal rtspClientInternal = Volatile.Read(ref _rtspClientInternal); - if (client != null) - client.Dispose(); + if (rtspClientInternal != null) + rtspClientInternal.Dispose(); GC.SuppressFinalize(this); } - private IRtspTransportClient CreateTransportClient() - { - if (ConnectionParameters.ConnectionUri.Scheme.Equals(Uri.UriSchemeHttp, - StringComparison.InvariantCultureIgnoreCase)) - return new RtspHttpTransportClient(ConnectionParameters); - - return new RtspTcpTransportClient(ConnectionParameters); - } - - private async Task ConnectInternalAsync(CancellationToken token) - { - await _rtspTransportClient.ConnectAsync(token); - - await _rtspClientInternal.ConnectAsync(_rtspTransportClient, token); - } - - private RtspClientInternal CreateRtspClientInternal(ConnectionParameters connectionParameters) + private RtspClientInternal CreateRtspClientInternal(ConnectionParameters connectionParameters, + Func transportClientProvider) { - return new RtspClientInternal(connectionParameters) + return new RtspClientInternal(connectionParameters, transportClientProvider) { - ReadingContinues = () => Volatile.Write(ref _anyTpktPacketReceived, true), - FrameReceived = frame => FrameReceived?.Invoke(this, frame) + FrameReceived = frame => + { + Volatile.Write(ref _anyFrameReceived, true); + FrameReceived?.Invoke(this, frame); + } }; } } diff --git a/RtspClientSharp/RtspClientSharp.csproj b/RtspClientSharp/RtspClientSharp.csproj index 084b0b9..a446fe9 100644 --- a/RtspClientSharp/RtspClientSharp.csproj +++ b/RtspClientSharp/RtspClientSharp.csproj @@ -8,7 +8,7 @@ rtsp, rtp, rtcp RTSP Client for .NET -- Supported transport protocols: TCP and HTTP (UDP will be added) +- Supported transport protocols: TCP/HTTP/UDP - Supported media codecs: H.264/MJPEG/AAC/G711A/G711U/PCM/G726 - No external dependencies, pure C# code - Asynchronous nature with cancellation tokens support @@ -16,8 +16,11 @@ - Low GC pressure during receive stage (when RTSP session is established); low amount of freezes under high load https://opensource.org/licenses/MIT false - http://github.com/BogdanovKirill/RtspClientSharp/Images/package_icon.png + https://github.com/BogdanovKirill/RtspClientSharp/blob/master/Images/package_icon.png Copyright ©2018 Kirill Bogdanov + 1.1.0 + -RTP/RTCP over UDP transport is added +-several bug fixes diff --git a/RtspClientSharp/Tpkt/TpktStream.cs b/RtspClientSharp/Tpkt/TpktStream.cs index d92e93f..3dc765f 100644 --- a/RtspClientSharp/Tpkt/TpktStream.cs +++ b/RtspClientSharp/Tpkt/TpktStream.cs @@ -86,7 +86,8 @@ public async Task WriteAsync(int channel, ArraySegment payloadSegment) Buffer.BlockCopy(payloadSegment.Array, payloadSegment.Offset, _writeBuffer, TpktHeader.Size, payloadSegment.Count); - await _stream.WriteAsync(_writeBuffer, 0, _writeBuffer.Length); + int write = 4 + payloadSegment.Count; + await _stream.WriteAsync(_writeBuffer, 0, write); } private async Task FindNextPacketAsync() diff --git a/RtspClientSharp/Utils/ArrayUtils.cs b/RtspClientSharp/Utils/ArrayUtils.cs index 9b307e9..2d6df74 100644 --- a/RtspClientSharp/Utils/ArrayUtils.cs +++ b/RtspClientSharp/Utils/ArrayUtils.cs @@ -49,7 +49,7 @@ public static int IndexOfBytes(byte[] array, byte[] pattern, int startIndex, int if (count < pattern.Length) return -1; - int endIndex = count > 0 ? startIndex + count : array.Length; + int endIndex = startIndex + count; int foundIndex = 0; for (; startIndex < endIndex; startIndex++) @@ -62,5 +62,25 @@ public static int IndexOfBytes(byte[] array, byte[] pattern, int startIndex, int return -1; } + + public static int LastIndexOfBytes(byte[] array, byte[] pattern, int startIndex, int count) + { + if (count < pattern.Length) + return -1; + + int endIndex = startIndex + count - 1; + + int lastPatternIndex = pattern.Length - 1; + int foundIndex = lastPatternIndex; + for (; endIndex >= startIndex; endIndex--) + { + if (array[endIndex] != pattern[foundIndex]) + foundIndex = lastPatternIndex; + else if (--foundIndex == -1) + return endIndex; + } + + return -1; + } } } \ No newline at end of file diff --git a/RtspClientSharp/Utils/ChunksArray.cs b/RtspClientSharp/Utils/ChunksArray.cs new file mode 100644 index 0000000..c4d3c7d --- /dev/null +++ b/RtspClientSharp/Utils/ChunksArray.cs @@ -0,0 +1,77 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; + +namespace RtspClientSharp.Utils +{ + /// + /// The main idea of that class is to reduce the amount of references to arrays for GC + /// + class ChunksArray + { + private readonly int _maxChunkSize; + private readonly int _maxNumberOfChunks; + private byte[] _chunksBytes = Array.Empty(); + private readonly List _sizesList; + private int _chunksCount; + + public int Count => _chunksCount; + + public ChunksArray(int maxChunkSize, int maxNumberOfChunks) + { + _maxChunkSize = maxChunkSize; + _maxNumberOfChunks = maxNumberOfChunks; + _sizesList = new List(maxNumberOfChunks); + } + + public ArraySegment this[int index] + { + get + { + int offset = index * _maxChunkSize; + return new ArraySegment(_chunksBytes, offset, _sizesList[index]); + } + } + + public void Add(ArraySegment chunkSegment) + { + Debug.Assert(chunkSegment.Array != null, "chunkSegment.Array != null"); + + if (chunkSegment.Count > _maxChunkSize) + throw new ArgumentException($"Chunk size is too large: {chunkSegment.Count}", nameof(chunkSegment)); + if (_chunksCount == _maxNumberOfChunks) + throw new InvalidOperationException("Number of chunks is reached the upper limit"); + + int requiredSize = ++_chunksCount * _maxChunkSize; + + if (_chunksBytes.Length < requiredSize) + Array.Resize(ref _chunksBytes, requiredSize); + + int offset = requiredSize - _maxChunkSize; + Buffer.BlockCopy(chunkSegment.Array, chunkSegment.Offset, _chunksBytes, offset, chunkSegment.Count); + + _sizesList.Add(chunkSegment.Count); + } + + public void RemoveAt(int index) + { + _sizesList.RemoveAt(index); + --_chunksCount; + + if (index < _chunksCount) + { + int dstOffset = index * _maxChunkSize; + int srcOffset = dstOffset + _maxChunkSize; + int copyCount = _chunksCount * _maxChunkSize - dstOffset; + + Buffer.BlockCopy(_chunksBytes, srcOffset, _chunksBytes, dstOffset, copyCount); + } + } + + public void Clear() + { + _sizesList.Clear(); + _chunksCount = 0; + } + } +} \ No newline at end of file diff --git a/RtspClientSharp/Utils/Delegates.cs b/RtspClientSharp/Utils/Delegates.cs new file mode 100644 index 0000000..eef8c53 --- /dev/null +++ b/RtspClientSharp/Utils/Delegates.cs @@ -0,0 +1,4 @@ +namespace RtspClientSharp.Utils +{ + public delegate void RefAction(ref T value); +} \ No newline at end of file diff --git a/RtspClientSharp/Utils/NetworkClientFactory.cs b/RtspClientSharp/Utils/NetworkClientFactory.cs new file mode 100644 index 0000000..9344a07 --- /dev/null +++ b/RtspClientSharp/Utils/NetworkClientFactory.cs @@ -0,0 +1,30 @@ +using System.Net.Sockets; + +namespace RtspClientSharp.Utils +{ + static class NetworkClientFactory + { + private const int TcpReceiveBufferDefaultSize = 64 * 1024; + + public static Socket CreateTcpClient() + { + return new Socket(AddressFamily.InterNetworkV6, SocketType.Stream, ProtocolType.Tcp) + { + SendBufferSize = 0, + ReceiveBufferSize = TcpReceiveBufferDefaultSize, + DualMode = true, + NoDelay = true + }; + } + + public static Socket CreateUdpClient() + { + var socket = new Socket(AddressFamily.InterNetworkV6, SocketType.Dgram, ProtocolType.Udp) + { + SendBufferSize = 0, + DualMode = true + }; + return socket; + } + } +} \ No newline at end of file diff --git a/RtspClientSharp/Utils/SimpleHybridLock.cs b/RtspClientSharp/Utils/SimpleHybridLock.cs new file mode 100644 index 0000000..c23000e --- /dev/null +++ b/RtspClientSharp/Utils/SimpleHybridLock.cs @@ -0,0 +1,41 @@ +using System; +using System.Threading; + +namespace RtspClientSharp.Utils +{ + /// + /// From CLR via C#, fourth edition + /// + internal sealed class SimpleHybridLock : IDisposable + { + // The Int32 is used by the primitive user-mode constructs (Interlocked methods) + private int _waiters; + + // The AutoResetEvent is the primitive kernel-mode construct + private readonly AutoResetEvent _waiterLock = new AutoResetEvent(false); + + public void Enter() + { + // Indicate that this thread wants the lock + if (Interlocked.Increment(ref _waiters) == 1) + return; // Lock was free, no contention, just return + // Another thread is waiting. There is contention, block this thread + _waiterLock.WaitOne(); // Bad performance hit here + // When WaitOne returns, this thread now has the lock + } + + public void Leave() + { + // This thread is releasing the lock + if (Interlocked.Decrement(ref _waiters) == 0) + return; // No other threads are blocked, just return + // Other threads are blocked, wake 1 of them + _waiterLock.Set(); // Bad performance hit here + } + + public void Dispose() + { + _waiterLock.Dispose(); + } + } +} \ No newline at end of file diff --git a/RtspClientSharp/Utils/TcpClientFactory.cs b/RtspClientSharp/Utils/TcpClientFactory.cs deleted file mode 100644 index 2461fd8..0000000 --- a/RtspClientSharp/Utils/TcpClientFactory.cs +++ /dev/null @@ -1,17 +0,0 @@ -using System.Net.Sockets; - -namespace RtspClientSharp.Utils -{ - static class TcpClientFactory - { - public static TcpClient Create() - { - return new TcpClient(AddressFamily.InterNetworkV6) - { - SendBufferSize = 0, - Client = {DualMode = true}, - NoDelay = true - }; - } - } -} \ No newline at end of file