Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Specify pad codec #114

Merged
merged 11 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ The package can be installed by adding `membrane_mp4_plugin` to your list of dep
```elixir
defp deps do
[
{:membrane_mp4_plugin, "~> 0.35.0"}
{:membrane_mp4_plugin, "~> 0.35.1"}
]
end
```
Expand Down
4 changes: 2 additions & 2 deletions examples/demuxer_isom.exs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ defmodule Example do
hackney_opts: [follow_redirect: true]
})
|> child(:demuxer, Membrane.MP4.Demuxer.ISOM)
|> via_out(Pad.ref(:output, 1))
|> via_out(Pad.ref(:output, 1), options: [codec: Membrane.H264])
|> child(:parser_video, %Membrane.H264.Parser{output_stream_structure: :annexb})
|> child(:sink_video, %Membrane.File.Sink{location: @output_video}),
get_child(:demuxer)
|> via_out(Pad.ref(:output, 2))
|> via_out(Pad.ref(:output, 2), options: [codec: Membrane.AAC])
|> child(:audio_parser, %Membrane.AAC.Parser{
out_encapsulation: :ADTS
})
Expand Down
142 changes: 124 additions & 18 deletions lib/membrane_mp4/demuxer/isom.ex
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please update the moduledoc

Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,15 @@ defmodule Membrane.MP4.Demuxer.ISOM do
%Membrane.Opus{self_delimiting?: false}
),
availability: :on_request,
flow_control: :auto
options: [
codec: [
spec: Membrane.H264 | Membrane.H265 | Membrane.Opus | Membrane.AAC | nil,
default: nil,
description: """
Specifies, what kind of codec can be handled by a pad.
"""
]
]

def_options optimize_for_non_fast_start?: [
default: false,
Expand Down Expand Up @@ -82,7 +90,9 @@ defmodule Membrane.MP4.Demuxer.ISOM do
boxes_size: 0,
mdat_beginning: nil,
mdat_size: nil,
mdat_header_size: nil
mdat_header_size: nil,
track_to_pad_id: %{},
track_notifications_sent?: false
}

{[], state}
Expand Down Expand Up @@ -147,7 +157,7 @@ defmodule Membrane.MP4.Demuxer.ISOM do
state.partial <> buffer.payload
)

buffers = get_buffer_actions(samples)
buffers = get_buffer_actions(samples, state)

{buffers, %{state | samples_info: samples_info, partial: rest}}
end
Expand Down Expand Up @@ -356,22 +366,26 @@ defmodule Membrane.MP4.Demuxer.ISOM do

state = %{state | samples_info: samples_info, partial: rest}

state = match_tracks_with_pads(ctx, state)

all_pads_connected? = all_pads_connected?(ctx, state)

{buffers, state} =
if all_pads_connected? do
{get_buffer_actions(samples), state}
{get_buffer_actions(samples, state), state}
else
{[], store_samples(state, samples)}
end

notifications = get_track_notifications(state)

stream_format = if all_pads_connected?, do: get_stream_format(state), else: []

state =
%{
state
| all_pads_connected?: all_pads_connected?
| all_pads_connected?: all_pads_connected?,
track_notifications_sent?: true
}
|> update_fsm_state()

Expand All @@ -385,9 +399,10 @@ defmodule Membrane.MP4.Demuxer.ISOM do
end)
end

defp get_buffer_actions(samples) do
defp get_buffer_actions(samples, state) do
Enum.map(samples, fn {buffer, track_id} ->
{:buffer, {Pad.ref(:output, track_id), buffer}}
pad_id = state.track_to_pad_id[track_id]
{:buffer, {Pad.ref(:output, pad_id), buffer}}
end)
end

Expand All @@ -398,12 +413,59 @@ defmodule Membrane.MP4.Demuxer.ISOM do
end
end

defp match_tracks_with_pads(ctx, state) do
codec_to_pads_data =
ctx.pads
|> Map.values()
|> Enum.reject(fn %{ref: pad_ref} -> pad_ref == :input end)
FelonEkonom marked this conversation as resolved.
Show resolved Hide resolved
|> Enum.group_by(& &1.options.codec)

