Skip to content

Commit

Permalink
Added playlist support and PullStream() to SRT publisher
Browse files Browse the repository at this point in the history
  • Loading branch information
dimiden committed Jan 18, 2025
1 parent 09f90dd commit 8700678
Show file tree
Hide file tree
Showing 8 changed files with 655 additions and 212 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ namespace cfg
{
struct SrtPublisher : public Publisher
{
public:
PublisherType GetType() const override
{
return PublisherType::Srt;
}
};
} // namespace pub
} // namespace app
} // namespace vhost
} // namespace app
} // namespace vhost
} // namespace cfg
144 changes: 144 additions & 0 deletions src/projects/publishers/srt/srt_playlist.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
//==============================================================================
//
// OvenMediaEngine
//
// Created by Hyunjun Jang
// Copyright (c) 2025 AirenSoft. All rights reserved.
//
//==============================================================================
#include "srt_playlist.h"

#include <base/info/playlist.h>
#include <base/info/stream.h>
#include <base/ovsocket/socket.h>

#include "srt_private.h"

#define SRT_STREAM_DESC \
_stream_info->GetApplicationName(), _stream_info->GetName().CStr(), _stream_info->GetId()

#define logap(format, ...) logtp("[%s/%s(%u)] " format, SRT_STREAM_DESC, ##__VA_ARGS__)
#define logad(format, ...) logtd("[%s/%s(%u)] " format, SRT_STREAM_DESC, ##__VA_ARGS__)
#define logas(format, ...) logts("[%s/%s(%u)] " format, SRT_STREAM_DESC, ##__VA_ARGS__)

#define logai(format, ...) logti("[%s/%s(%u)] " format, SRT_STREAM_DESC, ##__VA_ARGS__)
#define logaw(format, ...) logtw("[%s/%s(%u)] " format, SRT_STREAM_DESC, ##__VA_ARGS__)
#define logae(format, ...) logte("[%s/%s(%u)] " format, SRT_STREAM_DESC, ##__VA_ARGS__)
#define logac(format, ...) logtc("[%s/%s(%u)] " format, SRT_STREAM_DESC, ##__VA_ARGS__)

namespace pub
{
SrtPlaylist::SrtPlaylist(
const std::shared_ptr<const info::Stream> &stream_info,
const std::shared_ptr<const info::Playlist> &playlist_info,
const std::shared_ptr<SrtPlaylistSink> &sink)
: _stream_info(stream_info),
_playlist_info(playlist_info),
_sink(sink)
{
_packetizer = std::make_shared<mpegts::Packetizer>();
}

void SrtPlaylist::AddTrack(const std::shared_ptr<MediaTrack> &track)
{
// Duplicated tracks will be ignored by the packetizer
_packetizer->AddTrack(track);
_track_map[track->GetId()] = track;
}

void SrtPlaylist::AddTracks(const std::vector<std::shared_ptr<MediaTrack>> &tracks)
{
for (const auto &track : tracks)
{
// Duplicated tracks will be ignored by the packetizer
_packetizer->AddTrack(track);
_track_map[track->GetId()] = track;
}
}

bool SrtPlaylist::Start()
{
_packetizer->AddSink(GetSharedPtrAs<mpegts::PacketizerSink>());

return _packetizer->Start();
}

bool SrtPlaylist::Stop()
{
return _packetizer->Stop();
}

void SrtPlaylist::EnqueuePacket(const std::shared_ptr<MediaPacket> &media_packet)
{
#if DEBUG
if (_track_map.find(media_packet->GetTrackId()) == _track_map.end())
{
logte("The track is not found in the playlist map");
OV_ASSERT2(false);
}
#endif // DEBUG

_packetizer->AppendFrame(media_packet);
}

void SrtPlaylist::SendData(const std::vector<std::shared_ptr<mpegts::Packet>> &packets)
{
if (_sink == nullptr)
{
return;
}

auto self = GetSharedPtrAs<SrtPlaylist>();

for (auto &packet : packets)
{
auto size = _data_to_send->GetLength();
const auto &data = packet->GetData();

// Broadcast if the data size exceeds the SRT's payload length
if ((size + data->GetLength()) > SRT_LIVE_DEF_PLSIZE)
{
_sink->OnSrtPlaylistData(self, _data_to_send);
_data_to_send = data->Clone();
}
else
{
_data_to_send->Append(packet->GetData());
}
}
}

void SrtPlaylist::OnPsi(const std::vector<std::shared_ptr<const MediaTrack>> &tracks, const std::vector<std::shared_ptr<mpegts::Packet>> &psi_packets)
{
std::shared_ptr<ov::Data> psi_data = std::make_shared<ov::Data>();

// Concatenate PSI packets
for (const auto &packet : psi_packets)
{
psi_data->Append(packet->GetData());
}

logap("OnPsi - %zu packets (total %zu bytes)", psi_packets.size(), psi_data->GetLength());

_psi_data = std::move(psi_data);

SendData(psi_packets);
}

void SrtPlaylist::OnFrame(const std::shared_ptr<const MediaPacket> &media_packet, const std::vector<std::shared_ptr<mpegts::Packet>> &pes_packets)
{
#if DEBUG
// Since adding up the total packet size is costly, it is calculated only in debug mode
size_t total_packet_size = 0;

for (const auto &packet : pes_packets)
{
total_packet_size += packet->GetData()->GetLength();
}

logap("OnFrame - %zu packets (total %zu bytes)", pes_packets.size(), total_packet_size);
#endif // DEBUG

SendData(pes_packets);
}
} // namespace pub
97 changes: 97 additions & 0 deletions src/projects/publishers/srt/srt_playlist.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
//==============================================================================
//
// OvenMediaEngine
//
// Created by Hyunjun Jang
// Copyright (c) 2025 AirenSoft. All rights reserved.
//
//==============================================================================
#pragma once

