Skip to content

Commit 6d20bc8

Browse files
committed
Refactored Push Module
1 parent 5b3772c commit 6d20bc8

File tree

4 files changed

+156
-78
lines changed

4 files changed

+156
-78
lines changed

src/projects/modules/ffmpeg/ffmpeg_writer.cpp

Lines changed: 84 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@ namespace ffmpeg
1717
}
1818

1919
Writer::Writer()
20-
: _state(WriterStateNone),
21-
_av_format(nullptr)
20+
: _state(WriterStateNone)
2221
{
2322
}
2423

@@ -75,8 +74,6 @@ namespace ffmpeg
7574

7675
bool Writer::SetUrl(const ov::String url, const ov::String format)
7776
{
78-
std::unique_lock<std::mutex> mlock(_lock);
79-
8077
if (!url || url.IsEmpty() == true)
8178
{
8279
logte("Destination url is empty");
@@ -87,14 +84,17 @@ namespace ffmpeg
8784
_format = format;
8885

8986
// Create output context
90-
int error = avformat_alloc_output_context2(&_av_format, nullptr, (_format != nullptr) ? _format.CStr() : nullptr, _url.CStr());
87+
AVFormatContext *av_format = nullptr;
88+
int error = avformat_alloc_output_context2(&av_format, nullptr, (_format != nullptr) ? _format.CStr() : nullptr, _url.CStr());
9189
if (error < 0)
9290
{
9391
logte("Could not create output context. error(%s), url(%s)", ffmpeg::Conv::AVErrorToString(error).CStr(), _url.CStr());
9492

9593
return false;
9694
}
9795

96+
SetAVFormatContext(av_format);
97+
9898
return true;
9999
}
100100

@@ -113,18 +113,23 @@ namespace ffmpeg
113113
return _timestamp_mode;
114114
}
115115

116-
bool Writer::AddTrack(std::shared_ptr<MediaTrack> media_track)
116+
bool Writer::AddTrack(const std::shared_ptr<MediaTrack> &media_track)
117117
{
118-
std::unique_lock<std::mutex> mlock(_lock);
119-
120118
if (media_track->GetCodecId() == cmn::MediaCodecId::Opus &&
121119
media_track->GetDecoderConfigurationRecord() == nullptr)
122120
{
123121
auto opus_config { std::make_shared<OpusSpecificConfig>(media_track->GetChannel().GetCounts(), media_track->GetSampleRate()) };
124122
media_track->SetDecoderConfigurationRecord(opus_config);
125123
}
126124

127-
AVStream *av_stream = avformat_new_stream(_av_format, nullptr);
125+
auto av_format = GetAVFormatContext();
126+
if (av_format == nullptr)
127+
{
128+
logte("AVFormatContext is null");
129+
return false;
130+
}
131+
132+
AVStream *av_stream = avformat_new_stream(av_format.get(), nullptr);
128133
if (!av_stream)
129134
{
130135
logte("Could not allocate stream");
@@ -140,36 +145,46 @@ namespace ffmpeg
140145
return false;
141146
}
142147

143-
// MediaTrackID -> AVStream, MediaTrack
144-
_track_map[media_track->GetId()] = std::make_pair(av_stream, media_track);
148+
{
149+
std::lock_guard<std::shared_mutex> mlock(_track_map_lock);
150+
// MediaTrackID -> AVStream, MediaTrack
151+
// AVStream doesn't need to be released. It will be released when AVFormatContext is released.
152+
std::shared_ptr<AVStream> av_stream_ptr(av_stream);
153+
_track_map[media_track->GetId()] = std::make_pair(av_stream_ptr, media_track);
154+
}
145155

146156
return true;
147157
}
148158

