Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions media/server/main/include/MediaPipelineServerInternal.h
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,12 @@ class MediaPipelineServerInternal : public IMediaPipelineServerInternal, public
* @retval NeedMediaData timeout
*/
std::chrono::milliseconds getNeedMediaDataTimeout(MediaSourceType mediaSourceType) const;

private:
std::map<MediaSourceType, std::chrono::milliseconds> m_needMediaDataBackoff;

void updateNeedMediaDataDelay(MediaSourceType mediaSourceType, std::chrono::milliseconds delay);
void resetNeedMediaDataBackoff(MediaSourceType mediaSourceType);
};

}; // namespace firebolt::rialto::server
Expand Down
36 changes: 33 additions & 3 deletions media/server/main/source/MediaPipelineServerInternal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ bool MediaPipelineServerInternal::removeSourceInternal(int32_t id)
}

m_needMediaDataTimers.erase(sourceIter->first);
resetNeedMediaDataBackoff(sourceIter->first);
m_attachedSources.erase(sourceIter);
return true;
}
Expand Down Expand Up @@ -1152,6 +1153,7 @@ bool MediaPipelineServerInternal::flushInternal(int32_t sourceId, bool resetTime
m_gstPlayer->flush(sourceIter->first, resetTime, async);

m_needMediaDataTimers.erase(sourceIter->first);
resetNeedMediaDataBackoff(sourceIter->first);

// Reset Eos on flush
auto it = m_isMediaTypeEosMap.find(sourceIter->first);
Expand Down Expand Up @@ -1604,24 +1606,38 @@ void MediaPipelineServerInternal::scheduleNotifyNeedMediaData(MediaSourceType me
return;
}

std::chrono::milliseconds delay = getNeedMediaDataTimeout(mediaSourceType);

auto it = m_needMediaDataBackoff.find(mediaSourceType);
if (it != m_needMediaDataBackoff.end())
{
delay = it->second;
}

m_needMediaDataTimers[mediaSourceType] =
m_timerFactory
->createTimer(getNeedMediaDataTimeout(mediaSourceType),
[this, mediaSourceType]()
->createTimer(delay,
[this, mediaSourceType, delay]()
{
m_mainThread
->enqueueTask(m_mainThreadClientId,
[this, mediaSourceType]()
[this, mediaSourceType, delay]()
{
m_needMediaDataTimers.erase(mediaSourceType);
if (!notifyNeedMediaDataInternal(mediaSourceType))
{
updateNeedMediaDataDelay(mediaSourceType, delay);

RIALTO_SERVER_LOG_WARN("Scheduled Need media data sending "
"failed for: %s. Scheduling again...",
common::convertMediaSourceType(
mediaSourceType));
scheduleNotifyNeedMediaData(mediaSourceType);
}
else
{
resetNeedMediaDataBackoff(mediaSourceType);
}
});
});
}
Expand All @@ -1637,4 +1653,18 @@ std::chrono::milliseconds MediaPipelineServerInternal::getNeedMediaDataTimeout(M
}
return kDefaultNeedMediaDataResendTimeMs;
}

void MediaPipelineServerInternal::updateNeedMediaDataDelay(MediaSourceType mediaSourceType,
std::chrono::milliseconds delay)
{
constexpr std::chrono::milliseconds k_backoffMaxRetryDelay{500};
constexpr uint32_t k_backoffMultiplier{2};
auto nextDelay = std::min(delay * k_backoffMultiplier, k_backoffMaxRetryDelay);
Comment thread
smudri85 marked this conversation as resolved.
Outdated
m_needMediaDataBackoff[mediaSourceType] = nextDelay;
}

void MediaPipelineServerInternal::resetNeedMediaDataBackoff(MediaSourceType mediaSourceType)
{
m_needMediaDataBackoff.erase(mediaSourceType);
}
}; // namespace firebolt::rialto::server
58 changes: 58 additions & 0 deletions tests/unittests/media/server/main/mediaPipeline/FlushTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,61 @@ TEST_F(RialtoServerMediaPipelineFlushTest, FlushResetEos)
expectNotifyNeedData(firebolt::rialto::MediaSourceType::VIDEO, sourceId, 24);
m_gstPlayerCallback->notifyNeedMediaData(firebolt::rialto::MediaSourceType::VIDEO);
}