#include <base/info/media_track_group.h>
#include <base/info/playlist.h>
#include <base/info/stream.h>
#include <base/mediarouter/media_buffer.h>
#include <base/ovlibrary/ovlibrary.h>
#include <modules/containers/mpegts/mpegts_packetizer.h>

namespace pub
{
class SrtPlaylist;

class SrtPlaylistSink
{
public:
virtual ~SrtPlaylistSink() = default;

virtual void OnSrtPlaylistData(
const std::shared_ptr<SrtPlaylist> &playlist,
const std::shared_ptr<const ov::Data> &data) = 0;
};

struct SrtData
{
SrtData(
const std::shared_ptr<SrtPlaylist> &playlist,
const std::shared_ptr<const ov::Data> &data)
: playlist(playlist),
data(data)
{
}

// The playlist that this data belongs to
std::shared_ptr<SrtPlaylist> playlist;

// The data to send
std::shared_ptr<const ov::Data> data;
};

// SrtPlaylist IS NOT thread safe, so it should be used with a lock if needed
class SrtPlaylist : public mpegts::PacketizerSink, public ov::EnableSharedFromThis<SrtPlaylistSink>
{
public:
SrtPlaylist(
const std::shared_ptr<const info::Stream> &stream_info,
const std::shared_ptr<const info::Playlist> &playlist_info,
const std::shared_ptr<SrtPlaylistSink> &sink);

void AddTrack(const std::shared_ptr<MediaTrack> &track);
void AddTracks(const std::vector<std::shared_ptr<MediaTrack>> &tracks);

bool Start();
bool Stop();

void EnqueuePacket(const std::shared_ptr<MediaPacket> &media_packet);

//--------------------------------------------------------------------
// Implementation of mpegts::PacketizerSink
//--------------------------------------------------------------------
// Do not need to lock _packetizer_mutex inside OnPsi() because it will be called only once when the packetizer starts
void OnPsi(const std::vector<std::shared_ptr<const MediaTrack>> &tracks, const std::vector<std::shared_ptr<mpegts::Packet>> &psi_packets) override;
// Do not need to lock _packetizer_mutex inside OnFrame() because it's called after acquiring the lock in EnqueuePacket()
// (It's called in the thread that calls EnqueuePacket())
void OnFrame(const std::shared_ptr<const MediaPacket> &media_packet, const std::vector<std::shared_ptr<mpegts::Packet>> &pes_packets) override;
//--------------------------------------------------------------------

const std::shared_ptr<const ov::Data> &GetPsiData() const
{
return _psi_data;
}

private:
void SendData(const std::vector<std::shared_ptr<mpegts::Packet>> &packets);

private:
std::shared_ptr<const info::Stream> _stream_info;
std::shared_ptr<const info::Playlist> _playlist_info;

std::map<int32_t, std::shared_ptr<MediaTrack>> _track_map;

std::shared_ptr<mpegts::Packetizer> _packetizer;

std::shared_ptr<SrtPlaylistSink> _sink;

std::shared_ptr<const ov::Data> _psi_data;
std::shared_ptr<ov::Data> _data_to_send = std::make_shared<ov::Data>();
};
} // namespace pub
45 changes: 30 additions & 15 deletions src/projects/publishers/srt/srt_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,16 +272,17 @@ namespace pub
}
else
{
// {vhost}/{app}/{stream} format
// {vhost}/{app}/{stream}[/{playlist}] format
auto parts = stream_path.Split("/");
auto part_count = parts.size();

if (parts.size() != 3)
if ((part_count != 3) && (part_count != 4))
{
logte("The streamid for SRT must be in the following format: {vhost}/{app}/{stream}, but [%s]", stream_path.CStr());
logte("The streamid for SRT must be in the following format: {vhost}/{app}/{stream}[/{playlist}], but [%s]", stream_path.CStr());
return nullptr;
}

// Convert to srt://{vhost}/{app}/{stream}
// Convert to srt://{vhost}/{app}/{stream}[/{playlist}]
stream_path.Prepend("srt://");

is_vhost = true;
Expand All @@ -298,14 +299,8 @@ namespace pub
}
else
{
if (streamid.IsEmpty())
{
logte("The streamid for SRT must be in one of the following formats: srt://{host}[:{port}]/{app}/{stream}[?{query}={value}] or {vhost}/{app}/{stream}, but [%s]", stream_path.CStr());
}
else
{
logte("The streamid for SRT must be in one of the following formats: srt://{host}[:{port}]/{app}/{stream}[?{query}={value}] or {vhost}/{app}/{stream}, but [%s] (streamid: [%s])", stream_path.CStr(), streamid.CStr());
}
auto extra_log = streamid.IsEmpty() ? "" : ov::String::FormatString(" (streamid: [%s])", streamid.CStr());
logte("The streamid for SRT must be in one of the following formats: srt://{host}[:{port}]/{app}/{stream}[/{playlist}][?{query}={value}] or {vhost}/{app}/{stream}[/{playlist}], but [%s]%s", stream_path.CStr(), extra_log.CStr());
}

