Skip to content

Commit

Permalink
Optimized Streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
Laky-64 committed Feb 29, 2024
1 parent cef9fb9 commit 589971d
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 38 deletions.
56 changes: 22 additions & 34 deletions ntgcalls/stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,11 @@ namespace ntgcalls {
Stream::Stream() {
audio = std::make_shared<AudioStreamer>();
video = std::make_shared<VideoStreamer>();
streamQueue = std::make_shared<DispatchQueue>(5);
updateQueue = std::make_shared<DispatchQueue>();
}

Stream::~Stream() {
stop();
streamQueue = nullptr;
updateQueue = nullptr;

std::lock_guard lock(mutex);
Expand Down Expand Up @@ -71,29 +69,6 @@ namespace ntgcalls {
}
}

void Stream::sendSample() {
std::shared_lock lock(mutex);
if (running) {
if (idling || !reader || !(reader->audio || reader->video)) {
lock.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(500));
lock.lock();
} else {
if (auto [fst, snd] = unsafePrepareForSample(lock); fst && snd) {
if (const auto sample = snd->read()) {
fst->sendData(sample);
}
}
checkStream();
}
if (streamQueue) {
streamQueue->dispatch([&] {
sendSample();
});
}
}
}

void Stream::setAVStream(const MediaDescription& streamConfig, const bool noUpgrade) {
std::lock_guard lock(mutex);
const auto audioConfig = streamConfig.audio;
Expand Down Expand Up @@ -156,20 +131,30 @@ namespace ntgcalls {

Stream::Status Stream::status() {
std::shared_lock lock(mutex);
if (reader && (reader->audio || reader->video) && running) {
if (reader && (reader->audio || reader->video)) {
return idling ? Paused : Playing;
}
return Idling;
}

void Stream::start() {
std::lock_guard lock(mutex);
if (!running) {
running = true;
streamQueue->dispatch([&] {
sendSample();
});
}
thread = std::thread([this] {
do {
std::shared_lock lock(mutex);
if (idling || !reader || !(reader->audio || reader->video)) {
lock.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(500));
lock.lock();
} else {
if (auto [fst, snd] = unsafePrepareForSample(lock); fst && snd) {
if (const auto sample = snd->read()) {
fst->sendData(sample);
}
}
checkStream();
}
} while (!quit);
});
}

bool Stream::pause() {
Expand Down Expand Up @@ -212,8 +197,11 @@ namespace ntgcalls {
}

void Stream::stop() {
quit = true;
if (thread.joinable()) {
thread.join();
}
std::lock_guard lock(mutex);
running = false;
idling = false;
if (reader) {
if (reader->audio) {
Expand Down
7 changes: 3 additions & 4 deletions ntgcalls/stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,14 @@ namespace ntgcalls {
std::shared_ptr<VideoStreamer> video;
wrtc::MediaStreamTrack *audioTrack{}, *videoTrack{};
std::shared_ptr<MediaReaderFactory> reader;
bool running = false, idling = false, hasVideo = false;
bool idling = false;
std::atomic_bool hasVideo = false, quit = false;
wrtc::synchronized_callback<Type> onEOF;
wrtc::synchronized_callback<MediaState> onChangeStatus;
std::shared_ptr<DispatchQueue> streamQueue;
std::thread thread;
std::shared_ptr<DispatchQueue> updateQueue;
std::shared_mutex mutex;

void sendSample();

void checkStream() const;

std::pair<std::shared_ptr<BaseStreamer>, std::shared_ptr<BaseReader>> unsafePrepareForSample(std::shared_lock<std::shared_mutex>& lock) const;
Expand Down

0 comments on commit 589971d

Please sign in to comment.