TEST_F(RialtoServerMediaPipelineFlushTest, FlushResetsNeedMediaDataBackoff)
{
constexpr auto kStatus = firebolt::rialto::MediaSourceStatus::ERROR;
constexpr auto kNeedDataRequestId = 0U;
const std::chrono::milliseconds kBackoffAfterFailure{30};
bool async{false};
std::function<void()> resendCallback;
std::unique_ptr<IMediaPipeline::MediaSource> mediaSource =
std::make_unique<IMediaPipeline::MediaSourceVideo>(m_kMimeType);

loadGstPlayer();

{
::testing::InSequence seq;

EXPECT_CALL(*m_timerFactoryMock, createTimer(std::chrono::milliseconds{15}, _, _))
.WillOnce(Invoke(
[&](const std::chrono::milliseconds &timeout, const std::function<void()> &callback,
firebolt::rialto::common::TimerType timerType)
{
resendCallback = callback;
return std::make_unique<::testing::NiceMock<TimerMock>>();
}));
EXPECT_CALL(*m_timerFactoryMock, createTimer(kBackoffAfterFailure, _, _))
.WillOnce(Return(ByMove(std::make_unique<::testing::NiceMock<TimerMock>>())));
EXPECT_CALL(*m_timerFactoryMock, createTimer(std::chrono::milliseconds{15}, _, _))
.WillOnce(Return(ByMove(std::make_unique<::testing::NiceMock<TimerMock>>())));
}

mainThreadWillEnqueueTaskAndWait();
ASSERT_TRUE(m_activeRequestsMock);
EXPECT_CALL(*m_activeRequestsMock, getType(kNeedDataRequestId)).WillOnce(Return(m_kType));
EXPECT_CALL(*m_activeRequestsMock, erase(kNeedDataRequestId));
EXPECT_TRUE(m_mediaPipeline->haveData(kStatus, 24, kNeedDataRequestId));

ASSERT_TRUE(resendCallback);
mainThreadWillEnqueueTask();
EXPECT_CALL(*m_sharedMemoryBufferMock,
clearData(ISharedMemoryBuffer::MediaPlaybackType::GENERIC, m_kSessionId, m_kType))
.WillOnce(Return(true));
resendCallback();

mainThreadWillEnqueueTaskAndWait();
EXPECT_CALL(*m_gstPlayerMock, attachSource(Ref(mediaSource)));
EXPECT_EQ(m_mediaPipeline->attachSource(mediaSource), true);
std::int32_t sourceId{mediaSource->getId()};

mainThreadWillEnqueueTaskAndWait();
EXPECT_CALL(*m_gstPlayerMock, flush(m_kType, m_kResetTime, async));
EXPECT_TRUE(m_mediaPipeline->flush(sourceId, m_kResetTime, async));

constexpr auto kNextNeedDataRequestId = 1U;
mainThreadWillEnqueueTaskAndWait();
EXPECT_CALL(*m_activeRequestsMock, getType(kNextNeedDataRequestId)).WillOnce(Return(m_kType));
EXPECT_CALL(*m_activeRequestsMock, erase(kNextNeedDataRequestId));
EXPECT_TRUE(m_mediaPipeline->haveData(kStatus, 24, kNextNeedDataRequestId));
}
89 changes: 89 additions & 0 deletions tests/unittests/media/server/main/mediaPipeline/HaveDataTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

using ::testing::A;
using ::testing::ByMove;
using ::testing::InSequence;
using ::testing::Ref;
using ::testing::ReturnRef;
using ::testing::Throw;
Expand Down Expand Up @@ -298,6 +299,94 @@ TEST_F(RialtoServerMediaPipelineHaveDataTest, ServerInternalHaveDataSuccessWithR
resendCallback();
}

