Skip to content

Commit cef9fb9

Browse files
committed
Refactored Async Reader
1 parent 39cfa16 commit cef9fb9

File tree

2 files changed

+33
-43
lines changed

2 files changed

+33
-43
lines changed

ntgcalls/io/base_reader.cpp

Lines changed: 30 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -6,45 +6,38 @@
66
#include "ntgcalls/exceptions.hpp"
77

88
namespace ntgcalls {
9-
BaseReader::BaseReader(const int64_t bufferSize, const bool noLatency): noLatency(noLatency) {
10-
size = bufferSize;
11-
}
9+
BaseReader::BaseReader(const int64_t bufferSize, const bool noLatency): noLatency(noLatency), size(bufferSize) {}
1210

1311
BaseReader::~BaseReader() {
1412
BaseReader::close();
15-
std::lock_guard lock(mutex);
16-
promise = nullptr;
1713
readChunks = 0;
18-
buffer.clear();
1914
}
2015

2116
void BaseReader::start() {
22-
if (!noLatency) thread = std::thread(&BaseReader::readAsync, this);
23-
}
24-
25-
void BaseReader::readAsync() {
26-
do {
27-
std::this_thread::sleep_for(std::chrono::milliseconds(5));
28-
std::unique_lock lock(mutex);
29-
if (buffer.size() < 10 && !_eof) {
30-
const auto availableSpace = 10 - buffer.size();
31-
try {
17+
if (!noLatency) {
18+
thread = std::thread([this] {
19+
do {
20+
std::this_thread::sleep_for(std::chrono::milliseconds(10));
21+
std::unique_lock lock(mutex);
22+
const auto availableSpace = 10 - buffer.size();
23+
lock.unlock();
3224
for (int i = 0; i < availableSpace; i++) {
33-
if (auto tmp = this->readInternal(size); tmp) {
34-
buffer.push_back(tmp);
25+
try {
26+
if (auto tmp = this->readInternal(size); tmp) {
27+
lock.lock();
28+
buffer.push(std::move(tmp));
29+
lock.unlock();
30+
}
31+
} catch (...) {
32+
lock.lock();
33+
_eof = true;
34+
lock.unlock();
3535
}
3636
}
37-
} catch (...) {
38-
_eof = true;
39-
}
40-
}
41-
if (!currentBuffer) {
42-
currentBuffer = buffer[0];
43-
buffer.erase(buffer.begin());
44-
lock.unlock();
45-
bufferCondition.notify_one();
46-
}
47-
} while (!quit);
37+
bufferCondition.notify_one();
38+
} while (!quit && !_eof);
39+
});
40+
}
4841
}
4942

5043
wrtc::binary BaseReader::read() {
@@ -60,13 +53,15 @@ namespace ntgcalls {
6053
return nullptr;
6154
}
6255
std::unique_lock lock(mutex);
63-
wrtc::binary res = nullptr;
64-
bufferCondition.wait(lock, [this]{
65-
return currentBuffer || quit || _eof;
56+
bufferCondition.wait(lock, [this] {
57+
return !buffer.empty() || quit || _eof;
6658
});
67-
res = currentBuffer;
68-
currentBuffer = nullptr;
69-
return res;
59+
if (buffer.empty()) {
60+
return nullptr;
61+
}
62+
auto data = std::move(buffer.front());
63+
buffer.pop();
64+
return data;
7065
}
7166

7267
void BaseReader::close() {

ntgcalls/io/base_reader.hpp

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,24 +6,19 @@
66

77

88
#include <shared_mutex>
9-
#include <vector>
109

1110
#include <wrtc/wrtc.hpp>
1211
#include "../utils/dispatch_queue.hpp"
1312

1413
namespace ntgcalls {
1514
class BaseReader {
16-
std::vector<wrtc::binary> buffer;
17-
wrtc::binary currentBuffer;
15+
std::queue<wrtc::binary> buffer;
16+
std::mutex mutex;
1817
std::condition_variable bufferCondition;
19-
std::atomic_bool _eof = false, running = false, noLatency = false, quit = false;
18+
std::atomic_bool _eof = false, noLatency = false, quit = false;
2019
std::thread thread;
21-
std::mutex mutex;
22-
std::shared_ptr<std::promise<void>> promise;
2320
int64_t size = 0;
2421

25-
void readAsync();
26-
2722
protected:
2823
int64_t readChunks = 0;
2924

0 commit comments

Comments
 (0)