From ea24f6930a46acef05e0216b895818bd565973cd Mon Sep 17 00:00:00 2001 From: getroot Date: Fri, 20 Sep 2024 00:50:56 +0900 Subject: [PATCH] Fixed memory leak posibility in HLSv3 --- .../containers/mpegts/mpegts_packager.cpp | 69 +++++++++------ .../containers/mpegts/mpegts_packager.h | 88 ++++++++++--------- .../publishers/hls/hls_media_playlist.cpp | 8 +- .../publishers/hls/hls_media_playlist.h | 2 +- 4 files changed, 93 insertions(+), 74 deletions(-) diff --git a/src/projects/modules/containers/mpegts/mpegts_packager.cpp b/src/projects/modules/containers/mpegts/mpegts_packager.cpp index c0695e237..c8a0e2fe7 100644 --- a/src/projects/modules/containers/mpegts/mpegts_packager.cpp +++ b/src/projects/modules/containers/mpegts/mpegts_packager.cpp @@ -47,7 +47,7 @@ namespace mpegts return true; } - uint32_t Packager::GetNextSegmentId() + uint64_t Packager::GetNextSegmentId() { return _last_segment_id ++; } @@ -132,11 +132,11 @@ namespace mpegts } if (track_id == _main_track_id && - sample_buffer->GetCurrentDurationMs() >= _config.target_duration_ms) + sample_buffer->GetCurrentDurationUs() >= _config.target_duration_ms * 1000) { if (media_packet->GetMediaType() == cmn::MediaType::Video && media_packet->IsKeyFrame()) { - sample_buffer->MarkSegmentBoundary(); + sample_buffer->MarkSegmentBoundary(); } else if (media_packet->GetMediaType() == cmn::MediaType::Audio) { @@ -162,7 +162,7 @@ namespace mpegts CreateSegmentIfReady(true); } - std::shared_ptr Packager::GetSegment(uint32_t segment_id) const + std::shared_ptr Packager::GetSegment(uint64_t segment_id) const { { std::shared_lock lock(_segments_guard); @@ -194,7 +194,7 @@ namespace mpegts return nullptr; } - std::shared_ptr Packager::GetSegmentData(uint32_t segment_id) const + std::shared_ptr Packager::GetSegmentData(uint64_t segment_id) const { auto segment = GetSegment(segment_id); if (segment == nullptr) @@ -211,17 +211,24 @@ namespace mpegts auto main_sample_buffer = GetSampleBuffer(_main_track_id); if (main_sample_buffer == nullptr) { - logtc("SampleBuffer is not found for main track_id %u", _main_track_id); + logtc("Stream(%s) SampleBuffer is not found for main track_id %u", _config.stream_id_meta.CStr(), _main_track_id); return; } if (main_sample_buffer->HasSegmentBoundary() == false) { - return; + if (main_sample_buffer->GetCurrentDurationUs() >= MAX_SEGMENT_DURATION_US) + { + logtw("Stream (%s) Main track (%u) does not have a segment boundary. However, the queued segment duration is too long: %u ms. A new segment will be forcibly created.", _config.stream_id_meta.CStr(), _main_track_id, main_sample_buffer->GetCurrentDurationUs()); + + Flush(); + } + + return; } - auto main_segment_duration_ms = main_sample_buffer->GetDurationUntilSegmentBoundaryMs(); - auto total_main_segment_duration_ms = main_sample_buffer->GetTotalConsumedDurationMs() + main_segment_duration_ms; + auto main_segment_duration_us = main_sample_buffer->GetDurationUntilSegmentBoundaryUs(); + uint64_t total_main_segment_duration_us = main_sample_buffer->GetTotalConsumedDurationUs() + main_segment_duration_us; if (force_create == false) { @@ -233,14 +240,16 @@ namespace mpegts continue; } - auto total_segment_duration_ms = sample_buffer->GetTotalConsumedDurationMs(); - auto target_duration_ms = total_main_segment_duration_ms - total_segment_duration_ms; - - // if video segment is 6000, audio segment is at least 6000*0.97(=5820) - if (sample_buffer->GetCurrentDurationMs() < target_duration_ms * 0.97) + auto total_sample_segment_duration_us = sample_buffer->GetTotalConsumedDurationUs() + sample_buffer->GetCurrentDurationUs(); + + // if video segment is 6000, audio segment is at least 6000*0.97(=5820), it is normal case, wait for more samples + if (total_sample_segment_duration_us < total_main_segment_duration_us * 0.97) { - // Not enough samples - logtd("Not enough samples for track_id %u, current_duration_ms %u, target_duration_ms %u", sample_buffer->GetTrack()->GetId(), sample_buffer->GetCurrentDurationMs(), target_duration_ms); + // Too much difference between the main track and the track, it means that the track may have a problem. + if (total_sample_segment_duration_us * 2 < total_main_segment_duration_us) + { + logtw("Stream(%s) Track(%u) has insufficient sample duration (%llu us) for the main track duration (%llu us)", _config.stream_id_meta.CStr(), track_id, total_sample_segment_duration_us, total_main_segment_duration_us); + } return; } @@ -255,7 +264,7 @@ namespace mpegts return; } - auto segment = std::make_shared(GetNextSegmentId(), first_sample.media_packet->GetDts(), main_segment_duration_ms); + auto segment = std::make_shared(GetNextSegmentId(), first_sample.media_packet->GetDts(), main_segment_duration_us); // Add PSI packets segment->AddPacketData(_psi_packet_data); @@ -323,10 +332,14 @@ namespace mpegts continue; } - if (sample_buffer->GetTotalConsumedDurationMs() >= total_main_segment_duration_ms) + if (sample_buffer->GetTotalConsumedDurationUs() >= total_main_segment_duration_us) { - completed_tracks[track_id] = true; - continue; + // Wraparound, if total_main_segment_duration_us is wrapped around, continue to pop samples until the total_consumed_duration_us is wrapped around + if (sample_buffer->GetTotalConsumedDurationUs() - total_main_segment_duration_us < UINT64_MAX / 2) + { + completed_tracks[track_id] = true; + continue; + } } auto sample = sample_buffer->PopSample(); @@ -379,7 +392,7 @@ namespace mpegts { std::lock_guard lock(_segments_guard); _segments.emplace(segment->GetId(), segment); - _total_segments_duration_ms += segment->GetDurationMs(); + _total_segments_duration_us += segment->GetDurationUs(); } size_t Packager::GetBufferedSegmentCount() const @@ -407,7 +420,7 @@ namespace mpegts if (it != _segments.end()) { _segments.erase(it); - _total_segments_duration_ms -= segment->GetDurationMs(); + _total_segments_duration_us -= segment->GetDurationUs(); } } @@ -442,12 +455,12 @@ namespace mpegts // Remove data from segment, it has been saved in a file segment->ResetData(); segment->SetFilePath(file_path); - _total_file_stored_segments_duration_ms += segment->GetDurationMs(); + _total_file_stored_segments_duration_us += segment->GetDurationUs(); _file_stored_segments.emplace(segment->GetId(), segment); } // Delete old segments from stored list and file - while (GetTotalFileStoredSegmentsDurationMs() > _config.dvr_window_ms) + while (GetTotalFileStoredSegmentsDurationUs() > _config.dvr_window_ms * 1000) { auto oldest_segment = GetOldestSegmentFromFile(); @@ -465,10 +478,10 @@ namespace mpegts } } - uint64_t Packager::GetTotalFileStoredSegmentsDurationMs() const + uint64_t Packager::GetTotalFileStoredSegmentsDurationUs() const { std::shared_lock lock(_file_stored_segments_guard); - return _total_file_stored_segments_duration_ms; + return _total_file_stored_segments_duration_us; } std::shared_ptr Packager::GetOldestSegmentFromFile() const @@ -489,7 +502,7 @@ namespace mpegts auto it = _file_stored_segments.find(segment->GetId()); if (it != _file_stored_segments.end()) { - _total_file_stored_segments_duration_ms -= segment->GetDurationMs(); + _total_file_stored_segments_duration_us -= segment->GetDurationUs(); _file_stored_segments.erase(it); } } @@ -582,7 +595,7 @@ namespace mpegts return ov::String::FormatString("%s/%s/%s", _config.dvr_storage_path.CStr(), _config.stream_id_meta.CStr(), _packager_id.CStr()); } - ov::String Packager::GetSegmentFilePath(uint32_t segment_id) const + ov::String Packager::GetSegmentFilePath(uint64_t segment_id) const { return ov::String::FormatString("%s/segment_%u_hls.ts", GetDvrStoragePath().CStr(), segment_id); } diff --git a/src/projects/modules/containers/mpegts/mpegts_packager.h b/src/projects/modules/containers/mpegts/mpegts_packager.h index 1532426ab..7055151dc 100644 --- a/src/projects/modules/containers/mpegts/mpegts_packager.h +++ b/src/projects/modules/containers/mpegts/mpegts_packager.h @@ -16,15 +16,16 @@ namespace mpegts { constexpr size_t SEGMENT_BUFFER_SIZE = 2000000; + constexpr size_t MAX_SEGMENT_DURATION_US = 5 * 60 * 1000 * 1000; // 5 minutes class Segment { public: - Segment(uint32_t segment_id, uint32_t first_dts, uint32_t duration_ms) + Segment(uint64_t segment_id, int64_t first_dts, uint64_t duration_us) { _segment_id = segment_id; _first_dts = first_dts; - _duration_ms = duration_ms; + _duration_us = duration_us; } bool AddPacketData(const std::shared_ptr &data) @@ -41,12 +42,12 @@ namespace mpegts return true; } - uint32_t GetId() const + uint64_t GetId() const { return _segment_id; } - uint32_t GetNumber() const + uint64_t GetNumber() const { return _segment_id; } @@ -66,9 +67,9 @@ namespace mpegts return _first_dts; } - uint32_t GetDurationMs() const + uint64_t GetDurationUs() const { - return _duration_ms; + return _duration_us; } ov::String GetFilePath() const @@ -123,9 +124,9 @@ namespace mpegts } private: - uint32_t _segment_id = 0; + uint64_t _segment_id = 0; int64_t _first_dts = -1; - uint32_t _duration_ms = 0; + uint64_t _duration_us = 0; ov::String _url; ov::String _file_path; @@ -160,21 +161,26 @@ namespace mpegts return _track; } + uint64_t GetSampleDurationUs(const Sample &sample) const + { + return static_cast(sample.media_packet->GetDuration()) * 1000000.0 / GetTrack()->GetTimeBase().GetTimescale(); + } + bool AddSample(const Sample &sample) { _samples.push(sample); - auto duration_ms = sample.media_packet->GetDuration() * 1000 / GetTrack()->GetTimeBase().GetTimescale(); + uint64_t duration_us = GetSampleDurationUs(sample); _current_samples_count++; - _current_samples_duration_ms += duration_ms; + _current_samples_duration_us += duration_us; return true; } - uint32_t GetCurrentDurationMs() const + uint64_t GetCurrentDurationUs() const { - return _current_samples_duration_ms; + return _current_samples_duration_us; } bool HasSegmentBoundary() const @@ -192,22 +198,22 @@ namespace mpegts { SegmentBoundary boundary; boundary.sample_count = _current_samples_count; - boundary.duration_ms = _current_samples_duration_ms; + boundary.duration_us = _current_samples_duration_us; _segment_boundaries.push(boundary); _current_samples_count = 0; - _current_samples_duration_ms = 0; + _current_samples_duration_us = 0; } - uint32_t GetDurationUntilSegmentBoundaryMs() const + uint64_t GetDurationUntilSegmentBoundaryUs() const { if (HasSegmentBoundary() == false) { return 0; } - return _segment_boundaries.front().duration_ms; + return _segment_boundaries.front().duration_us; } bool IsEmpty() const @@ -225,14 +231,14 @@ namespace mpegts auto sample = _samples.front(); _samples.pop(); - auto sample_duration_ms = sample.media_packet->GetDuration() * 1000 / GetTrack()->GetTimeBase().GetTimescale(); + uint64_t sample_duration_us = GetSampleDurationUs(sample); _current_samples_count--; - _current_samples_duration_ms -= sample_duration_ms; + _current_samples_duration_us -= sample_duration_us; _total_consumed_samples_count++; - _total_consumed_samples_duration_ms += sample_duration_ms; - + _total_consumed_samples_duration_us += sample_duration_us; + return sample; } @@ -255,7 +261,7 @@ namespace mpegts //logd("DEBUG", "PopSamplesUntilSegmentBoundary : sample_count %u, duration_ms %u", boundary.sample_count, boundary.duration_ms); - for (uint32_t i = 0; i < boundary.sample_count; i++) + for (uint64_t i = 0; i < boundary.sample_count; i++) { if (_samples.empty()) { @@ -270,14 +276,14 @@ namespace mpegts } _total_consumed_samples_count += boundary.sample_count; - _total_consumed_samples_duration_ms += boundary.duration_ms; + _total_consumed_samples_duration_us += boundary.duration_us; return samples; } - uint32_t GetTotalConsumedDurationMs() const + uint64_t GetTotalConsumedDurationUs() const { - return _total_consumed_samples_duration_ms; + return _total_consumed_samples_duration_us; } private: @@ -286,17 +292,17 @@ namespace mpegts struct SegmentBoundary { - uint32_t sample_count = 0; - uint32_t duration_ms = 0; + uint64_t sample_count = 0; + uint64_t duration_us = 0; }; std::queue _segment_boundaries; - uint32_t _current_samples_count = 0; - uint32_t _current_samples_duration_ms = 0; + uint64_t _current_samples_count = 0; + uint64_t _current_samples_duration_us = 0; - uint32_t _total_consumed_samples_count = 0; - uint32_t _total_consumed_samples_duration_ms = 0; + uint64_t _total_consumed_samples_count = 0; + uint64_t _total_consumed_samples_duration_us = 0; }; class PackagerSink : public ov::EnableSharedFromThis @@ -347,13 +353,13 @@ namespace mpegts void Flush(); // Get the segment data - std::shared_ptr GetSegment(uint32_t segment_id) const; - std::shared_ptr GetSegmentData(uint32_t segment_id) const; + std::shared_ptr GetSegment(uint64_t segment_id) const; + std::shared_ptr GetSegmentData(uint64_t segment_id) const; private: const Config &GetConfig() const; - uint32_t GetNextSegmentId(); + uint64_t GetNextSegmentId(); std::shared_ptr MergeTsPacketData(const std::vector> &ts_packets); @@ -374,7 +380,7 @@ namespace mpegts void SaveSegmentToFile(const std::shared_ptr &segment); void DeleteSegmentFile(const std::shared_ptr &segment); void DeleteSegmentFromFileStoredList(const std::shared_ptr &segment); - uint64_t GetTotalFileStoredSegmentsDurationMs() const; + uint64_t GetTotalFileStoredSegmentsDurationUs() const; std::shared_ptr GetOldestSegmentFromFile() const; // Retention @@ -388,7 +394,7 @@ namespace mpegts void BroadcastSegmentDeleted(const std::shared_ptr &segment); ov::String GetDvrStoragePath() const; - ov::String GetSegmentFilePath(uint32_t segment_id) const; + ov::String GetSegmentFilePath(uint64_t segment_id) const; ov::String _packager_id; Config _config; @@ -401,17 +407,17 @@ namespace mpegts std::vector> _psi_packets; std::shared_ptr _psi_packet_data; - uint32_t _last_segment_id = 0; + uint64_t _last_segment_id = 0; - std::map> _segments; - uint64_t _total_segments_duration_ms = 0; + std::map> _segments; + uint64_t _total_segments_duration_us = 0; mutable std::shared_mutex _segments_guard; - std::map> _file_stored_segments; - uint64_t _total_file_stored_segments_duration_ms = 0; + std::map> _file_stored_segments; + uint64_t _total_file_stored_segments_duration_us = 0; mutable std::shared_mutex _file_stored_segments_guard; - std::map> _retained_segments; + std::map> _retained_segments; mutable std::shared_mutex _retained_segments_guard; std::vector> _sinks; diff --git a/src/projects/publishers/hls/hls_media_playlist.cpp b/src/projects/publishers/hls/hls_media_playlist.cpp index be046f064..19aa9331e 100644 --- a/src/projects/publishers/hls/hls_media_playlist.cpp +++ b/src/projects/publishers/hls/hls_media_playlist.cpp @@ -41,7 +41,7 @@ bool HlsMediaPlaylist::OnSegmentCreated(const std::shared_ptr & { std::lock_guard lock(_segments_mutex); - logtd("HlsMediaPlaylist::OnSegmentCreated - number(%d) url(%s) duration(%d)\n", segment->GetNumber(), segment->GetUrl().CStr(), segment->GetDurationMs()); + logtd("HlsMediaPlaylist::OnSegmentCreated - number(%d) url(%s) duration_us(%llu)\n", segment->GetNumber(), segment->GetUrl().CStr(), segment->GetDurationUs()); _segments.emplace(segment->GetNumber(), segment); @@ -52,7 +52,7 @@ bool HlsMediaPlaylist::OnSegmentDeleted(const std::shared_ptr & { std::lock_guard lock(_segments_mutex); - logtd("HlsMediaPlaylist::OnSegmentDeleted - number(%d) url(%s) duration(%d)\n", segment->GetNumber(), segment->GetUrl().CStr(), segment->GetDurationMs()); + logtd("HlsMediaPlaylist::OnSegmentDeleted - number(%d) url(%s) duration_us(%llu)\n", segment->GetNumber(), segment->GetUrl().CStr(), segment->GetDurationUs()); auto it = _segments.find(segment->GetNumber()); if (it == _segments.end()) @@ -89,7 +89,7 @@ ov::String HlsMediaPlaylist::ToString(bool rewind) const { size_t segment_size = _segments.size(); size_t shift_count = segment_size > _config.segment_count ? _config.segment_count : segment_size - 1; - uint32_t last_segment_number = _segments.rbegin()->second->GetNumber(); + uint64_t last_segment_number = _segments.rbegin()->second->GetNumber(); auto it = _segments.find(last_segment_number - shift_count); if (it == _segments.end()) @@ -106,7 +106,7 @@ ov::String HlsMediaPlaylist::ToString(bool rewind) const for (auto it = _segments.find(first_segment->GetNumber()); it != _segments.end(); it ++) { const auto &segment = it->second; - result += ov::String::FormatString("#EXTINF:%.3f,\n", static_cast(segment->GetDurationMs()) / 1000.0); + result += ov::String::FormatString("#EXTINF:%.3f,\n", static_cast(segment->GetDurationUs()) / 1000000.0); result += ov::String::FormatString("%s\n", segment->GetUrl().CStr()); } diff --git a/src/projects/publishers/hls/hls_media_playlist.h b/src/projects/publishers/hls/hls_media_playlist.h index aa2a8d376..8fef316a2 100644 --- a/src/projects/publishers/hls/hls_media_playlist.h +++ b/src/projects/publishers/hls/hls_media_playlist.h @@ -59,7 +59,7 @@ class HlsMediaPlaylist std::shared_ptr _first_audio_track = nullptr; // Segment number : Segment - std::map> _segments; + std::map> _segments; mutable std::shared_mutex _segments_mutex; bool _end_list = false;