TEST_F(RialtoServerMediaPipelineHaveDataTest, ServerInternalHaveDataResendBackoffDoublesOnFailureAndResetsAfterSuccess)
{
auto status = firebolt::rialto::MediaSourceStatus::ERROR;
auto mediaSourceType = firebolt::rialto::MediaSourceType::VIDEO;
std::function<void()> firstResendCallback;
std::function<void()> secondResendCallback;
std::function<void()> thirdResendCallback;
const std::chrono::milliseconds kFirstBackoff{30};
const std::chrono::milliseconds kSecondBackoff{60};

loadGstPlayer();

{
InSequence seq;

EXPECT_CALL(*m_timerFactoryMock, createTimer(m_kDefaultNeedMediaDataResendTimeout, _, _))
.WillOnce(Invoke(
[&](const std::chrono::milliseconds &timeout, const std::function<void()> &callback,
firebolt::rialto::common::TimerType timerType)
{
firstResendCallback = callback;
return std::make_unique<::testing::NiceMock<TimerMock>>();
}));
EXPECT_CALL(*m_timerFactoryMock, createTimer(kFirstBackoff, _, _))
.WillOnce(Invoke(
[&](const std::chrono::milliseconds &timeout, const std::function<void()> &callback,
firebolt::rialto::common::TimerType timerType)
{
secondResendCallback = callback;
return std::make_unique<::testing::NiceMock<TimerMock>>();
}));
EXPECT_CALL(*m_timerFactoryMock, createTimer(kSecondBackoff, _, _))
.WillOnce(Invoke(
[&](const std::chrono::milliseconds &timeout, const std::function<void()> &callback,
firebolt::rialto::common::TimerType timerType)
{
thirdResendCallback = callback;
return std::make_unique<::testing::NiceMock<TimerMock>>();
}));
EXPECT_CALL(*m_timerFactoryMock, createTimer(m_kDefaultNeedMediaDataResendTimeout, _, _))
.WillOnce(Return(ByMove(std::make_unique<::testing::NiceMock<TimerMock>>())));
}

mainThreadWillEnqueueTaskAndWait();
ASSERT_TRUE(m_activeRequestsMock);
EXPECT_CALL(*m_activeRequestsMock, getType(m_kNeedDataRequestId)).WillOnce(Return(mediaSourceType));
EXPECT_CALL(*m_activeRequestsMock, erase(m_kNeedDataRequestId));
EXPECT_TRUE(m_mediaPipeline->haveData(status, m_kNumFrames, m_kNeedDataRequestId));

ASSERT_TRUE(firstResendCallback);
mainThreadWillEnqueueTask();
EXPECT_CALL(*m_sharedMemoryBufferMock,
clearData(ISharedMemoryBuffer::MediaPlaybackType::GENERIC, m_kSessionId, mediaSourceType))
.WillOnce(Return(true));
firstResendCallback();

ASSERT_TRUE(secondResendCallback);
mainThreadWillEnqueueTask();
EXPECT_CALL(*m_sharedMemoryBufferMock,
clearData(ISharedMemoryBuffer::MediaPlaybackType::GENERIC, m_kSessionId, mediaSourceType))
.WillOnce(Return(true));
secondResendCallback();

const int kSourceId = attachSource(mediaSourceType, "video/h264");

ASSERT_TRUE(thirdResendCallback);
mainThreadWillEnqueueTask();
EXPECT_CALL(*m_sharedMemoryBufferMock,
clearData(ISharedMemoryBuffer::MediaPlaybackType::GENERIC, m_kSessionId, mediaSourceType))
.WillOnce(Return(true));
EXPECT_CALL(*m_sharedMemoryBufferMock,
getMaxDataLen(ISharedMemoryBuffer::MediaPlaybackType::GENERIC, m_kSessionId, mediaSourceType))
.WillOnce(Return(7 * 1024 * 1024));
EXPECT_CALL(*m_sharedMemoryBufferMock,
getDataOffset(ISharedMemoryBuffer::MediaPlaybackType::GENERIC, m_kSessionId, mediaSourceType))
.WillOnce(Return(0));
EXPECT_CALL(*m_activeRequestsMock, insert(mediaSourceType, _)).WillOnce(Return(0));
EXPECT_CALL(*m_mediaPipelineClientMock,
notifyNeedMediaData(kSourceId, m_kNumFrames, 0, _)); // params tested in NeedMediaDataTests
thirdResendCallback();

const auto kNextNeedDataRequestId{m_kNeedDataRequestId + 1};
mainThreadWillEnqueueTaskAndWait();
EXPECT_CALL(*m_activeRequestsMock, getType(kNextNeedDataRequestId)).WillOnce(Return(mediaSourceType));
EXPECT_CALL(*m_activeRequestsMock, erase(kNextNeedDataRequestId));
EXPECT_TRUE(m_mediaPipeline->haveData(status, m_kNumFrames, kNextNeedDataRequestId));
}

