Skip to content

Commit

Permalink
Allow specifing demuxer output pad codec
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Jun 24, 2024
1 parent 72d38a0 commit 3445712
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 28 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ The package can be installed by adding `membrane_mp4_plugin` to your list of dep
```elixir
defp deps do
[
{:membrane_mp4_plugin, "~> 0.35.0"}
{:membrane_mp4_plugin, "~> 0.35.1"}
]
end
```
Expand Down
4 changes: 2 additions & 2 deletions examples/demuxer_isom.exs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ defmodule Example do
hackney_opts: [follow_redirect: true]
})
|> child(:demuxer, Membrane.MP4.Demuxer.ISOM)
|> via_out(Pad.ref(:output, 1))
|> via_out(Pad.ref(:output, 1), options: [codec: Membrane.H264])
|> child(:parser_video, %Membrane.H264.Parser{output_stream_structure: :annexb})
|> child(:sink_video, %Membrane.File.Sink{location: @output_video}),
get_child(:demuxer)
|> via_out(Pad.ref(:output, 2))
|> via_out(Pad.ref(:output, 2), options: [codec: Membrane.AAC])
|> child(:audio_parser, %Membrane.AAC.Parser{
out_encapsulation: :ADTS
})
Expand Down
150 changes: 133 additions & 17 deletions lib/membrane_mp4/demuxer/isom.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,19 @@ defmodule Membrane.MP4.Demuxer.ISOM do
%Membrane.Opus{self_delimiting?: false}
),
availability: :on_request,
flow_control: :auto
options: [
codec: [
spec: Membrane.H264 | Membrane.H265 | Membrane.Opus | Membrane.AAC | nil,
default: nil,
description: """
Specifies, what kind of codec can be handled by a pad.
Pad with `:codec` option set to `:all` can handle all codecs.
Defaults to `:all`
"""
]
]

def_options optimize_for_non_fast_start?: [
default: false,
Expand Down Expand Up @@ -82,7 +94,9 @@ defmodule Membrane.MP4.Demuxer.ISOM do
boxes_size: 0,
mdat_beginning: nil,
mdat_size: nil,
mdat_header_size: nil
mdat_header_size: nil,
track_to_pad_id: %{},
track_notifications_sent?: false
}

{[], state}
Expand Down Expand Up @@ -147,7 +161,7 @@ defmodule Membrane.MP4.Demuxer.ISOM do
state.partial <> buffer.payload
)

buffers = get_buffer_actions(samples)
buffers = get_buffer_actions(samples, state)

{buffers, %{state | samples_info: samples_info, partial: rest}}
end
Expand Down Expand Up @@ -356,22 +370,26 @@ defmodule Membrane.MP4.Demuxer.ISOM do

state = %{state | samples_info: samples_info, partial: rest}

state = match_tracks_with_pads(ctx, state)

all_pads_connected? = all_pads_connected?(ctx, state)

{buffers, state} =
if all_pads_connected? do
{get_buffer_actions(samples), state}
{get_buffer_actions(samples, state), state}
else
{[], store_samples(state, samples)}
end

notifications = get_track_notifications(state)

stream_format = if all_pads_connected?, do: get_stream_format(state), else: []

state =
%{
state
| all_pads_connected?: all_pads_connected?
| all_pads_connected?: all_pads_connected?,
track_notifications_sent?: true
}
|> update_fsm_state()

Expand All @@ -385,9 +403,10 @@ defmodule Membrane.MP4.Demuxer.ISOM do
end)
end

defp get_buffer_actions(samples) do
defp get_buffer_actions(samples, state) do
Enum.map(samples, fn {buffer, track_id} ->
{:buffer, {Pad.ref(:output, track_id), buffer}}
pad_id = state.track_to_pad_id[track_id]
{:buffer, {Pad.ref(:output, pad_id), buffer}}
end)
end

Expand All @@ -398,12 +417,59 @@ defmodule Membrane.MP4.Demuxer.ISOM do
end
end

defp match_tracks_with_pads(ctx, state) do
codec_to_pads_data =
ctx.pads
|> Map.values()
|> Enum.reject(fn %{ref: pad_ref} -> pad_ref == :input end)
|> Enum.group_by(& &1.options.codec)