{track_to_pad_id, empty_codec_to_pads_data} =
state.samples_info.sample_tables
|> Enum.map_reduce(codec_to_pads_data, fn {track_id, table}, codec_to_pads_data ->
%track_codec{} = table.sample_description

case codec_to_pads_data[track_codec] do
[pad_data | tail] ->
codec_to_pads_data = Map.put(codec_to_pads_data, track_codec, tail)
Pad.ref(:output, pad_id) = pad_data.ref
{{track_id, pad_id}, codec_to_pads_data}

_no_pad ->
{{track_id, nil}, codec_to_pads_data}
end
end)

max_pad_id =
track_to_pad_id
|> Enum.flat_map(fn {_track, pad_id} -> if pad_id != nil, do: [pad_id], else: [] end)
|> Enum.max(&>=/2, fn -> 0 end)

{track_to_pad_id, _max_pad_id} =
track_to_pad_id
|> Enum.map_reduce(max_pad_id, fn
{track_id, nil}, max_pad_id -> {{track_id, max_pad_id + 1}, max_pad_id + 1}
track_pad_pair, max_pad_id -> {track_pad_pair, max_pad_id}
end)

raise? =
empty_codec_to_pads_data
|> Map.values()
|> Enum.any?(&(&1 != []))

if raise? do
"dupaaaa"
end

%{state | track_to_pad_id: Map.new(track_to_pad_id)}
end

defp get_track_notifications(state) do
new_tracks =
state.samples_info.sample_tables
|> Enum.map(fn {track_id, table} ->
content = table.sample_description
{track_id, content}
pad_id = state.track_to_pad_id[track_id]
{pad_id, table.sample_description}
end)

[{:notify_parent, {:new_tracks, new_tracks}}]
Expand All @@ -412,7 +474,8 @@ defmodule Membrane.MP4.Demuxer.ISOM do
defp get_stream_format(state) do
state.samples_info.sample_tables
|> Enum.map(fn {track_id, table} ->
{:stream_format, {Pad.ref(:output, track_id), table.sample_description}}
pad_id = state.track_to_pad_id[track_id]
{:stream_format, {Pad.ref(:output, pad_id), table.sample_description}}
end)
end

Expand All @@ -425,7 +488,8 @@ defmodule Membrane.MP4.Demuxer.ISOM do
raise "All tracks have corresponding pad already connected"
end

def handle_pad_added(Pad.ref(:output, _track_id), ctx, state) do
def handle_pad_added(Pad.ref(:output, _track_id) = pad_ref, ctx, state) do
:ok = validate_pad_codec!(pad_ref, ctx.pad_options.codec, ctx, state)
all_pads_connected? = all_pads_connected?(ctx, state)

{actions, state} =
Expand All @@ -444,6 +508,53 @@ defmodule Membrane.MP4.Demuxer.ISOM do
{actions, state}
end

defp validate_pad_codec!(pad_ref, pad_codec, ctx, state) do
allowed_codecs = [nil, Membrane.H264, Membrane.H265, Membrane.Opus, Membrane.AAC]

if pad_codec not in allowed_codecs do
raise """
Pad #{inspect(pad_ref)} has :codec option set to #{inspect(pad_codec)}, while it has te be one of \
#{List.delete(allowed_codecs, nil) |> inspect()} or be unspecified.
"""
end

if not state.track_notifications_sent? and
Enum.count(ctx.pads, &match?({Pad.ref(:output, _id), %{options: %{codec: nil}}}, &1)) > 1 do
raise """
If pads are linked before :new_tracks notifications and there are more then one of them, pad option \
:codec has to be specyfied.
"""
end

if state.track_notifications_sent? do
Pad.ref(:output, pad_id) = pad_ref

related_track =
state.track_to_pad_id
|> Map.keys()
|> Enum.find(&(state.track_to_pad_id[&1] == pad_id))

if related_track == nil do
raise """
Pad #{inspect(pad_ref)} doesn't have a related track. If you link pads after #{inspect(__MODULE__)} \
sent the track notification, you have to restrict yourself to the pad occuring in this notification. \
Tracks, that occured in this notification are: #{Map.keys(state.track_to_pad_id) |> inspect()}
"""
end

track_codec = state.samples_info.sample_tables[related_track].sample_description