TEST_F(RialtoServerMediaPipelineHaveDataTest, ServerInternalHaveDataFailureDueToShmBufferError)
{
auto status = firebolt::rialto::MediaSourceStatus::OK;
Expand Down
60 changes: 60 additions & 0 deletions tests/unittests/media/server/main/mediaPipeline/SourceTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,66 @@ TEST_F(RialtoServerMediaPipelineSourceTest, RemoveSourceSuccess)
EXPECT_EQ(m_mediaPipeline->removeSource(sourceId), true);
}

TEST_F(RialtoServerMediaPipelineSourceTest, RemoveSourceResetsNeedMediaDataBackoff)
{
constexpr auto kStatus = firebolt::rialto::MediaSourceStatus::ERROR;
constexpr auto kNeedDataRequestId = 0U;
const std::chrono::milliseconds kBackoffAfterFailure{30};
std::function<void()> resendCallback;
std::unique_ptr<IMediaPipeline::MediaSource> mediaSource =
std::make_unique<IMediaPipeline::MediaSourceAudio>(m_kMimeType);

loadGstPlayer();

{
::testing::InSequence seq;

EXPECT_CALL(*m_timerFactoryMock, createTimer(std::chrono::milliseconds{15}, _, _))
.WillOnce(Invoke(
[&](const std::chrono::milliseconds &timeout, const std::function<void()> &callback,
firebolt::rialto::common::TimerType timerType)
{
resendCallback = callback;
return std::make_unique<::testing::NiceMock<TimerMock>>();
}));
EXPECT_CALL(*m_timerFactoryMock, createTimer(kBackoffAfterFailure, _, _))
.WillOnce(Return(ByMove(std::make_unique<::testing::NiceMock<TimerMock>>())));
EXPECT_CALL(*m_timerFactoryMock, createTimer(std::chrono::milliseconds{15}, _, _))
.WillOnce(Return(ByMove(std::make_unique<::testing::NiceMock<TimerMock>>())));
}

mainThreadWillEnqueueTaskAndWait();
ASSERT_TRUE(m_activeRequestsMock);
EXPECT_CALL(*m_activeRequestsMock, getType(kNeedDataRequestId)).WillOnce(Return(m_type));
EXPECT_CALL(*m_activeRequestsMock, erase(kNeedDataRequestId));
EXPECT_TRUE(m_mediaPipeline->haveData(kStatus, kNeedDataRequestId));

ASSERT_TRUE(resendCallback);
mainThreadWillEnqueueTask();
EXPECT_CALL(*m_sharedMemoryBufferMock,
clearData(ISharedMemoryBuffer::MediaPlaybackType::GENERIC, m_kSessionId, m_type))
.WillOnce(Return(true));
resendCallback();

mainThreadWillEnqueueTaskAndWait();
EXPECT_CALL(*m_gstPlayerMock, attachSource(Ref(mediaSource)));
EXPECT_EQ(m_mediaPipeline->attachSource(mediaSource), true);
std::int32_t firstSourceId{mediaSource->getId()};

mainThreadWillEnqueueTaskAndWait();
EXPECT_EQ(m_mediaPipeline->removeSource(firstSourceId), true);

mainThreadWillEnqueueTaskAndWait();
EXPECT_CALL(*m_gstPlayerMock, attachSource(Ref(mediaSource)));
EXPECT_EQ(m_mediaPipeline->attachSource(mediaSource), true);

constexpr auto kNextNeedDataRequestId = 1U;
mainThreadWillEnqueueTaskAndWait();
EXPECT_CALL(*m_activeRequestsMock, getType(kNextNeedDataRequestId)).WillOnce(Return(m_type));
EXPECT_CALL(*m_activeRequestsMock, erase(kNextNeedDataRequestId));
EXPECT_TRUE(m_mediaPipeline->haveData(kStatus, kNextNeedDataRequestId));
}

/**
* Test that RemoveSource fails if load has not been called (no gstreamer player).
*/
Expand Down
Loading