{track_to_pad_id, empty_codec_to_pads_data} =
state.samples_info.sample_tables
|> Enum.map_reduce(codec_to_pads_data, fn {track_id, table}, codec_to_pads_data ->
%track_codec{} = table.sample_description

case codec_to_pads_data[track_codec] do
[pad_data | tail] ->
codec_to_pads_data = Map.put(codec_to_pads_data, track_codec, tail)
Pad.ref(:output, pad_id) = pad_data.ref
{{track_id, pad_id}, codec_to_pads_data}

_no_pad ->
{{track_id, nil}, codec_to_pads_data}
end
end)

max_pad_id =
track_to_pad_id
|> Enum.flat_map(fn {_track, pad_id} -> if pad_id != nil, do: [pad_id], else: [] end)
|> Enum.max(&>=/2, fn -> 0 end)

{track_to_pad_id, _max_pad_id} =
track_to_pad_id
|> Enum.map_reduce(max_pad_id, fn
{track_id, nil}, max_pad_id -> {{track_id, max_pad_id + 1}, max_pad_id + 1}
track_pad_pair, max_pad_id -> {track_pad_pair, max_pad_id}
end)

raise? =
empty_codec_to_pads_data
|> Map.values()
|> Enum.any?(&(&1 != []))

if raise? do
"dupaaaa"
end

%{state | track_to_pad_id: Map.new(track_to_pad_id)}
end

defp get_track_notifications(state) do
new_tracks =
state.samples_info.sample_tables
|> Enum.map(fn {track_id, table} ->
content = table.sample_description
{track_id, content}
pad_id = state.track_to_pad_id[track_id]
{pad_id, table.sample_description}
end)

[{:notify_parent, {:new_tracks, new_tracks}}]
Expand All @@ -412,7 +478,8 @@ defmodule Membrane.MP4.Demuxer.ISOM do
defp get_stream_format(state) do
state.samples_info.sample_tables
|> Enum.map(fn {track_id, table} ->
{:stream_format, {Pad.ref(:output, track_id), table.sample_description}}
pad_id = state.track_to_pad_id[track_id]
{:stream_format, {Pad.ref(:output, pad_id), table.sample_description}}
end)
end

Expand All @@ -425,7 +492,8 @@ defmodule Membrane.MP4.Demuxer.ISOM do
raise "All tracks have corresponding pad already connected"
end

def handle_pad_added(Pad.ref(:output, _track_id), ctx, state) do
def handle_pad_added(Pad.ref(:output, _track_id) = pad_ref, ctx, state) do
:ok = validate_pad_codec!(pad_ref, ctx.pad_options.codec, ctx, state)
all_pads_connected? = all_pads_connected?(ctx, state)

{actions, state} =
Expand All @@ -444,6 +512,53 @@ defmodule Membrane.MP4.Demuxer.ISOM do
{actions, state}
end

defp validate_pad_codec!(pad_ref, pad_codec, ctx, state) do
allowed_codecs = [nil, Membrane.H264, Membrane.H265, Membrane.Opus, Membrane.AAC]

if pad_codec not in allowed_codecs do
raise """
Pad #{inspect(pad_ref)} has :codec option set to #{inspect(pad_codec)}, while it has te be one of \
#{List.delete(allowed_codecs, nil) |> inspect()} or be unspecified.
"""
end

if not state.track_notifications_sent? and
Enum.count(ctx.pads, &match?({Pad.ref(:output, _id), %{options: %{codec: nil}}}, &1)) > 1 do
raise """
If pads are linked before :new_tracks notifications and there are more then one of them, pad option \
:codec has to be specyfied.
"""
end

if state.track_notifications_sent? do
Pad.ref(:output, pad_id) = pad_ref

related_track =
state.track_to_pad_id
|> Map.keys()
|> Enum.find(&(state.track_to_pad_id[&1] == pad_id))

if related_track == nil do
raise """
Pad #{inspect(pad_ref)} doesn't have a related track. If you link pads after #{inspect(__MODULE__)} \
sent the track notification, you have to restrict yourself to the pad occuring in this notification. \
Tracks, that occured in this notification are: #{Map.keys(state.track_to_pad_id) |> inspect()}
"""
end

track_codec = state.samples_info.sample_tables[related_track].sample_description

if pad_codec != nil and track_codec != pad_codec do
raise """
Pad option :codec must point on the related track codec or be equal nil, but pad #{inspect(pad_ref)} \
codec is #{inspect(pad_codec)}, while the related track codec is #{inspect(track_codec)}
"""
end
end

