Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Specify pad codec #114

Merged
merged 11 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ The package can be installed by adding `membrane_mp4_plugin` to your list of dep
```elixir
defp deps do
[
{:membrane_mp4_plugin, "~> 0.35.0"}
{:membrane_mp4_plugin, "~> 0.35.1"}
]
end
```
Expand Down
4 changes: 2 additions & 2 deletions examples/demuxer_isom.exs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ defmodule Example do
hackney_opts: [follow_redirect: true]
})
|> child(:demuxer, Membrane.MP4.Demuxer.ISOM)
|> via_out(Pad.ref(:output, 1))
|> via_out(Pad.ref(:output, 1), options: [kind: :video])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
|> via_out(Pad.ref(:output, 1), options: [kind: :video])
|> via_out(:output, options: [kind: :video])

|> child(:parser_video, %Membrane.H264.Parser{output_stream_structure: :annexb})
|> child(:sink_video, %Membrane.File.Sink{location: @output_video}),
get_child(:demuxer)
|> via_out(Pad.ref(:output, 2))
|> via_out(Pad.ref(:output, 2), options: [kind: :audio])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
|> via_out(Pad.ref(:output, 2), options: [kind: :audio])
|> via_out(:output, options: [kind: :audio])

|> child(:audio_parser, %Membrane.AAC.Parser{
out_encapsulation: :ADTS
})
Expand Down
171 changes: 153 additions & 18 deletions lib/membrane_mp4/demuxer/isom.ex
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please update the moduledoc

Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,15 @@ defmodule Membrane.MP4.Demuxer.ISOM do
%Membrane.Opus{self_delimiting?: false}
),
availability: :on_request,
flow_control: :auto
options: [
kind: [
spec: :video | :audio | nil,
default: nil,
description: """
Specifies, what kind of data can be handled by a pad.
"""
]
]

def_options optimize_for_non_fast_start?: [
default: false,
Expand Down Expand Up @@ -82,7 +90,9 @@ defmodule Membrane.MP4.Demuxer.ISOM do
boxes_size: 0,
mdat_beginning: nil,
mdat_size: nil,
mdat_header_size: nil
mdat_header_size: nil,
track_to_pad_id: %{},
track_notifications_sent?: false
}

{[], state}
Expand Down Expand Up @@ -147,7 +157,7 @@ defmodule Membrane.MP4.Demuxer.ISOM do
state.partial <> buffer.payload
)

buffers = get_buffer_actions(samples)
buffers = get_buffer_actions(samples, state)

{buffers, %{state | samples_info: samples_info, partial: rest}}
end
Expand Down Expand Up @@ -356,22 +366,26 @@ defmodule Membrane.MP4.Demuxer.ISOM do

state = %{state | samples_info: samples_info, partial: rest}

state = match_tracks_with_pads(ctx, state)

all_pads_connected? = all_pads_connected?(ctx, state)

{buffers, state} =
if all_pads_connected? do
{get_buffer_actions(samples), state}
{get_buffer_actions(samples, state), state}
else
{[], store_samples(state, samples)}
end

notifications = get_track_notifications(state)

stream_format = if all_pads_connected?, do: get_stream_format(state), else: []

state =
%{
state
| all_pads_connected?: all_pads_connected?
| all_pads_connected?: all_pads_connected?,
track_notifications_sent?: true
}
|> update_fsm_state()

Expand All @@ -385,9 +399,10 @@ defmodule Membrane.MP4.Demuxer.ISOM do
end)
end

defp get_buffer_actions(samples) do
defp get_buffer_actions(samples, state) do
Enum.map(samples, fn {buffer, track_id} ->
{:buffer, {Pad.ref(:output, track_id), buffer}}
pad_id = state.track_to_pad_id[track_id]
{:buffer, {Pad.ref(:output, pad_id), buffer}}
end)
end

Expand All @@ -398,12 +413,86 @@ defmodule Membrane.MP4.Demuxer.ISOM do
end
end

defp match_tracks_with_pads(ctx, state) do
kind_to_pads_data =
ctx.pads
|> Map.values()
|> Enum.reject(fn %{ref: pad_ref} -> pad_ref == :input end)
FelonEkonom marked this conversation as resolved.
Show resolved Hide resolved
|> Enum.group_by(& &1.options.kind)

