Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions media/server/main/include/MediaPipelineServerInternal.h
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,12 @@ class MediaPipelineServerInternal : public IMediaPipelineServerInternal, public
* @retval NeedMediaData timeout
*/
std::chrono::milliseconds getNeedMediaDataTimeout(MediaSourceType mediaSourceType) const;

private:
std::map<MediaSourceType, std::chrono::milliseconds> m_needMediaDataBackoff;

void updateNeedMediaDataDelay(MediaSourceType mediaSourceType, std::chrono::milliseconds delay);
void resetNeedMediaDataBackoff(MediaSourceType mediaSourceType);
};

}; // namespace firebolt::rialto::server
Expand Down
36 changes: 33 additions & 3 deletions media/server/main/source/MediaPipelineServerInternal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ bool MediaPipelineServerInternal::removeSourceInternal(int32_t id)
}

m_needMediaDataTimers.erase(sourceIter->first);
resetNeedMediaDataBackoff(sourceIter->first);
m_attachedSources.erase(sourceIter);
return true;
}
Expand Down Expand Up @@ -1152,6 +1153,7 @@ bool MediaPipelineServerInternal::flushInternal(int32_t sourceId, bool resetTime
m_gstPlayer->flush(sourceIter->first, resetTime, async);

m_needMediaDataTimers.erase(sourceIter->first);
resetNeedMediaDataBackoff(sourceIter->first);

// Reset Eos on flush
auto it = m_isMediaTypeEosMap.find(sourceIter->first);
Expand Down Expand Up @@ -1604,24 +1606,38 @@ void MediaPipelineServerInternal::scheduleNotifyNeedMediaData(MediaSourceType me
return;
}

std::chrono::milliseconds delay = getNeedMediaDataTimeout(mediaSourceType);

auto it = m_needMediaDataBackoff.find(mediaSourceType);
if (it != m_needMediaDataBackoff.end())
{
delay = it->second;
}

m_needMediaDataTimers[mediaSourceType] =
m_timerFactory
->createTimer(getNeedMediaDataTimeout(mediaSourceType),
[this, mediaSourceType]()
->createTimer(delay,
[this, mediaSourceType, delay]()
{
m_mainThread
->enqueueTask(m_mainThreadClientId,
[this, mediaSourceType]()
[this, mediaSourceType, delay]()
{
m_needMediaDataTimers.erase(mediaSourceType);
if (!notifyNeedMediaDataInternal(mediaSourceType))
{
updateNeedMediaDataDelay(mediaSourceType, delay);

RIALTO_SERVER_LOG_WARN("Scheduled Need media data sending "
"failed for: %s. Scheduling again...",
common::convertMediaSourceType(
mediaSourceType));
scheduleNotifyNeedMediaData(mediaSourceType);
}
else
{
resetNeedMediaDataBackoff(mediaSourceType);
}
});
});
}
Expand All @@ -1637,4 +1653,18 @@ std::chrono::milliseconds MediaPipelineServerInternal::getNeedMediaDataTimeout(M
}
return kDefaultNeedMediaDataResendTimeMs;
}

void MediaPipelineServerInternal::updateNeedMediaDataDelay(MediaSourceType mediaSourceType,
std::chrono::milliseconds delay)
{
constexpr std::chrono::milliseconds kbackoffMaxRetryDelay{500};
constexpr uint32_t kbackoffMultiplier{2};
auto nextDelay = std::min(delay * kbackoffMultiplier, kbackoffMaxRetryDelay);
m_needMediaDataBackoff[mediaSourceType] = nextDelay;
}

void MediaPipelineServerInternal::resetNeedMediaDataBackoff(MediaSourceType mediaSourceType)
{
m_needMediaDataBackoff.erase(mediaSourceType);
}
}; // namespace firebolt::rialto::server
58 changes: 58 additions & 0 deletions tests/unittests/media/server/main/mediaPipeline/FlushTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,61 @@ TEST_F(RialtoServerMediaPipelineFlushTest, FlushResetEos)
expectNotifyNeedData(firebolt::rialto::MediaSourceType::VIDEO, sourceId, 24);
m_gstPlayerCallback->notifyNeedMediaData(firebolt::rialto::MediaSourceType::VIDEO);
}

