Skip to content

Commit

Permalink
Fixed memory leak posibility in HLSv3
Browse files Browse the repository at this point in the history
  • Loading branch information
getroot committed Sep 19, 2024
1 parent 9d011aa commit ea24f69
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 74 deletions.
69 changes: 41 additions & 28 deletions src/projects/modules/containers/mpegts/mpegts_packager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ namespace mpegts
return true;
}

uint32_t Packager::GetNextSegmentId()
uint64_t Packager::GetNextSegmentId()
{
return _last_segment_id ++;
}
Expand Down Expand Up @@ -132,11 +132,11 @@ namespace mpegts
}

if (track_id == _main_track_id &&
sample_buffer->GetCurrentDurationMs() >= _config.target_duration_ms)
sample_buffer->GetCurrentDurationUs() >= _config.target_duration_ms * 1000)
{
if (media_packet->GetMediaType() == cmn::MediaType::Video && media_packet->IsKeyFrame())
{
sample_buffer->MarkSegmentBoundary();
sample_buffer->MarkSegmentBoundary();
}
else if (media_packet->GetMediaType() == cmn::MediaType::Audio)
{
Expand All @@ -162,7 +162,7 @@ namespace mpegts
CreateSegmentIfReady(true);
}

std::shared_ptr<Segment> Packager::GetSegment(uint32_t segment_id) const
std::shared_ptr<Segment> Packager::GetSegment(uint64_t segment_id) const
{
{
std::shared_lock<std::shared_mutex> lock(_segments_guard);
Expand Down Expand Up @@ -194,7 +194,7 @@ namespace mpegts
return nullptr;
}

std::shared_ptr<const ov::Data> Packager::GetSegmentData(uint32_t segment_id) const
std::shared_ptr<const ov::Data> Packager::GetSegmentData(uint64_t segment_id) const
{
auto segment = GetSegment(segment_id);
if (segment == nullptr)
Expand All @@ -211,17 +211,24 @@ namespace mpegts
auto main_sample_buffer = GetSampleBuffer(_main_track_id);
if (main_sample_buffer == nullptr)
{
logtc("SampleBuffer is not found for main track_id %u", _main_track_id);
logtc("Stream(%s) SampleBuffer is not found for main track_id %u", _config.stream_id_meta.CStr(), _main_track_id);
return;
}

if (main_sample_buffer->HasSegmentBoundary() == false)
{
return;
if (main_sample_buffer->GetCurrentDurationUs() >= MAX_SEGMENT_DURATION_US)
{
logtw("Stream (%s) Main track (%u) does not have a segment boundary. However, the queued segment duration is too long: %u ms. A new segment will be forcibly created.", _config.stream_id_meta.CStr(), _main_track_id, main_sample_buffer->GetCurrentDurationUs());

Flush();
}

return;
}

auto main_segment_duration_ms = main_sample_buffer->GetDurationUntilSegmentBoundaryMs();
auto total_main_segment_duration_ms = main_sample_buffer->GetTotalConsumedDurationMs() + main_segment_duration_ms;
auto main_segment_duration_us = main_sample_buffer->GetDurationUntilSegmentBoundaryUs();
uint64_t total_main_segment_duration_us = main_sample_buffer->GetTotalConsumedDurationUs() + main_segment_duration_us;

if (force_create == false)
{
Expand All @@ -233,14 +240,16 @@ namespace mpegts
continue;
}

auto total_segment_duration_ms = sample_buffer->GetTotalConsumedDurationMs();
auto target_duration_ms = total_main_segment_duration_ms - total_segment_duration_ms;

// if video segment is 6000, audio segment is at least 6000*0.97(=5820)
if (sample_buffer->GetCurrentDurationMs() < target_duration_ms * 0.97)
auto total_sample_segment_duration_us = sample_buffer->GetTotalConsumedDurationUs() + sample_buffer->GetCurrentDurationUs();

// if video segment is 6000, audio segment is at least 6000*0.97(=5820), it is normal case, wait for more samples
if (total_sample_segment_duration_us < total_main_segment_duration_us * 0.97)
{
// Not enough samples
logtd("Not enough samples for track_id %u, current_duration_ms %u, target_duration_ms %u", sample_buffer->GetTrack()->GetId(), sample_buffer->GetCurrentDurationMs(), target_duration_ms);
// Too much difference between the main track and the track, it means that the track may have a problem.
if (total_sample_segment_duration_us * 2 < total_main_segment_duration_us)
{
logtw("Stream(%s) Track(%u) has insufficient sample duration (%llu us) for the main track duration (%llu us)", _config.stream_id_meta.CStr(), track_id, total_sample_segment_duration_us, total_main_segment_duration_us);
}

return;
}
Expand All @@ -255,7 +264,7 @@ namespace mpegts
return;
}

auto segment = std::make_shared<Segment>(GetNextSegmentId(), first_sample.media_packet->GetDts(), main_segment_duration_ms);
auto segment = std::make_shared<Segment>(GetNextSegmentId(), first_sample.media_packet->GetDts(), main_segment_duration_us);

// Add PSI packets
segment->AddPacketData(_psi_packet_data);
Expand Down Expand Up @@ -323,10 +332,14 @@ namespace mpegts
continue;
}

if (sample_buffer->GetTotalConsumedDurationMs() >= total_main_segment_duration_ms)
if (sample_buffer->GetTotalConsumedDurationUs() >= total_main_segment_duration_us)
{
completed_tracks[track_id] = true;
continue;
// Wraparound, if total_main_segment_duration_us is wrapped around, continue to pop samples until the total_consumed_duration_us is wrapped around
if (sample_buffer->GetTotalConsumedDurationUs() - total_main_segment_duration_us < UINT64_MAX / 2)
{
completed_tracks[track_id] = true;
continue;
}
}

auto sample = sample_buffer->PopSample();
Expand Down Expand Up @@ -379,7 +392,7 @@ namespace mpegts
{
std::lock_guard<std::shared_mutex> lock(_segments_guard);
_segments.emplace(segment->GetId(), segment);
_total_segments_duration_ms += segment->GetDurationMs();
_total_segments_duration_us += segment->GetDurationUs();
}

size_t Packager::GetBufferedSegmentCount() const
Expand Down Expand Up @@ -407,7 +420,7 @@ namespace mpegts
if (it != _segments.end())
{
_segments.erase(it);
_total_segments_duration_ms -= segment->GetDurationMs();
_total_segments_duration_us -= segment->GetDurationUs();
}
}