149159
bool Writer::Start()
150160
{
151-
std::unique_lock<std::mutex> mlock(_lock);
152-
153161
SetState(WriterStateConnecting);
154162

155163
_start_time = -1LL;
156164
_need_to_flush = false;
157165
_need_to_close = false;
158166

159-
_av_format->flush_packets = 1;
167+
auto av_format = GetAVFormatContext();
168+
if (av_format == nullptr)
169+
{
170+
logte("AVFormatContext is null");
171+
return false;
172+
}
173+
174+
av_format->flush_packets = 1;
160175

161176
// Set Interrupt Callback
162177
_interrupt_cb = {InterruptCallback, this};
163178

164-
if (!(_av_format->oformat->flags & AVFMT_NOFILE))
179+
if (!(av_format->oformat->flags & AVFMT_NOFILE))
165180
{
166181
_last_packet_sent_time = std::chrono::high_resolution_clock::now();
167-
int error = avio_open2(&_av_format->pb, _av_format->url, AVIO_FLAG_WRITE, &_interrupt_cb, nullptr);
182+
int error = avio_open2(&av_format->pb, av_format->url, AVIO_FLAG_WRITE, &_interrupt_cb, nullptr);
168183
if (error < 0)
169184
{
170185
SetState(WriterStateError);
171186

172-
logte("Error opening file. error(%s), url(%s)", ffmpeg::Conv::AVErrorToString(error).CStr(), _av_format->url);
187+
logte("Error opening file. error(%s), url(%s)", ffmpeg::Conv::AVErrorToString(error).CStr(), av_format->url);
173188

174189
return false;
175190
}
@@ -183,7 +198,7 @@ namespace ffmpeg
183198
AVDictionary *format_options = nullptr;
184199
av_dict_set_int(&format_options, "use_editlist", 0, 0);
185200

186-
if (avformat_write_header(_av_format, &format_options) < 0)
201+
if (avformat_write_header(av_format.get(), &format_options) < 0)
187202
{
188203
SetState(WriterStateError);
189204

@@ -193,7 +208,7 @@ namespace ffmpeg
193208
}
194209

195210
// Set output format name
196-
_output_format_name = _av_format->oformat->name;
211+
_output_format_name = av_format->oformat->name;
197212

198213
_need_to_flush = true;
199214

@@ -204,48 +219,47 @@ namespace ffmpeg
204219

205220
bool Writer::Stop()
206221
{
207-
std::unique_lock<std::mutex> mlock(_lock);
208-
209-
if (_av_format)
222+
auto av_format = GetAVFormatContext();
223+
if (av_format)
210224
{
225+
auto av_format_ptr = av_format.get();
211226
// Write trailer
212227
if (_need_to_flush)
213228
{
214-
av_write_trailer(_av_format);
229+
av_write_trailer(av_format_ptr);
215230
}
216231

217232
// Close file
218233
if (_need_to_close)
219234
{
220-
avformat_close_input(&_av_format);
235+
avformat_close_input(&av_format_ptr);
221236
}
222237

223238
// Free context
224-
avformat_free_context(_av_format);
225-
_av_format = nullptr;
239+
avformat_free_context(av_format_ptr);
240+
241+
ReleaseAVFormatContext();
226242
}
227243

228244
SetState(WriterStateClosed);
229245

230246
return true;
231247
}
232248

233-
bool Writer::SendPacket(std::shared_ptr<MediaPacket> packet)
249+
bool Writer::SendPacket(const std::shared_ptr<MediaPacket> &packet)
234250
{
235251
if (!packet)
236252
{
237253
return false;
238254
}
239255

240256
// Find MediaTrack and AVSTream from MediaTrackID
241-
auto it = _track_map.find(packet->GetTrackId());
242-
if (it == _track_map.end())
257+
auto [av_stream, media_track] = GetTrack(packet->GetTrackId());
258+
if (av_stream == nullptr || media_track == nullptr)
243259
{
244-
// If there is no track in the map, the packet is dropped. this is not an error.
245-
return true;
260+
logtw("Could not find track. track_id:%d", packet->GetTrackId());
261+
return false;
246262
}
247-
auto av_stream = it->second.first;
248-
auto media_track = it->second.second;
249263

250264
// Start Timestamp
251265
if (_start_time == -1LL)
@@ -368,8 +382,8 @@ namespace ffmpeg
368382
// Passthrough
369383
}
370384

371-
std::unique_lock<std::mutex> mlock(_lock);
372-
if (!_av_format)
385+
auto av_format = GetAVFormatContext();
386+
if (!av_format)
373387
{
374388
av_packet_unref(&av_packet);
375389
SetState(WriterStateError);
@@ -378,7 +392,7 @@ namespace ffmpeg
378392

379393
_last_packet_sent_time = std::chrono::high_resolution_clock::now();
380394

381-
int error = av_interleaved_write_frame(_av_format, &av_packet);
395+
int error = av_interleaved_write_frame(av_format.get(), &av_packet);
382396
if (error != 0)
383397
{
384398
SetState(WriterStateError);
@@ -395,4 +409,38 @@ namespace ffmpeg
395409
{
396410
return _last_packet_sent_time;
397411
}
412+
413+
std::shared_ptr<AVFormatContext> Writer::GetAVFormatContext() const
414+
{
415+
std::shared_lock<std::shared_mutex> mlock(_av_format_lock);
416+
return _av_format_ptr;
417+
}
418+
419+
void Writer::SetAVFormatContext(AVFormatContext *av_format)
420+
{
421+
std::lock_guard<std::shared_mutex> mlock(_av_format_lock);
422+
_av_format_ptr.reset(av_format);
423+
}
424+
425+
void Writer::ReleaseAVFormatContext()
426+
{
427+
std::lock_guard<std::shared_mutex> mlock(_av_format_lock);
428+
if (_av_format_ptr)
429+
{
430+
avformat_free_context(_av_format_ptr.get());
431+
_av_format_ptr.reset();
432+
;
433+
}
434+
}
435+
436+
std::pair<std::shared_ptr<AVStream>, std::shared_ptr<MediaTrack>> Writer::GetTrack(int32_t track_id) const
437+
{
438+
std::shared_lock<std::shared_mutex> mlock(_track_map_lock);
439+
auto it = _track_map.find(track_id);
440+
if (it == _track_map.end())
441+
{
442+
return std::make_pair(nullptr, nullptr);
443+
}
444+
return it->second;
445+
}
398446
} // namespace ffmpeg

src/projects/modules/ffmpeg/ffmpeg_writer.h

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ namespace ffmpeg
4747
bool Start();
4848
bool Stop();
4949

50-
bool AddTrack(std::shared_ptr<MediaTrack> media_track);
51-
bool SendPacket(std::shared_ptr<MediaPacket> packet);
50+
bool AddTrack(const std::shared_ptr<MediaTrack> &media_track);
51+
bool SendPacket(const std::shared_ptr<MediaPacket> &packet);
5252
std::chrono::high_resolution_clock::time_point GetLastPacketSentTime();
5353

5454
void SetTimestampMode(TimestampMode mode);
@@ -58,6 +58,11 @@ namespace ffmpeg
5858
WriterState GetState();
5959

6060
private:
61+
std::shared_ptr<AVFormatContext> GetAVFormatContext() const;
62+
void SetAVFormatContext(AVFormatContext* av_format);
63+
void ReleaseAVFormatContext();
64+
std::pair<std::shared_ptr<AVStream>, std::shared_ptr<MediaTrack>> GetTrack(int32_t track_id) const;
65+
6166
WriterState _state;
6267

6368
ov::String _url;
@@ -70,15 +75,16 @@ namespace ffmpeg
7075
bool _need_to_close = false;
7176

7277
// MediaTrackId -> AVStream, MediaTrack
73-
std::map<int32_t, std::pair<AVStream*, std::shared_ptr<MediaTrack>>> _track_map;
78+
std::map<int32_t, std::pair<std::shared_ptr<AVStream>, std::shared_ptr<MediaTrack>>> _track_map;
79+
mutable std::shared_mutex _track_map_lock;
80+
81+
std::shared_ptr<AVFormatContext> _av_format_ptr = nullptr;
82+
mutable std::shared_mutex _av_format_lock;
7483

75-
AVFormatContext* _av_format = nullptr;
7684
ov::String _output_format_name;
7785
AVIOInterruptCB _interrupt_cb;
7886
std::chrono::high_resolution_clock::time_point _last_packet_sent_time;
7987
int32_t _connection_timeout = 5000; // 5s
8088
int32_t _send_timeout = 1000; // 1s
81-
82-
std::mutex _lock;
8389
};
8490
} // namespace ffmpeg

0 commit comments

Comments
 (0)