TEST_F(RialtoServerMediaPipelineFlushTest, FlushResetsNeedMediaDataBackoff)
{
constexpr auto kStatus = firebolt::rialto::MediaSourceStatus::ERROR;
constexpr auto kNeedDataRequestId = 0U;
const std::chrono::milliseconds kBackoffAfterFailure{30};
bool async{false};
std::function<void()> resendCallback;
std::unique_ptr<IMediaPipeline::MediaSource> mediaSource =
std::make_unique<IMediaPipeline::MediaSourceVideo>(m_kMimeType);

loadGstPlayer();

{
::testing::InSequence seq;

EXPECT_CALL(*m_timerFactoryMock, createTimer(std::chrono::milliseconds{15}, _, _))
.WillOnce(Invoke(
[&](const std::chrono::milliseconds &timeout, const std::function<void()> &callback,
firebolt::rialto::common::TimerType timerType)
{
resendCallback = callback;
return std::make_unique<::testing::NiceMock<TimerMock>>();
}));
EXPECT_CALL(*m_timerFactoryMock, createTimer(kBackoffAfterFailure, _, _))
.WillOnce(Return(ByMove(std::make_unique<::testing::NiceMock<TimerMock>>())));
EXPECT_CALL(*m_timerFactoryMock, createTimer(std::chrono::milliseconds{15}, _, _))
.WillOnce(Return(ByMove(std::make_unique<::testing::NiceMock<TimerMock>>())));
}

mainThreadWillEnqueueTaskAndWait();
ASSERT_TRUE(m_activeRequestsMock);
EXPECT_CALL(*m_activeRequestsMock, getType(kNeedDataRequestId)).WillOnce(Return(m_kType));
EXPECT_CALL(*m_activeRequestsMock, erase(kNeedDataRequestId));
EXPECT_TRUE(m_mediaPipeline->haveData(kStatus, 24, kNeedDataRequestId));

ASSERT_TRUE(resendCallback);
mainThreadWillEnqueueTask();
EXPECT_CALL(*m_sharedMemoryBufferMock,
clearData(ISharedMemoryBuffer::MediaPlaybackType::GENERIC, m_kSessionId, m_kType))
.WillOnce(Return(true));
resendCallback();

mainThreadWillEnqueueTaskAndWait();
EXPECT_CALL(*m_gstPlayerMock, attachSource(Ref(mediaSource)));
EXPECT_EQ(m_mediaPipeline->attachSource(mediaSource), true);
std::int32_t sourceId{mediaSource->getId()};

mainThreadWillEnqueueTaskAndWait();
EXPECT_CALL(*m_gstPlayerMock, flush(m_kType, m_kResetTime, async));
EXPECT_TRUE(m_mediaPipeline->flush(sourceId, m_kResetTime, async));

constexpr auto kNextNeedDataRequestId = 1U;
mainThreadWillEnqueueTaskAndWait();
EXPECT_CALL(*m_activeRequestsMock, getType(kNextNeedDataRequestId)).WillOnce(Return(m_kType));
EXPECT_CALL(*m_activeRequestsMock, erase(kNextNeedDataRequestId));
EXPECT_TRUE(m_mediaPipeline->haveData(kStatus, 24, kNextNeedDataRequestId));
}
89 changes: 89 additions & 0 deletions tests/unittests/media/server/main/mediaPipeline/HaveDataTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

using ::testing::A;
using ::testing::ByMove;
using ::testing::InSequence;
using ::testing::Ref;
using ::testing::ReturnRef;
using ::testing::Throw;
Expand Down Expand Up @@ -298,6 +299,94 @@ TEST_F(RialtoServerMediaPipelineHaveDataTest, ServerInternalHaveDataSuccessWithR
resendCallback();
}