if pad_codec != nil and track_codec != pad_codec do
raise """
Pad option :codec must point on the related track codec or be equal nil, but pad #{inspect(pad_ref)} \
codec is #{inspect(pad_codec)}, while the related track codec is #{inspect(track_codec)}
"""
end
end

:ok
end

@impl true
def handle_end_of_stream(:input, _ctx, %{all_pads_connected?: false} = state) do
{[], %{state | end_of_stream?: true}}
Expand All @@ -465,12 +576,6 @@ defmodule Membrane.MP4.Demuxer.ISOM do
_pad -> []
end)

Enum.each(pads, fn pad ->
if pad not in tracks do
raise "An output pad connected with #{pad} id, however no matching track exists"
end
end)

Range.size(tracks) == length(pads)
end

Expand All @@ -482,7 +587,8 @@ defmodule Membrane.MP4.Demuxer.ISOM do
|> Enum.reverse()
|> Enum.map(fn {buffer, ^track_id} -> buffer end)

{:buffer, {Pad.ref(:output, track_id), buffers}}
pad_id = state.track_to_pad_id[track_id]
{:buffer, {Pad.ref(:output, pad_id), buffers}}
end)

state = %{state | buffered_samples: %{}}
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Membrane.MP4.Plugin.MixProject do
use Mix.Project

@version "0.35.0"
@version "0.35.1"
@github_url "https://github.com/membraneframework/membrane_mp4_plugin"

def project do
Expand Down
24 changes: 19 additions & 5 deletions test/membrane_mp4/demuxer/isom/demuxer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -284,28 +284,42 @@ defmodule Membrane.MP4.Demuxer.ISOM.DemuxerTest do
end

defp start_testing_pipeline!(opts) do
structure = [
spec =
child(:file, %Membrane.File.Source{location: opts[:input_file]})
|> child(:demuxer, Membrane.MP4.Demuxer.ISOM)

pipeline = Pipeline.start_link_supervised!(spec: spec)
assert_pipeline_notified(pipeline, :demuxer, {:new_tracks, _notification})

spec =
get_child(:demuxer)
|> via_out(Pad.ref(:output, 1))
|> child(:sink, %Membrane.File.Sink{location: opts[:output_file]})
]

Pipeline.start_link_supervised!(spec: structure)
Pipeline.execute_actions(pipeline, spec: spec)
pipeline
end

defp start_testing_pipeline_with_two_tracks!(opts) do
structure = [
spec =
child(:file, %Membrane.File.Source{location: opts[:input_file]})
|> child(:demuxer, Membrane.MP4.Demuxer.ISOM)

pipeline = Pipeline.start_link_supervised!(spec: spec)

assert_pipeline_notified(pipeline, :demuxer, {:new_tracks, _notification})

spec = [
get_child(:demuxer)
|> via_out(Pad.ref(:output, 1))
|> child(:video_sink, %Membrane.File.Sink{location: opts[:video_output_file]}),
get_child(:demuxer)
|> via_out(Pad.ref(:output, 2))
|> child(:audio_sink, %Membrane.File.Sink{location: opts[:audio_output_file]})
]

Pipeline.start_link_supervised!(spec: structure)
Pipeline.execute_actions(pipeline, spec: spec)
pipeline
end

defp start_remote_pipeline!(opts) do
Expand Down
4 changes: 2 additions & 2 deletions test/membrane_mp4/demuxer/isom/integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,11 @@ defmodule Membrane.MP4.Demuxer.ISOM.IntegrationTest do
demuxing_spec = [
child(:file, %Membrane.File.Source{location: mp4_path})
|> child(:demuxer, Membrane.MP4.Demuxer.ISOM)
|> via_out(Pad.ref(:output, 1))
|> via_out(Pad.ref(:output, 1), options: [codec: Membrane.H264])
|> child(:parser_video, %Membrane.H264.Parser{output_stream_structure: :annexb})
|> child(:sink_video, %Membrane.File.Sink{location: out_video_path}),
get_child(:demuxer)
|> via_out(Pad.ref(:output, 2))
|> via_out(Pad.ref(:output, 2), options: [codec: Membrane.AAC])
|> child(:audio_parser, %Membrane.AAC.Parser{
out_encapsulation: :ADTS
})
Expand Down