Skip to content

Commit

Permalink
Added exception handling for when the SRT stream has not started yet
Browse files Browse the repository at this point in the history
  • Loading branch information
dimiden committed Jan 21, 2025
1 parent e215d77 commit 3c5e038
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 19 deletions.
73 changes: 54 additions & 19 deletions src/projects/publishers/srt/srt_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,29 @@ namespace pub
_physical_port_list = std::move(physical_port_list);
}

_disconnect_timer.Push(
[this](void *parameter) {
if (_has_socket_list_to_disconnect)
{
decltype(_socket_list_to_disconnect) socket_list_to_disconnect;

{
std::lock_guard lock_guard(_socket_list_to_disconnect_mutex);
_has_socket_list_to_disconnect = false;
socket_list_to_disconnect = std::move(_socket_list_to_disconnect);
}

for (const auto &socket : socket_list_to_disconnect)
{
socket->Close();
}
}

return ov::DelayQueueAction::Repeat;
},
100);
_disconnect_timer.Start();

return Publisher::Start();
}

Expand All @@ -144,6 +167,9 @@ namespace pub
server_port->Close();
}

_disconnect_timer.Stop();
_disconnect_timer.Clear();

return Publisher::Stop();
}

Expand Down Expand Up @@ -172,6 +198,13 @@ namespace pub
return true;
}

void SrtPublisher::AddToDisconnect(const std::shared_ptr<ov::Socket> &remote)
{
std::lock_guard lock(_socket_list_to_disconnect_mutex);
_has_socket_list_to_disconnect = true;
_socket_list_to_disconnect.emplace_back(remote);
}

std::shared_ptr<ov::Url> SrtPublisher::GetStreamUrlForRemote(const std::shared_ptr<ov::Socket> &remote, bool *is_vhost_form)
{
// stream_id can be in the following format:
Expand Down Expand Up @@ -317,7 +350,7 @@ namespace pub

if (final_url == nullptr)
{
remote->Close();
AddToDisconnect(remote);
return;
}

Expand All @@ -336,12 +369,12 @@ namespace pub
{
case AccessController::VerificationResult::Error:
// will not reach here
remote->Close();
AddToDisconnect(remote);
return;

case AccessController::VerificationResult::Fail:
logtw("%s", signed_policy->GetErrMessage().CStr());
remote->Close();
AddToDisconnect(remote);
return;

case AccessController::VerificationResult::Off:
Expand All @@ -366,12 +399,12 @@ namespace pub
{
case AccessController::VerificationResult::Error:
logte("An error occurred while verifying with the AdmissionWebhooks: %s", final_url->ToUrlString().CStr());
remote->Close();
AddToDisconnect(remote);
return;

case AccessController::VerificationResult::Fail:
logtw("AdmissionWebhooks server returns an error: %s", admission_webhooks->GetErrReason().CStr());
remote->Close();
AddToDisconnect(remote);
return;

case AccessController::VerificationResult::Off:
Expand Down Expand Up @@ -405,21 +438,23 @@ namespace pub
if (application == nullptr)
{
logte("Could not find vhost/app: %s", vhost_app_name.CStr());
remote->Close();
AddToDisconnect(remote);
return;
}

auto stream = application->GetStreamAs<SrtStream>(final_url->Stream());

if (stream == nullptr)
{
stream = std::dynamic_pointer_cast<SrtStream>(PullStream(final_url, vhost_app_name, final_url->Host(), final_url->Stream()));
logte("Could not find stream: %s", final_url->Stream().CStr());
AddToDisconnect(remote);
return;
}

if(stream == nullptr)
if (stream->GetState() != Stream::State::STARTED)
{
logte("Could not find stream: %s", final_url->Stream().CStr());
remote->Close();
logtw("The stream is not started: %s/%u", stream->GetName().CStr(), stream->GetId());
AddToDisconnect(remote);
return;
}

Expand Down Expand Up @@ -461,6 +496,15 @@ namespace pub
// Nothing to do
}

std::shared_ptr<SrtPublisher::StreamMap> SrtPublisher::GetStreamMap(int port)
{
std::shared_lock lock(_stream_map_mutex);

auto item = _stream_map.find(port);

return (item == _stream_map.end()) ? nullptr : item->second;
}

std::shared_ptr<SrtSession> SrtPublisher::GetSession(const std::shared_ptr<ov::Socket> &remote)
{
const auto session_id = remote->GetNativeHandle();
Expand Down Expand Up @@ -505,13 +549,4 @@ namespace pub

logti("The SRT client has disconnected: [%s/%s], %s", stream->GetApplicationName(), stream->GetName().CStr(), remote->ToString().CStr());
}

std::shared_ptr<SrtPublisher::StreamMap> SrtPublisher::GetStreamMap(int port)
{
std::shared_lock lock(_stream_map_mutex);

auto item = _stream_map.find(port);

return (item == _stream_map.end()) ? nullptr : item->second;
}
} // namespace pub
10 changes: 10 additions & 0 deletions src/projects/publishers/srt/srt_publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ namespace pub
};

private:
void AddToDisconnect(const std::shared_ptr<ov::Socket> &remote);

std::shared_ptr<StreamMap> GetStreamMap(int port);
std::shared_ptr<SrtSession> GetSession(const std::shared_ptr<ov::Socket> &remote);

Expand All @@ -87,5 +89,13 @@ namespace pub

std::shared_mutex _session_map_mutex;
std::map<session_id_t, std::shared_ptr<SrtSession>> _session_map;

// When a request is made for a non-existent stream, the SRT socket connection is terminated.
// However, since the client immediately attempts to reconnect, the socket connection is now terminated with a slight delay
ov::DelayQueue _disconnect_timer{"SRTDiscnt"};
// To minimize the use of mutex, use atomic variables before using mutex
std::atomic<bool> _has_socket_list_to_disconnect;
std::mutex _socket_list_to_disconnect_mutex;
std::vector<std::shared_ptr<ov::Socket>> _socket_list_to_disconnect;
};
} // namespace pub

0 comments on commit 3c5e038

Please sign in to comment.