{track_to_pad_id, empty_kind_to_pads_data} =
state.samples_info.sample_tables
|> Enum.map_reduce(kind_to_pads_data, fn {track_id, table}, kind_to_pads_data ->
track_kind = sample_description_to_kind(table.sample_description)

case kind_to_pads_data do
%{^track_kind => [pad_data | tail]} ->
# match track with a pad with specified kind
kind_to_pads_data = Map.put(kind_to_pads_data, track_kind, tail)
Pad.ref(:output, pad_id) = pad_data.ref
{{track_id, pad_id}, kind_to_pads_data}

%{nil => [pad_data]} ->
# match track with the only one pad with unspecified kind
kind_to_pads_data = Map.delete(kind_to_pads_data, nil)
Pad.ref(:output, pad_id) = pad_data.ref
{{track_id, pad_id}, kind_to_pads_data}

%{} ->
# corresponding pad hasn't been linked yet
{{track_id, nil}, kind_to_pads_data}
end
end)

max_pad_id =
track_to_pad_id
|> Enum.flat_map(fn {_track, pad_id} -> if pad_id != nil, do: [pad_id], else: [] end)
|> Enum.max(&>=/2, fn -> 0 end)

{track_to_pad_id, _max_pad_id} =
track_to_pad_id
|> Enum.map_reduce(max_pad_id, fn
{track_id, nil}, max_pad_id -> {{track_id, max_pad_id + 1}, max_pad_id + 1}
track_pad_pair, max_pad_id -> {track_pad_pair, max_pad_id}
end)

raise? =
empty_kind_to_pads_data
|> Map.values()
|> Enum.any?(&(&1 != []))

if raise? do
pads_kinds =
ctx.pads
|> Enum.flat_map(fn
{:input, _pad_data} -> []
{_pad_ref, %{options: %{kind: kind}}} -> [kind]
end)

tracks_codecs =
state.samples_info.sample_tables
|> Enum.map(fn {_track, table} -> table.sample_description.__struct__ end)

raise """
Pads kinds don't match with tracks codecs. Pads kinds are #{inspect(pads_kinds)}. \
Tracks codecs are #{inspect(tracks_codecs)}
"""
end

%{state | track_to_pad_id: Map.new(track_to_pad_id)}
end
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it so long and complex? Let's simplify it


defp sample_description_to_kind(%Membrane.H264{}), do: :video
defp sample_description_to_kind(%Membrane.H265{}), do: :video
defp sample_description_to_kind(%Membrane.AAC{}), do: :audio
defp sample_description_to_kind(%Membrane.Opus{}), do: :audio

defp get_track_notifications(state) do
new_tracks =
state.samples_info.sample_tables
|> Enum.map(fn {track_id, table} ->
content = table.sample_description
{track_id, content}
pad_id = state.track_to_pad_id[track_id]
{pad_id, table.sample_description}
end)

[{:notify_parent, {:new_tracks, new_tracks}}]
Expand All @@ -412,7 +501,8 @@ defmodule Membrane.MP4.Demuxer.ISOM do
defp get_stream_format(state) do
state.samples_info.sample_tables
|> Enum.map(fn {track_id, table} ->
{:stream_format, {Pad.ref(:output, track_id), table.sample_description}}
pad_id = state.track_to_pad_id[track_id]
{:stream_format, {Pad.ref(:output, pad_id), table.sample_description}}
end)
end

Expand All @@ -425,7 +515,8 @@ defmodule Membrane.MP4.Demuxer.ISOM do
raise "All tracks have corresponding pad already connected"
end

def handle_pad_added(Pad.ref(:output, _track_id), ctx, state) do
def handle_pad_added(Pad.ref(:output, _track_id) = pad_ref, ctx, state) do
:ok = validate_pad_kind!(pad_ref, ctx.pad_options.kind, ctx, state)
all_pads_connected? = all_pads_connected?(ctx, state)

{actions, state} =
Expand All @@ -444,6 +535,55 @@ defmodule Membrane.MP4.Demuxer.ISOM do
{actions, state}
end

defp validate_pad_kind!(pad_ref, pad_kind, ctx, state) do
allowed_kinds = [nil, :audio, :video]

if pad_kind not in allowed_kinds do
raise """
Pad #{inspect(pad_ref)} has :kind option set to #{inspect(pad_kind)}, while it has te be one of \
#{[:audio, :video] |> inspect()} or be unspecified.
"""
end

if not state.track_notifications_sent? and
Enum.count(ctx.pads, &match?({Pad.ref(:output, _id), %{options: %{kind: nil}}}, &1)) > 1 do
raise """
If pads are linked before :new_tracks notifications and there are more then one of them, pad option \
:kind has to be specyfied.
"""
end

if state.track_notifications_sent? do
Pad.ref(:output, pad_id) = pad_ref