TEST_F(RialtoServerMediaPipelineHaveDataTest, ServerInternalHaveDataResendBackoffDoublesOnFailureAndResetsAfterSuccess)
{
auto status = firebolt::rialto::MediaSourceStatus::ERROR;
auto mediaSourceType = firebolt::rialto::MediaSourceType::VIDEO;
std::function<void()> firstResendCallback;
std::function<void()> secondResendCallback;
std::function<void()> thirdResendCallback;
const std::chrono::milliseconds kFirstBackoff{30};
const std::chrono::milliseconds kSecondBackoff{60};

loadGstPlayer();

{
InSequence seq;

EXPECT_CALL(*m_timerFactoryMock, createTimer(m_kDefaultNeedMediaDataResendTimeout, _, _))
.WillOnce(Invoke(
[&](const std::chrono::milliseconds &timeout, const std::function<void()> &callback,
firebolt::rialto::common::TimerType timerType)
{
firstResendCallback = callback;
return std::make_unique<::testing::NiceMock<TimerMock>>();
}));
EXPECT_CALL(*m_timerFactoryMock, createTimer(kFirstBackoff, _, _))
.WillOnce(Invoke(
[&](const std::chrono::milliseconds &timeout, const std::function<void()> &callback,
firebolt::rialto::common::TimerType timerType)
{
secondResendCallback = callback;
return std::make_unique<::testing::NiceMock<TimerMock>>();
}));
EXPECT_CALL(*m_timerFactoryMock, createTimer(kSecondBackoff, _, _))
.WillOnce(Invoke(
[&](const std::chrono::milliseconds &timeout, const std::function<void()> &callback,
firebolt::rialto::common::TimerType timerType)
{
thirdResendCallback = callback;
return std::make_unique<::testing::NiceMock<TimerMock>>();
}));
EXPECT_CALL(*m_timerFactoryMock, createTimer(m_kDefaultNeedMediaDataResendTimeout, _, _))
.WillOnce(Return(ByMove(std::make_unique<::testing::NiceMock<TimerMock>>())));
}

mainThreadWillEnqueueTaskAndWait();
ASSERT_TRUE(m_activeRequestsMock);
EXPECT_CALL(*m_activeRequestsMock, getType(m_kNeedDataRequestId)).WillOnce(Return(mediaSourceType));
EXPECT_CALL(*m_activeRequestsMock, erase(m_kNeedDataRequestId));
EXPECT_TRUE(m_mediaPipeline->haveData(status, m_kNumFrames, m_kNeedDataRequestId));

ASSERT_TRUE(firstResendCallback);
mainThreadWillEnqueueTask();
EXPECT_CALL(*m_sharedMemoryBufferMock,
clearData(ISharedMemoryBuffer::MediaPlaybackType::GENERIC, m_kSessionId, mediaSourceType))
.WillOnce(Return(true));
firstResendCallback();

ASSERT_TRUE(secondResendCallback);
mainThreadWillEnqueueTask();
EXPECT_CALL(*m_sharedMemoryBufferMock,
clearData(ISharedMemoryBuffer::MediaPlaybackType::GENERIC, m_kSessionId, mediaSourceType))
.WillOnce(Return(true));
secondResendCallback();

const int kSourceId = attachSource(mediaSourceType, "video/h264");

ASSERT_TRUE(thirdResendCallback);
mainThreadWillEnqueueTask();
EXPECT_CALL(*m_sharedMemoryBufferMock,
clearData(ISharedMemoryBuffer::MediaPlaybackType::GENERIC, m_kSessionId, mediaSourceType))
.WillOnce(Return(true));
EXPECT_CALL(*m_sharedMemoryBufferMock,
getMaxDataLen(ISharedMemoryBuffer::MediaPlaybackType::GENERIC, m_kSessionId, mediaSourceType))
.WillOnce(Return(7 * 1024 * 1024));
EXPECT_CALL(*m_sharedMemoryBufferMock,
getDataOffset(ISharedMemoryBuffer::MediaPlaybackType::GENERIC, m_kSessionId, mediaSourceType))
.WillOnce(Return(0));
EXPECT_CALL(*m_activeRequestsMock, insert(mediaSourceType, _)).WillOnce(Return(0));
EXPECT_CALL(*m_mediaPipelineClientMock,
notifyNeedMediaData(kSourceId, m_kNumFrames, 0, _)); // params tested in NeedMediaDataTests
thirdResendCallback();

const auto kNextNeedDataRequestId{m_kNeedDataRequestId + 1};
mainThreadWillEnqueueTaskAndWait();
EXPECT_CALL(*m_activeRequestsMock, getType(kNextNeedDataRequestId)).WillOnce(Return(mediaSourceType));
EXPECT_CALL(*m_activeRequestsMock, erase(kNextNeedDataRequestId));
EXPECT_TRUE(m_mediaPipeline->haveData(status, m_kNumFrames, kNextNeedDataRequestId));
}

