Skip to content

Commit 8012af8

Browse files
committed
Allow format changes
1 parent d4f9956 commit 8012af8

File tree

5 files changed

+140
-135
lines changed

5 files changed

+140
-135
lines changed

lib/transcoder.ex

+16-16
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ defmodule Membrane.Transcoder do
8686
def handle_init(_ctx, opts) do
8787
spec = [
8888
bin_input()
89-
|> child(:forwarding_filter, ForwardingFilter),
89+
|> child(:connector, %Membrane.Connector{notify_on_stream_format?: true}),
9090
child(:output_funnel, Funnel)
9191
|> bin_output()
9292
]
@@ -102,12 +102,14 @@ defmodule Membrane.Transcoder do
102102
end
103103

104104
@impl true
105-
def handle_child_notification(
106-
{:stream_format, format},
107-
:forwarding_filter,
108-
_ctx,
109-
%{input_stream_format: nil} = state
110-
) do
105+
def handle_playing(_ctx, state) do
106+
IO.inspect(PLAYING)
107+
{[], state}
108+
end
109+
110+
@impl true
111+
def handle_child_notification({:stream_format, _pad, format}, :connector, _ctx, state)
112+
when state.input_stream_format == nil do
111113
state =
112114
%{state | input_stream_format: format}
113115
|> resolve_output_stream_format()
@@ -118,7 +120,7 @@ defmodule Membrane.Transcoder do
118120
end
119121