return final_url;
Expand Down Expand Up @@ -414,18 +409,38 @@ namespace pub
return;
}

auto stream = application->GetStream(final_url->Stream());
auto stream = application->GetStreamAs<SrtStream>(final_url->Stream());

if (stream == nullptr)
{
stream = std::dynamic_pointer_cast<SrtStream>(PullStream(final_url, vhost_app_name, final_url->Host(), final_url->Stream()));
}

if(stream == nullptr)
{
logte("Could not find stream: %s", final_url->Stream().CStr());
remote->Close();
return;
}

auto playlist_name = final_url->File();
std::shared_ptr<SrtPlaylist> srt_playlist = nullptr;

if (playlist_name.IsEmpty())
{
playlist_name = DEFAULT_SRT_PLAYLIST_NAME;
}

::srt_setrejectreason(remote->GetNativeHandle(), 1404);
srt_playlist = stream->GetSrtPlaylist(playlist_name);

if (srt_playlist == nullptr)
{
logte("Could not find playlist: %s", final_url->File().CStr());
remote->Close();
return;
}

auto session = SrtSession::Create(application, stream, remote->GetNativeHandle(), remote);
auto session = SrtSession::Create(application, stream, remote->GetNativeHandle(), remote, srt_playlist);

{
std::unique_lock lock(_session_map_mutex);
Expand Down
Loading

0 comments on commit 8700678

Please sign in to comment.