TEST_F(RialtoServerMediaPipelineHaveDataTest, ServerInternalHaveDataFailureDueToShmBufferError)
{
auto status = firebolt::rialto::MediaSourceStatus::OK;
Expand Down
60 changes: 60 additions & 0 deletions tests/unittests/media/server/main/mediaPipeline/SourceTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,66 @@ TEST_F(RialtoServerMediaPipelineSourceTest, RemoveSourceSuccess)
EXPECT_EQ(m_mediaPipeline->removeSource(sourceId), true);
}

TEST_F(RialtoServerMediaPipelineSourceTest, RemoveSourceResetsNeedMediaDataBackoff)
{
constexpr auto kStatus = firebolt::rialto::MediaSourceStatus::ERROR;
constexpr auto kNeedDataRequestId = 0U;
const std::chrono::milliseconds kBackoffAfterFailure{30};
std::function<void()> resendCallback;
std::unique_ptr<IMediaPipeline::MediaSource> mediaSource =
std::make_unique<IMediaPipeline::MediaSourceAudio>(m_kMimeType);

loadGstPlayer();

{
::testing::InSequence seq;

EXPECT_CALL(*m_timerFactoryMock, createTimer(std::chrono::milliseconds{15}, _, _))
.WillOnce(Invoke(
[&](const std::chrono::milliseconds &timeout, const std::function<void()> &callback,
firebolt::rialto::common::TimerType timerType)
{
resendCallback = callback;
return std::make_unique<::testing::NiceMock<TimerMock>>();
}));
EXPECT_CALL(*m_timerFactoryMock, createTimer(kBackoffAfterFailure, _, _))
.WillOnce(Return(ByMove(std::make_unique<::testing::NiceMock<TimerMock>>())));
EXPECT_CALL(*m_timerFactoryMock, createTimer(std::chrono::milliseconds{15}, _, _))
.WillOnce(Return(ByMove(std::make_unique<::testing::NiceMock<TimerMock>>())));
}

mainThreadWillEnqueueTaskAndWait();
ASSERT_TRUE(m_activeRequestsMock);
EXPECT_CALL(*m_activeRequestsMock, getType(kNeedDataRequestId)).WillOnce(Return(m_type));
EXPECT_CALL(*m_activeRequestsMock, erase(kNeedDataRequestId));
EXPECT_TRUE(m_mediaPipeline->haveData(kStatus, kNeedDataRequestId));

ASSERT_TRUE(resendCallback);
mainThreadWillEnqueueTask();
EXPECT_CALL(*m_sharedMemoryBufferMock,
clearData(ISharedMemoryBuffer::MediaPlaybackType::GENERIC, m_kSessionId, m_type))
.WillOnce(Return(true));
resendCallback();

mainThreadWillEnqueueTaskAndWait();
EXPECT_CALL(*m_gstPlayerMock, attachSource(Ref(mediaSource)));
EXPECT_EQ(m_mediaPipeline->attachSource(mediaSource), true);
std::int32_t firstSourceId{mediaSource->getId()};

mainThreadWillEnqueueTaskAndWait();
EXPECT_EQ(m_mediaPipeline->removeSource(firstSourceId), true);

mainThreadWillEnqueueTaskAndWait();
EXPECT_CALL(*m_gstPlayerMock, attachSource(Ref(mediaSource)));
EXPECT_EQ(m_mediaPipeline->attachSource(mediaSource), true);

constexpr auto kNextNeedDataRequestId = 1U;
mainThreadWillEnqueueTaskAndWait();
EXPECT_CALL(*m_activeRequestsMock, getType(kNextNeedDataRequestId)).WillOnce(Return(m_type));
EXPECT_CALL(*m_activeRequestsMock, erase(kNextNeedDataRequestId));
EXPECT_TRUE(m_mediaPipeline->haveData(kStatus, kNextNeedDataRequestId));
}

/**
* Test that RemoveSource fails if load has not been called (no gstreamer player).
*/
Expand Down
Loading