:ok
end

@impl true
def handle_end_of_stream(:input, _ctx, %{all_pads_connected?: false} = state) do
{[], %{state | end_of_stream?: true}}
Expand All @@ -465,11 +580,11 @@ defmodule Membrane.MP4.Demuxer.ISOM do
_pad -> []
end)

Enum.each(pads, fn pad ->
if pad not in tracks do
raise "An output pad connected with #{pad} id, however no matching track exists"
end
end)
# Enum.each(pads, fn pad ->
# if pad not in tracks do
# raise "An output pad connected with #{pad} id, however no matching track exists"
# end
# end)

Range.size(tracks) == length(pads)
end
Expand All @@ -482,7 +597,8 @@ defmodule Membrane.MP4.Demuxer.ISOM do
|> Enum.reverse()
|> Enum.map(fn {buffer, ^track_id} -> buffer end)

{:buffer, {Pad.ref(:output, track_id), buffers}}
pad_id = state.track_to_pad_id[track_id]
{:buffer, {Pad.ref(:output, pad_id), buffers}}
end)

state = %{state | buffered_samples: %{}}
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Membrane.MP4.Plugin.MixProject do
use Mix.Project

@version "0.35.0"
@version "0.35.1"
@github_url "https://github.com/membraneframework/membrane_mp4_plugin"

def project do
Expand Down
24 changes: 19 additions & 5 deletions test/membrane_mp4/demuxer/isom/demuxer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -284,28 +284,42 @@ defmodule Membrane.MP4.Demuxer.ISOM.DemuxerTest do
end

defp start_testing_pipeline!(opts) do
structure = [
spec =
child(:file, %Membrane.File.Source{location: opts[:input_file]})
|> child(:demuxer, Membrane.MP4.Demuxer.ISOM)

pipeline = Pipeline.start_link_supervised!(spec: spec)
assert_pipeline_notified(pipeline, :demuxer, {:new_tracks, _notification})

spec =
get_child(:demuxer)
|> via_out(Pad.ref(:output, 1))
|> child(:sink, %Membrane.File.Sink{location: opts[:output_file]})
]

Pipeline.start_link_supervised!(spec: structure)
Pipeline.execute_actions(pipeline, spec: spec)
pipeline
end

defp start_testing_pipeline_with_two_tracks!(opts) do
structure = [
spec =
child(:file, %Membrane.File.Source{location: opts[:input_file]})
|> child(:demuxer, Membrane.MP4.Demuxer.ISOM)

pipeline = Pipeline.start_link_supervised!(spec: spec)

assert_pipeline_notified(pipeline, :demuxer, {:new_tracks, _notification})

spec = [
get_child(:demuxer)
|> via_out(Pad.ref(:output, 1))
|> child(:video_sink, %Membrane.File.Sink{location: opts[:video_output_file]}),
get_child(:demuxer)
|> via_out(Pad.ref(:output, 2))
|> child(:audio_sink, %Membrane.File.Sink{location: opts[:audio_output_file]})
]

Pipeline.start_link_supervised!(spec: structure)
Pipeline.execute_actions(pipeline, spec: spec)
pipeline
end

defp start_remote_pipeline!(opts) do
Expand Down
4 changes: 2 additions & 2 deletions test/membrane_mp4/demuxer/isom/integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,11 @@ defmodule Membrane.MP4.Demuxer.ISOM.IntegrationTest do
demuxing_spec = [
child(:file, %Membrane.File.Source{location: mp4_path})
|> child(:demuxer, Membrane.MP4.Demuxer.ISOM)
|> via_out(Pad.ref(:output, 1))
|> via_out(Pad.ref(:output, 1), options: [codec: Membrane.H264])
|> child(:parser_video, %Membrane.H264.Parser{output_stream_structure: :annexb})
|> child(:sink_video, %Membrane.File.Sink{location: out_video_path}),
get_child(:demuxer)
|> via_out(Pad.ref(:output, 2))
|> via_out(Pad.ref(:output, 2), options: [codec: Membrane.AAC])
|> child(:audio_parser, %Membrane.AAC.Parser{
out_encapsulation: :ADTS
})
Expand Down

0 comments on commit 3445712

Please sign in to comment.