120122
spec =
121-
get_child(:forwarding_filter)
123+
get_child(:connector)
122124
|> plug_transcoding(
123125
format,
124126
state.output_stream_format,
@@ -130,16 +132,14 @@ defmodule Membrane.Transcoder do
130132
end
131133

132134
@impl true
133-
def handle_child_notification(
134-
{:stream_format, new_format},
135-
:forwarding_filter,
136-
_ctx,
137-
%{input_stream_format: non_nil_stream_format} = state
138-
) do
139-
if new_format != non_nil_stream_format do
135+
def handle_child_notification({:stream_format, _pad, new_format}, :connector, _ctx, state) do
136+
%new_stream_format_module{} = new_format
137+
%old_stream_format_module{} = state.input_stream_format
138+
139+
if new_stream_format_module != old_stream_format_module do
140140
raise """
141141
Received new stream format on transcoder's input: #{inspect(new_format)}
142-
which doesn't match the first received input stream format: #{inspect(non_nil_stream_format)}
142+
which doesn't match the first received input stream format: #{inspect(state.input_stream_format)}
143143
Transcoder doesn't support updating the input stream format.
144144
"""
145145
end

lib/transcoder/forwarding_filter.ex

+117-117
Original file line numberDiff line numberDiff line change
@@ -1,117 +1,117 @@
1-
defmodule Membrane.Transcoder.ForwardingFilter do
2-
@moduledoc false
3-
use Membrane.Filter
4-
5-
alias Membrane.TimestampQueue
6-
7-
def_input_pad :input,
8-
accepted_format: _any,
9-
availability: :on_request
10-
11-
def_output_pad :output,
12-
accepted_format: _any,
13-
availability: :on_request
14-
15-
defguardp is_input_linked(state) when state.input_pad_ref != nil
16-
defguardp is_output_linked(state) when state.output_pad_ref != nil
17-
18-
@impl true
19-
def handle_init(_ctx, _opts) do
20-
state = %{queue: TimestampQueue.new(), output_pad_ref: nil, input_pad_ref: nil}
21-
{[], state}
22-
end
23-
24-
@impl true
25-
def handle_playing(ctx, state), do: maybe_flush_queue(ctx, state)
26-
27-
@impl true
28-
def handle_pad_added(Pad.ref(direction, _id) = pad_ref, ctx, state) do
29-
same_direction_pads_number =
30-
ctx.pads
31-
|> Enum.count(fn {_pad_ref, pad_data} -> pad_data.direction == direction end)
32-
33-
if same_direction_pads_number > 1 do
34-
raise """
35-
#{inspect(__MODULE__)} can have only one #{inspect(direction)} pad, but it has \
36-
#{same_direction_pads_number}
37-
"""
38-
end
39-
40-
state =
41-
case direction do
42-
:input -> %{state | input_pad_ref: pad_ref}
43-
:output -> %{state | output_pad_ref: pad_ref}
44-
end
45-
46-
maybe_flush_queue(ctx, state)
47-
end
48-
49-
@impl true
50-
def handle_stream_format(_input_pad_ref, stream_format, _ctx, state)
51-
when is_output_linked(state) do
52-
{[
53-
stream_format: {state.output_pad_ref, stream_format},
54-
notify_parent: {:stream_format, stream_format}
55-
], state}
56-
end
57-
58-
@impl true
59-
def handle_stream_format(input_pad_ref, stream_format, _ctx, state) do
60-
queue = TimestampQueue.push_stream_format(state.queue, input_pad_ref, stream_format)
61-
{[notify_parent: {:stream_format, stream_format}], %{state | queue: queue}}
62-
end
63-
64-
@impl true
65-
def handle_buffer(_input_pad_ref, buffer, _ctx, state) when is_output_linked(state) do
66-
{[buffer: {state.output_pad_ref, buffer}], state}
67-
end
68-
69-
@impl true
70-
def handle_buffer(input_pad_ref, buffer, _ctx, state) do
71-
{_suggested_actions, queue} = TimestampQueue.push_buffer(state.queue, input_pad_ref, buffer)
72-
{[], %{state | queue: queue}}
73-
end
74-
75-
@impl true
76-
def handle_event(Pad.ref(:input, _id), event, _ctx, state) when is_output_linked(state) do
77-
{[forward: event], state}
78-
end
79-
80-
@impl true
81-
def handle_event(Pad.ref(:output, _id), event, _ctx, state) when is_input_linked(state) do
82-
{[forward: event], state}
83-
end
84-
85-
@impl true
86-
def handle_event(pad_ref, event, _ctx, state) do
87-
queue = TimestampQueue.push_event(state.queue, pad_ref, event)
88-
{[], %{state | queue: queue}}
89-
end
90-
91-
@impl true
92-
def handle_end_of_stream(_input_pad_ref, _ctx, state) when is_output_linked(state) do
93-
{[end_of_stream: state.output_pad_ref], state}
94-
end
95-
96-
@impl true
97-
def handle_end_of_stream(input_pad_ref, _ctx, state) do
98-
queue = TimestampQueue.push_end_of_stream(state.queue, input_pad_ref)
99-
{[], %{state | queue: queue}}
100-
end
101-
102-
defp maybe_flush_queue(ctx, state)
103-
when ctx.playback == :playing and is_input_linked(state) and is_output_linked(state) do
104-
{_suggested_actions, items, queue} = TimestampQueue.flush_and_close(state.queue)
105-
106-
actions =
107-
Enum.map(items, fn
108-
{Pad.ref(:input, _id), {item_type, item}} -> {item_type, {state.output_pad_ref, item}}
109-
{Pad.ref(:input, _id), :end_of_stream} -> {:end_of_stream, state.output_pad_ref}
110-
{Pad.ref(:output, _id), {:event, item}} -> {:event, {state.input_pad_ref, item}}
111-
end)
112-
113-
{actions, %{state | queue: queue}}
114-
end
115-
116-
defp maybe_flush_queue(_ctx, state), do: {[], state}
117-
end
1+
# defmodule Membrane.Transcoder.ForwardingFilter do
2+
# @moduledoc false
3+
# use Membrane.Filter
4+
5+
# alias Membrane.TimestampQueue
6+
7+
# def_input_pad :input,
8+
# accepted_format: _any,
9+
# availability: :on_request
10+
11+
# def_output_pad :output,
12+
# accepted_format: _any,
13+
# availability: :on_request
14+
15+
# defguardp is_input_linked(state) when state.input_pad_ref != nil
16+
# defguardp is_output_linked(state) when state.output_pad_ref != nil
17+
18+
# @impl true
19+
# def handle_init(_ctx, _opts) do
20+
# state = %{queue: TimestampQueue.new(), output_pad_ref: nil, input_pad_ref: nil}
21+
# {[], state}
22+
# end
23+
24+
# @impl true
25+
# def handle_playing(ctx, state), do: maybe_flush_queue(ctx, state)
26+
27+
# @impl true
28+
# def handle_pad_added(Pad.ref(direction, _id) = pad_ref, ctx, state) do
29+
# same_direction_pads_number =
30+
# ctx.pads
31+
# |> Enum.count(fn {_pad_ref, pad_data} -> pad_data.direction == direction end)
32+
33+
# if same_direction_pads_number > 1 do
34+
# raise """
35+
# #{inspect(__MODULE__)} can have only one #{inspect(direction)} pad, but it has \
36+
# #{same_direction_pads_number}
37+
# """
38+
# end
39+
40+
# state =
41+
# case direction do
42+
# :input -> %{state | input_pad_ref: pad_ref}
43+
# :output -> %{state | output_pad_ref: pad_ref}
44+
# end
45+
46+
# maybe_flush_queue(ctx, state)
47+
# end
48+
49+
# @impl true
50+
# def handle_stream_format(_input_pad_ref, stream_format, _ctx, state)
51+
# when is_output_linked(state) do
52+
# {[
53+
# stream_format: {state.output_pad_ref, stream_format},
54+
# notify_parent: {:stream_format, stream_format}
55+
# ], state}
56+
# end
57+
58+
# @impl true
59+
# def handle_stream_format(input_pad_ref, stream_format, _ctx, state) do
60+
# queue = TimestampQueue.push_stream_format(state.queue, input_pad_ref, stream_format)
61+
# {[notify_parent: {:stream_format, stream_format}], %{state | queue: queue}}
62+
# end
63+
64+
# @impl true
65+
# def handle_buffer(_input_pad_ref, buffer, _ctx, state) when is_output_linked(state) do
66+
# {[buffer: {state.output_pad_ref, buffer}], state}
67+
# end
68+
69+
# @impl true
70+
# def handle_buffer(input_pad_ref, buffer, _ctx, state) do
71+
# {_suggested_actions, queue} = TimestampQueue.push_buffer(state.queue, input_pad_ref, buffer)
72+
# {[], %{state | queue: queue}}
73+
# end
74+
75+
# @impl true
76+
# def handle_event(Pad.ref(:input, _id), event, _ctx, state) when is_output_linked(state) do
77+
# {[forward: event], state}
78+
# end
79+
80+
# @impl true
81+
# def handle_event(Pad.ref(:output, _id), event, _ctx, state) when is_input_linked(state) do
82+
# {[forward: event], state}
83+
# end
84+
85+
# @impl true
86+
# def handle_event(pad_ref, event, _ctx, state) do
87+
# queue = TimestampQueue.push_event(state.queue, pad_ref, event)
88+
# {[], %{state | queue: queue}}
89+
# end
90+
91+
# @impl true
92+
# def handle_end_of_stream(_input_pad_ref, _ctx, state) when is_output_linked(state) do
93+
# {[end_of_stream: state.output_pad_ref], state}
94+
# end
95+
96+
# @impl true
97+
# def handle_end_of_stream(input_pad_ref, _ctx, state) do
98+
# queue = TimestampQueue.push_end_of_stream(state.queue, input_pad_ref)
99+
# {[], %{state | queue: queue}}
100+
# end
101+
102+
# defp maybe_flush_queue(ctx, state)
103+
# when ctx.playback == :playing and is_input_linked(state) and is_output_linked(state) do
104+
# {_suggested_actions, items, queue} = TimestampQueue.flush_and_close(state.queue)
105+
106+
# actions =
107+
# Enum.map(items, fn
108+
# {Pad.ref(:input, _id), {item_type, item}} -> {item_type, {state.output_pad_ref, item}}
109+
# {Pad.ref(:input, _id), :end_of_stream} -> {:end_of_stream, state.output_pad_ref}
110+
# {Pad.ref(:output, _id), {:event, item}} -> {:event, {state.input_pad_ref, item}}
111+
# end)
112+
113+
# {actions, %{state | queue: queue}}
114+
# end
115+
116+
# defp maybe_flush_queue(_ctx, state), do: {[], state}
117+
# end

mix.exs

+2-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ defmodule Membrane.Transcoder.Plugin.Mixfile do
3737

3838
defp deps do
3939
[
40-
{:membrane_core, "~> 1.1"},
40+
# {:membrane_core, "~> 1.2"},
41+
{:membrane_core, path: "../membrane_core", override: true},
4142
{:membrane_opus_plugin, "~> 0.20.3"},
4243
{:membrane_aac_plugin, "~> 0.19.0"},
4344
{:membrane_aac_fdk_plugin, "~> 0.18.0"},

test/integration_test.exs

+3
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ defmodule Membrane.Transcoder.IntegrationTest do
3535
@test_cases @video_cases ++ @audio_cases
3636

3737
Enum.map(@test_cases, fn test_case ->
38+
if test_case.input_format == H264 and test_case.output_format == H264, do: @tag(:xd)
39+
3840
test "if transcoder support #{inspect(test_case.input_format)} input and #{inspect(test_case.output_format)} output" do
3941
pid = Testing.Pipeline.start_link_supervised!()
4042

@@ -43,6 +45,7 @@ defmodule Membrane.Transcoder.IntegrationTest do
4345
location: Path.join("./test/fixtures", unquote(test_case.input_file))
4446
})
4547
|> then(unquote(test_case.preprocess))
48+
# |> child(%Membrane.Debug.Filter{handle_stream_format: fn _ -> raise "dupaa" end})
4649
|> child(%Membrane.Transcoder{output_stream_format: unquote(test_case.output_format)})
4750
|> child(:sink, Testing.Sink)
4851

test/support/preprocessors.ex

+2-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ defmodule Membrane.Transcoder.Support.Preprocessors do
2020

2121
@spec parse_h264(Membrane.ChildrenSpec.builder()) :: Membrane.ChildrenSpec.builder()
2222
def parse_h264(link_builder) do
23-
child(link_builder, %Membrane.H264.Parser{
23+
link_builder
24+
|> child(%Membrane.H264.Parser{
2425
output_alignment: :au,
2526
output_stream_structure: :annexb,
2627
generate_best_effort_timestamps: %{framerate: {30, 1}}

0 commit comments

Comments
 (0)