diff --git a/media/server/main/include/MediaPipelineServerInternal.h b/media/server/main/include/MediaPipelineServerInternal.h index 0c92f63c0..7b84fee7f 100644 --- a/media/server/main/include/MediaPipelineServerInternal.h +++ b/media/server/main/include/MediaPipelineServerInternal.h @@ -723,6 +723,12 @@ class MediaPipelineServerInternal : public IMediaPipelineServerInternal, public * @retval NeedMediaData timeout */ std::chrono::milliseconds getNeedMediaDataTimeout(MediaSourceType mediaSourceType) const; + +private: + std::map m_needMediaDataBackoff; + + void updateNeedMediaDataDelay(MediaSourceType mediaSourceType, std::chrono::milliseconds delay); + void resetNeedMediaDataBackoff(MediaSourceType mediaSourceType); }; }; // namespace firebolt::rialto::server diff --git a/media/server/main/source/MediaPipelineServerInternal.cpp b/media/server/main/source/MediaPipelineServerInternal.cpp index 22fa5ca53..d8fe32e24 100644 --- a/media/server/main/source/MediaPipelineServerInternal.cpp +++ b/media/server/main/source/MediaPipelineServerInternal.cpp @@ -300,6 +300,7 @@ bool MediaPipelineServerInternal::removeSourceInternal(int32_t id) } m_needMediaDataTimers.erase(sourceIter->first); + resetNeedMediaDataBackoff(sourceIter->first); m_attachedSources.erase(sourceIter); return true; } @@ -1152,6 +1153,7 @@ bool MediaPipelineServerInternal::flushInternal(int32_t sourceId, bool resetTime m_gstPlayer->flush(sourceIter->first, resetTime, async); m_needMediaDataTimers.erase(sourceIter->first); + resetNeedMediaDataBackoff(sourceIter->first); // Reset Eos on flush auto it = m_isMediaTypeEosMap.find(sourceIter->first); @@ -1604,24 +1606,38 @@ void MediaPipelineServerInternal::scheduleNotifyNeedMediaData(MediaSourceType me return; } + std::chrono::milliseconds delay = getNeedMediaDataTimeout(mediaSourceType); + + auto it = m_needMediaDataBackoff.find(mediaSourceType); + if (it != m_needMediaDataBackoff.end()) + { + delay = it->second; + } + m_needMediaDataTimers[mediaSourceType] = m_timerFactory - ->createTimer(getNeedMediaDataTimeout(mediaSourceType), - [this, mediaSourceType]() + ->createTimer(delay, + [this, mediaSourceType, delay]() { m_mainThread ->enqueueTask(m_mainThreadClientId, - [this, mediaSourceType]() + [this, mediaSourceType, delay]() { m_needMediaDataTimers.erase(mediaSourceType); if (!notifyNeedMediaDataInternal(mediaSourceType)) { + updateNeedMediaDataDelay(mediaSourceType, delay); + RIALTO_SERVER_LOG_WARN("Scheduled Need media data sending " "failed for: %s. Scheduling again...", common::convertMediaSourceType( mediaSourceType)); scheduleNotifyNeedMediaData(mediaSourceType); } + else + { + resetNeedMediaDataBackoff(mediaSourceType); + } }); }); } @@ -1637,4 +1653,18 @@ std::chrono::milliseconds MediaPipelineServerInternal::getNeedMediaDataTimeout(M } return kDefaultNeedMediaDataResendTimeMs; } + +void MediaPipelineServerInternal::updateNeedMediaDataDelay(MediaSourceType mediaSourceType, + std::chrono::milliseconds delay) +{ + constexpr std::chrono::milliseconds kbackoffMaxRetryDelay{500}; + constexpr uint32_t kbackoffMultiplier{2}; + auto nextDelay = std::min(delay * kbackoffMultiplier, kbackoffMaxRetryDelay); + m_needMediaDataBackoff[mediaSourceType] = nextDelay; +} + +void MediaPipelineServerInternal::resetNeedMediaDataBackoff(MediaSourceType mediaSourceType) +{ + m_needMediaDataBackoff.erase(mediaSourceType); +} }; // namespace firebolt::rialto::server diff --git a/tests/unittests/media/server/main/mediaPipeline/FlushTest.cpp b/tests/unittests/media/server/main/mediaPipeline/FlushTest.cpp index c4ef4c947..4936daacf 100644 --- a/tests/unittests/media/server/main/mediaPipeline/FlushTest.cpp +++ b/tests/unittests/media/server/main/mediaPipeline/FlushTest.cpp @@ -103,3 +103,61 @@ TEST_F(RialtoServerMediaPipelineFlushTest, FlushResetEos) expectNotifyNeedData(firebolt::rialto::MediaSourceType::VIDEO, sourceId, 24); m_gstPlayerCallback->notifyNeedMediaData(firebolt::rialto::MediaSourceType::VIDEO); } + +TEST_F(RialtoServerMediaPipelineFlushTest, FlushResetsNeedMediaDataBackoff) +{ + constexpr auto kStatus = firebolt::rialto::MediaSourceStatus::ERROR; + constexpr auto kNeedDataRequestId = 0U; + const std::chrono::milliseconds kBackoffAfterFailure{30}; + bool async{false}; + std::function resendCallback; + std::unique_ptr mediaSource = + std::make_unique(m_kMimeType); + + loadGstPlayer(); + + { + ::testing::InSequence seq; + + EXPECT_CALL(*m_timerFactoryMock, createTimer(std::chrono::milliseconds{15}, _, _)) + .WillOnce(Invoke( + [&](const std::chrono::milliseconds &timeout, const std::function &callback, + firebolt::rialto::common::TimerType timerType) + { + resendCallback = callback; + return std::make_unique<::testing::NiceMock>(); + })); + EXPECT_CALL(*m_timerFactoryMock, createTimer(kBackoffAfterFailure, _, _)) + .WillOnce(Return(ByMove(std::make_unique<::testing::NiceMock>()))); + EXPECT_CALL(*m_timerFactoryMock, createTimer(std::chrono::milliseconds{15}, _, _)) + .WillOnce(Return(ByMove(std::make_unique<::testing::NiceMock>()))); + } + + mainThreadWillEnqueueTaskAndWait(); + ASSERT_TRUE(m_activeRequestsMock); + EXPECT_CALL(*m_activeRequestsMock, getType(kNeedDataRequestId)).WillOnce(Return(m_kType)); + EXPECT_CALL(*m_activeRequestsMock, erase(kNeedDataRequestId)); + EXPECT_TRUE(m_mediaPipeline->haveData(kStatus, 24, kNeedDataRequestId)); + + ASSERT_TRUE(resendCallback); + mainThreadWillEnqueueTask(); + EXPECT_CALL(*m_sharedMemoryBufferMock, + clearData(ISharedMemoryBuffer::MediaPlaybackType::GENERIC, m_kSessionId, m_kType)) + .WillOnce(Return(true)); + resendCallback(); + + mainThreadWillEnqueueTaskAndWait(); + EXPECT_CALL(*m_gstPlayerMock, attachSource(Ref(mediaSource))); + EXPECT_EQ(m_mediaPipeline->attachSource(mediaSource), true); + std::int32_t sourceId{mediaSource->getId()}; + + mainThreadWillEnqueueTaskAndWait(); + EXPECT_CALL(*m_gstPlayerMock, flush(m_kType, m_kResetTime, async)); + EXPECT_TRUE(m_mediaPipeline->flush(sourceId, m_kResetTime, async)); + + constexpr auto kNextNeedDataRequestId = 1U; + mainThreadWillEnqueueTaskAndWait(); + EXPECT_CALL(*m_activeRequestsMock, getType(kNextNeedDataRequestId)).WillOnce(Return(m_kType)); + EXPECT_CALL(*m_activeRequestsMock, erase(kNextNeedDataRequestId)); + EXPECT_TRUE(m_mediaPipeline->haveData(kStatus, 24, kNextNeedDataRequestId)); +} diff --git a/tests/unittests/media/server/main/mediaPipeline/HaveDataTest.cpp b/tests/unittests/media/server/main/mediaPipeline/HaveDataTest.cpp index b1980c220..e9e6e21b2 100644 --- a/tests/unittests/media/server/main/mediaPipeline/HaveDataTest.cpp +++ b/tests/unittests/media/server/main/mediaPipeline/HaveDataTest.cpp @@ -21,6 +21,7 @@ using ::testing::A; using ::testing::ByMove; +using ::testing::InSequence; using ::testing::Ref; using ::testing::ReturnRef; using ::testing::Throw; @@ -298,6 +299,94 @@ TEST_F(RialtoServerMediaPipelineHaveDataTest, ServerInternalHaveDataSuccessWithR resendCallback(); } +TEST_F(RialtoServerMediaPipelineHaveDataTest, ServerInternalHaveDataResendBackoffDoublesOnFailureAndResetsAfterSuccess) +{ + auto status = firebolt::rialto::MediaSourceStatus::ERROR; + auto mediaSourceType = firebolt::rialto::MediaSourceType::VIDEO; + std::function firstResendCallback; + std::function secondResendCallback; + std::function thirdResendCallback; + const std::chrono::milliseconds kFirstBackoff{30}; + const std::chrono::milliseconds kSecondBackoff{60}; + + loadGstPlayer(); + + { + InSequence seq; + + EXPECT_CALL(*m_timerFactoryMock, createTimer(m_kDefaultNeedMediaDataResendTimeout, _, _)) + .WillOnce(Invoke( + [&](const std::chrono::milliseconds &timeout, const std::function &callback, + firebolt::rialto::common::TimerType timerType) + { + firstResendCallback = callback; + return std::make_unique<::testing::NiceMock>(); + })); + EXPECT_CALL(*m_timerFactoryMock, createTimer(kFirstBackoff, _, _)) + .WillOnce(Invoke( + [&](const std::chrono::milliseconds &timeout, const std::function &callback, + firebolt::rialto::common::TimerType timerType) + { + secondResendCallback = callback; + return std::make_unique<::testing::NiceMock>(); + })); + EXPECT_CALL(*m_timerFactoryMock, createTimer(kSecondBackoff, _, _)) + .WillOnce(Invoke( + [&](const std::chrono::milliseconds &timeout, const std::function &callback, + firebolt::rialto::common::TimerType timerType) + { + thirdResendCallback = callback; + return std::make_unique<::testing::NiceMock>(); + })); + EXPECT_CALL(*m_timerFactoryMock, createTimer(m_kDefaultNeedMediaDataResendTimeout, _, _)) + .WillOnce(Return(ByMove(std::make_unique<::testing::NiceMock>()))); + } + + mainThreadWillEnqueueTaskAndWait(); + ASSERT_TRUE(m_activeRequestsMock); + EXPECT_CALL(*m_activeRequestsMock, getType(m_kNeedDataRequestId)).WillOnce(Return(mediaSourceType)); + EXPECT_CALL(*m_activeRequestsMock, erase(m_kNeedDataRequestId)); + EXPECT_TRUE(m_mediaPipeline->haveData(status, m_kNumFrames, m_kNeedDataRequestId)); + + ASSERT_TRUE(firstResendCallback); + mainThreadWillEnqueueTask(); + EXPECT_CALL(*m_sharedMemoryBufferMock, + clearData(ISharedMemoryBuffer::MediaPlaybackType::GENERIC, m_kSessionId, mediaSourceType)) + .WillOnce(Return(true)); + firstResendCallback(); + + ASSERT_TRUE(secondResendCallback); + mainThreadWillEnqueueTask(); + EXPECT_CALL(*m_sharedMemoryBufferMock, + clearData(ISharedMemoryBuffer::MediaPlaybackType::GENERIC, m_kSessionId, mediaSourceType)) + .WillOnce(Return(true)); + secondResendCallback(); + + const int kSourceId = attachSource(mediaSourceType, "video/h264"); + + ASSERT_TRUE(thirdResendCallback); + mainThreadWillEnqueueTask(); + EXPECT_CALL(*m_sharedMemoryBufferMock, + clearData(ISharedMemoryBuffer::MediaPlaybackType::GENERIC, m_kSessionId, mediaSourceType)) + .WillOnce(Return(true)); + EXPECT_CALL(*m_sharedMemoryBufferMock, + getMaxDataLen(ISharedMemoryBuffer::MediaPlaybackType::GENERIC, m_kSessionId, mediaSourceType)) + .WillOnce(Return(7 * 1024 * 1024)); + EXPECT_CALL(*m_sharedMemoryBufferMock, + getDataOffset(ISharedMemoryBuffer::MediaPlaybackType::GENERIC, m_kSessionId, mediaSourceType)) + .WillOnce(Return(0)); + EXPECT_CALL(*m_activeRequestsMock, insert(mediaSourceType, _)).WillOnce(Return(0)); + EXPECT_CALL(*m_mediaPipelineClientMock, + notifyNeedMediaData(kSourceId, m_kNumFrames, 0, _)); // params tested in NeedMediaDataTests + thirdResendCallback(); + + const auto kNextNeedDataRequestId{m_kNeedDataRequestId + 1}; + mainThreadWillEnqueueTaskAndWait(); + EXPECT_CALL(*m_activeRequestsMock, getType(kNextNeedDataRequestId)).WillOnce(Return(mediaSourceType)); + EXPECT_CALL(*m_activeRequestsMock, erase(kNextNeedDataRequestId)); + EXPECT_TRUE(m_mediaPipeline->haveData(status, m_kNumFrames, kNextNeedDataRequestId)); +} + TEST_F(RialtoServerMediaPipelineHaveDataTest, ServerInternalHaveDataFailureDueToShmBufferError) { auto status = firebolt::rialto::MediaSourceStatus::OK; diff --git a/tests/unittests/media/server/main/mediaPipeline/SourceTest.cpp b/tests/unittests/media/server/main/mediaPipeline/SourceTest.cpp index 56eae34ee..00a6bd176 100644 --- a/tests/unittests/media/server/main/mediaPipeline/SourceTest.cpp +++ b/tests/unittests/media/server/main/mediaPipeline/SourceTest.cpp @@ -100,6 +100,66 @@ TEST_F(RialtoServerMediaPipelineSourceTest, RemoveSourceSuccess) EXPECT_EQ(m_mediaPipeline->removeSource(sourceId), true); } +TEST_F(RialtoServerMediaPipelineSourceTest, RemoveSourceResetsNeedMediaDataBackoff) +{ + constexpr auto kStatus = firebolt::rialto::MediaSourceStatus::ERROR; + constexpr auto kNeedDataRequestId = 0U; + const std::chrono::milliseconds kBackoffAfterFailure{30}; + std::function resendCallback; + std::unique_ptr mediaSource = + std::make_unique(m_kMimeType); + + loadGstPlayer(); + + { + ::testing::InSequence seq; + + EXPECT_CALL(*m_timerFactoryMock, createTimer(std::chrono::milliseconds{15}, _, _)) + .WillOnce(Invoke( + [&](const std::chrono::milliseconds &timeout, const std::function &callback, + firebolt::rialto::common::TimerType timerType) + { + resendCallback = callback; + return std::make_unique<::testing::NiceMock>(); + })); + EXPECT_CALL(*m_timerFactoryMock, createTimer(kBackoffAfterFailure, _, _)) + .WillOnce(Return(ByMove(std::make_unique<::testing::NiceMock>()))); + EXPECT_CALL(*m_timerFactoryMock, createTimer(std::chrono::milliseconds{15}, _, _)) + .WillOnce(Return(ByMove(std::make_unique<::testing::NiceMock>()))); + } + + mainThreadWillEnqueueTaskAndWait(); + ASSERT_TRUE(m_activeRequestsMock); + EXPECT_CALL(*m_activeRequestsMock, getType(kNeedDataRequestId)).WillOnce(Return(m_type)); + EXPECT_CALL(*m_activeRequestsMock, erase(kNeedDataRequestId)); + EXPECT_TRUE(m_mediaPipeline->haveData(kStatus, kNeedDataRequestId)); + + ASSERT_TRUE(resendCallback); + mainThreadWillEnqueueTask(); + EXPECT_CALL(*m_sharedMemoryBufferMock, + clearData(ISharedMemoryBuffer::MediaPlaybackType::GENERIC, m_kSessionId, m_type)) + .WillOnce(Return(true)); + resendCallback(); + + mainThreadWillEnqueueTaskAndWait(); + EXPECT_CALL(*m_gstPlayerMock, attachSource(Ref(mediaSource))); + EXPECT_EQ(m_mediaPipeline->attachSource(mediaSource), true); + std::int32_t firstSourceId{mediaSource->getId()}; + + mainThreadWillEnqueueTaskAndWait(); + EXPECT_EQ(m_mediaPipeline->removeSource(firstSourceId), true); + + mainThreadWillEnqueueTaskAndWait(); + EXPECT_CALL(*m_gstPlayerMock, attachSource(Ref(mediaSource))); + EXPECT_EQ(m_mediaPipeline->attachSource(mediaSource), true); + + constexpr auto kNextNeedDataRequestId = 1U; + mainThreadWillEnqueueTaskAndWait(); + EXPECT_CALL(*m_activeRequestsMock, getType(kNextNeedDataRequestId)).WillOnce(Return(m_type)); + EXPECT_CALL(*m_activeRequestsMock, erase(kNextNeedDataRequestId)); + EXPECT_TRUE(m_mediaPipeline->haveData(kStatus, kNextNeedDataRequestId)); +} + /** * Test that RemoveSource fails if load has not been called (no gstreamer player). */