related_track =
state.track_to_pad_id
|> Map.keys()
|> Enum.find(&(state.track_to_pad_id[&1] == pad_id))

if related_track == nil do
raise """
Pad #{inspect(pad_ref)} doesn't have a related track. If you link pads after #{inspect(__MODULE__)} \
sent the track notification, you have to restrict yourself to the pad occuring in this notification. \
Tracks, that occured in this notification are: #{Map.keys(state.track_to_pad_id) |> inspect()}
"""
end

track_kind =
state.samples_info.sample_tables[related_track].sample_description
|> sample_description_to_kind()

if pad_kind != nil and pad_kind != track_kind do
raise """
Pad option :kind must match with the kind of the related track or be equal nil, but pad #{inspect(pad_ref)} \
kind is #{inspect(pad_kind)}, while the related track kind is #{inspect(track_kind)}
"""
end
end

:ok
end

@impl true
def handle_end_of_stream(:input, _ctx, %{all_pads_connected?: false} = state) do
{[], %{state | end_of_stream?: true}}
Expand All @@ -465,12 +605,6 @@ defmodule Membrane.MP4.Demuxer.ISOM do
_pad -> []
end)

Enum.each(pads, fn pad ->
if pad not in tracks do
raise "An output pad connected with #{pad} id, however no matching track exists"
end
end)

Range.size(tracks) == length(pads)
end

Expand All @@ -482,7 +616,8 @@ defmodule Membrane.MP4.Demuxer.ISOM do
|> Enum.reverse()
|> Enum.map(fn {buffer, ^track_id} -> buffer end)

{:buffer, {Pad.ref(:output, track_id), buffers}}
pad_id = state.track_to_pad_id[track_id]
{:buffer, {Pad.ref(:output, pad_id), buffers}}
end)

state = %{state | buffered_samples: %{}}
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Membrane.MP4.Plugin.MixProject do
use Mix.Project

@version "0.35.0"
@version "0.35.1"
@github_url "https://github.com/membraneframework/membrane_mp4_plugin"

def project do
Expand Down
13 changes: 6 additions & 7 deletions test/membrane_mp4/demuxer/isom/demuxer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -284,28 +284,27 @@ defmodule Membrane.MP4.Demuxer.ISOM.DemuxerTest do
end

defp start_testing_pipeline!(opts) do
structure = [
spec =
child(:file, %Membrane.File.Source{location: opts[:input_file]})
|> child(:demuxer, Membrane.MP4.Demuxer.ISOM)
|> via_out(Pad.ref(:output, 1))
|> child(:sink, %Membrane.File.Sink{location: opts[:output_file]})
]

Pipeline.start_link_supervised!(spec: structure)
Pipeline.start_link_supervised!(spec: spec)
end

defp start_testing_pipeline_with_two_tracks!(opts) do
structure = [
spec = [
child(:file, %Membrane.File.Source{location: opts[:input_file]})
|> child(:demuxer, Membrane.MP4.Demuxer.ISOM)
|> via_out(Pad.ref(:output, 1))
|> via_out(Pad.ref(:output, 1), options: [kind: :video])
|> child(:video_sink, %Membrane.File.Sink{location: opts[:video_output_file]}),
get_child(:demuxer)
|> via_out(Pad.ref(:output, 2))
|> via_out(Pad.ref(:output, 2), options: [kind: :audio])
|> child(:audio_sink, %Membrane.File.Sink{location: opts[:audio_output_file]})
]

Pipeline.start_link_supervised!(spec: structure)
Pipeline.start_link_supervised!(spec: spec)
end

defp start_remote_pipeline!(opts) do
Expand Down
4 changes: 2 additions & 2 deletions test/membrane_mp4/demuxer/isom/integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,11 @@ defmodule Membrane.MP4.Demuxer.ISOM.IntegrationTest do
demuxing_spec = [
child(:file, %Membrane.File.Source{location: mp4_path})
|> child(:demuxer, Membrane.MP4.Demuxer.ISOM)
|> via_out(Pad.ref(:output, 1))
|> via_out(Pad.ref(:output, 1), options: [kind: :video])
|> child(:parser_video, %Membrane.H264.Parser{output_stream_structure: :annexb})
|> child(:sink_video, %Membrane.File.Sink{location: out_video_path}),
get_child(:demuxer)
|> via_out(Pad.ref(:output, 2))
|> via_out(Pad.ref(:output, 2), options: [kind: :audio])
|> child(:audio_parser, %Membrane.AAC.Parser{
out_encapsulation: :ADTS
})
Expand Down