Expand Down Expand Up @@ -442,12 +455,12 @@ namespace mpegts
// Remove data from segment, it has been saved in a file
segment->ResetData();
segment->SetFilePath(file_path);
_total_file_stored_segments_duration_ms += segment->GetDurationMs();
_total_file_stored_segments_duration_us += segment->GetDurationUs();
_file_stored_segments.emplace(segment->GetId(), segment);
}

// Delete old segments from stored list and file
while (GetTotalFileStoredSegmentsDurationMs() > _config.dvr_window_ms)
while (GetTotalFileStoredSegmentsDurationUs() > _config.dvr_window_ms * 1000)
{
auto oldest_segment = GetOldestSegmentFromFile();

Expand All @@ -465,10 +478,10 @@ namespace mpegts
}
}

uint64_t Packager::GetTotalFileStoredSegmentsDurationMs() const
uint64_t Packager::GetTotalFileStoredSegmentsDurationUs() const
{
std::shared_lock<std::shared_mutex> lock(_file_stored_segments_guard);
return _total_file_stored_segments_duration_ms;
return _total_file_stored_segments_duration_us;
}

std::shared_ptr<Segment> Packager::GetOldestSegmentFromFile() const
Expand All @@ -489,7 +502,7 @@ namespace mpegts
auto it = _file_stored_segments.find(segment->GetId());
if (it != _file_stored_segments.end())
{
_total_file_stored_segments_duration_ms -= segment->GetDurationMs();
_total_file_stored_segments_duration_us -= segment->GetDurationUs();
_file_stored_segments.erase(it);
}
}
Expand Down Expand Up @@ -582,7 +595,7 @@ namespace mpegts
return ov::String::FormatString("%s/%s/%s", _config.dvr_storage_path.CStr(), _config.stream_id_meta.CStr(), _packager_id.CStr());
}

ov::String Packager::GetSegmentFilePath(uint32_t segment_id) const
ov::String Packager::GetSegmentFilePath(uint64_t segment_id) const
{
return ov::String::FormatString("%s/segment_%u_hls.ts", GetDvrStoragePath().CStr(), segment_id);
}
Expand Down
Loading

0 comments on commit ea24f69

Please sign in to comment.