Skip to content

Commit 0c3f491

Browse files
authored
Merge pull request #40 from membraneframework/transcoding-v2
Transcoding
2 parents 32c9361 + c921c4b commit 0c3f491

39 files changed

+661
-87
lines changed

lib/boombox/elixir_stream.ex

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ defmodule Boombox.ElixirStream do
33

44
import Membrane.ChildrenSpec
55
require Membrane.Pad, as: Pad
6+
7+
alias __MODULE__.{Sink, Source}
68
alias Boombox.Pipeline.Ready
79

810
@options_audio_keys [:audio_format, :audio_rate, :audio_channels]
@@ -53,15 +55,21 @@ defmodule Boombox.ElixirStream do
5355
Enum.map(track_builders, fn
5456
{:audio, builder} ->
5557
builder
56-
|> then(&maybe_plug_resampler(&1, options))
58+
|> child(:mp4_audio_transcoder, %Boombox.Transcoder{
59+
output_stream_format: Membrane.RawAudio
60+
})
61+
|> maybe_plug_resampler(options)
5762
|> via_in(Pad.ref(:input, :audio))
5863
|> get_child(:elixir_stream_sink)
5964

6065
{:video, builder} ->
6166
builder
62-
|> child(%Membrane.H264.Parser{output_stream_structure: :annexb})
63-
|> child(Membrane.H264.FFmpeg.Decoder)
64-
|> child(%Membrane.FFmpeg.SWScale.Converter{format: :RGB})
67+
|> child(:elixir_stream_video_transcoder, %Boombox.Transcoder{
68+
output_stream_format: Membrane.RawVideo
69+
})
70+
|> child(:elixir_stream_rgb_converter, %Membrane.FFmpeg.SWScale.Converter{
71+
format: :RGB
72+
})
6573
|> via_in(Pad.ref(:input, :video))
6674
|> get_child(:elixir_stream_sink)
6775
end),

lib/boombox/elixir_stream/sink.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
defmodule Sink do
1+
defmodule Boombox.ElixirStream.Sink do
22
@moduledoc false
33
use Membrane.Sink
44

lib/boombox/elixir_stream/source.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
defmodule Source do
1+
defmodule Boombox.ElixirStream.Source do
22
@moduledoc false
33
use Membrane.Source
44

lib/boombox/hls.ex

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ defmodule Boombox.HLS do
55

66
require Membrane.Pad, as: Pad
77
alias Boombox.Pipeline.Ready
8+
alias Membrane.H264
89
alias Membrane.Time
910

1011
@spec link_output(
@@ -42,14 +43,19 @@ defmodule Boombox.HLS do
4243
Enum.map(track_builders, fn
4344
{:audio, builder} ->
4445
builder
45-
|> child(:hls_out_aac_encoder, Membrane.AAC.FDK.Encoder)
46+
|> child(:mp4_audio_transcoder, %Boombox.Transcoder{
47+
output_stream_format: Membrane.AAC
48+
})
4649
|> via_in(Pad.ref(:input, :audio),
4750
options: [encoding: :AAC, segment_duration: Time.milliseconds(2000)]
4851
)
4952
|> get_child(:hls_sink_bin)
5053

5154
{:video, builder} ->
5255
builder
56+
|> child(:hls_video_transcoder, %Boombox.Transcoder{
57+
output_stream_format: %H264{alignment: :au, stream_structure: :avc3}
58+
})
5359
|> via_in(Pad.ref(:input, :video),
5460
options: [encoding: :H264, segment_duration: Time.milliseconds(2000)]
5561
)

lib/boombox/mp4.ex

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ defmodule Boombox.MP4 do
44
import Membrane.ChildrenSpec
55
require Membrane.Pad, as: Pad
66
alias Boombox.Pipeline.{Ready, Wait}
7+
alias Membrane.H264
8+
alias Membrane.H265
9+
10+
defguardp is_h26x(format) when is_struct(format) and format.__struct__ in [H264, H265]
711

812
@spec create_input(String.t(), transport: :file | :http) :: Wait.t()
913
def create_input(location, opts) do
@@ -33,11 +37,10 @@ defmodule Boombox.MP4 do
3337
get_child(:mp4_demuxer)
3438
|> via_out(Pad.ref(:output, id))
3539
|> child(:mp4_in_aac_parser, Membrane.AAC.Parser)
36-
|> child(:mp4_in_aac_decoder, Membrane.AAC.FDK.Decoder)
3740

3841
{:audio, spec}
3942

40-
{id, %Membrane.H264{}} ->
43+
{id, video_format} when is_h26x(video_format) ->
4144
spec =
4245
get_child(:mp4_demuxer)
4346
|> via_out(Pad.ref(:output, id))
@@ -62,7 +65,9 @@ defmodule Boombox.MP4 do
6265
Enum.map(track_builders, fn
6366
{:audio, builder} ->
6467
builder
65-
|> child(:mp4_out_aac_encoder, Membrane.AAC.FDK.Encoder)
68+
|> child(:mp4_audio_transcoder, %Boombox.Transcoder{
69+
output_stream_format: Membrane.AAC
70+
})
6671
|> child(:mp4_out_aac_parser, %Membrane.AAC.Parser{
6772
out_encapsulation: :none,
6873
output_config: :esds
@@ -72,7 +77,21 @@ defmodule Boombox.MP4 do
7277

7378
{:video, builder} ->
7479
builder
75-
|> child(:mp4_out_h264_parser, %Membrane.H264.Parser{output_stream_structure: :avc3})
80+
|> child(:mp4_video_transcoder, %Boombox.Transcoder{
81+
output_stream_format: fn
82+
%H264{stream_structure: :annexb} = h264 ->
83+
%H264{h264 | stream_structure: :avc3, alignment: :au}
84+
85+
%H265{stream_structure: :annexb} = h265 ->
86+
%H265{h265 | stream_structure: :hev1, alignment: :au}
87+
88+
h26x when is_h26x(h26x) ->
89+
%{h26x | alignment: :au}
90+
91+
_not_h26x ->
92+
%H264{stream_structure: :avc3, alignment: :au}
93+
end
94+
})
7695
|> via_in(Pad.ref(:input, :video))
7796
|> get_child(:mp4_muxer)
7897
end)

lib/boombox/pipeline.ex

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,9 @@ defmodule Boombox.Pipeline do
6262
track_builders: nil,
6363
last_result: nil,
6464
eos_info: nil,
65-
rtsp_state: nil
65+
rtsp_state: nil,
66+
pending_new_tracks: %{input: [], output: []},
67+
output_webrtc_state: nil
6668
]
6769

6870
@typedoc """
@@ -92,7 +94,8 @@ defmodule Boombox.Pipeline do
9294
last_result: Boombox.Pipeline.Ready.t() | Boombox.Pipeline.Wait.t() | nil,
9395
eos_info: term(),
9496
rtsp_state: Boombox.RTSP.rtsp_state() | nil,
95-
parent: pid()
97+
parent: pid(),
98+
output_webrtc_state: Boombox.WebRTC.output_webrtc_state() | nil
9699
}
97100
end
98101

@@ -132,6 +135,12 @@ defmodule Boombox.Pipeline do
132135
|> proceed_result(ctx, state)
133136
end
134137

138+
@impl true
139+
def handle_child_notification({:negotiated_video_codecs, codecs}, :webrtc_output, ctx, state) do
140+
{result, state} = Boombox.WebRTC.handle_output_video_codecs_negotiated(codecs, state)
141+
proceed_result(result, ctx, state)
142+
end
143+
135144
@impl true
136145
def handle_child_notification({:new_tracks, tracks}, :webrtc_output, ctx, state) do
137146
unless state.status == :awaiting_output_link do
@@ -144,7 +153,8 @@ defmodule Boombox.Pipeline do
144153
Boombox.WebRTC.handle_output_tracks_negotiated(
145154
state.track_builders,
146155
state.spec_builder,
147-
tracks
156+
tracks,
157+
state
148158
)
149159
|> proceed_result(ctx, state)
150160
end
@@ -206,6 +216,10 @@ defmodule Boombox.Pipeline do
206216

207217
@spec proceed_result(Ready.t() | Wait.t(), Membrane.Pipeline.CallbackContext.t(), State.t()) ::
208218
Membrane.Pipeline.callback_return()
219+
defp proceed_result(result, ctx, %{status: :awaiting_output} = state) do
220+
do_proceed(result, :output_ready, :awaiting_output, ctx, state)
221+
end
222+
209223
defp proceed_result(result, ctx, %{status: :awaiting_input} = state) do
210224
do_proceed(result, :input_ready, :awaiting_input, ctx, state)
211225
end
@@ -215,7 +229,7 @@ defmodule Boombox.Pipeline do
215229
end
216230

217231
defp proceed_result(result, ctx, %{status: :running} = state) do
218-
do_proceed(result, nil, :running, ctx, state)
232+
do_proceed(result, :running, :running, ctx, state)
219233
end
220234

221235
defp proceed_result(_result, _ctx, state) do
@@ -227,8 +241,8 @@ defmodule Boombox.Pipeline do
227241
@spec proceed(Membrane.Pipeline.CallbackContext.t(), State.t()) ::
228242
Membrane.Pipeline.callback_return()
229243
defp proceed(ctx, %{status: :init} = state) do
230-
create_output(state.output, ctx, state)
231-
|> do_proceed(:output_ready, :awaiting_output, ctx, state)
244+
{ready_or_wait, state} = create_output(state.output, ctx, state)
245+
do_proceed(ready_or_wait, :output_ready, :awaiting_output, ctx, state)
232246
end
233247

234248
defp proceed(ctx, %{status: :output_ready} = state) do
@@ -287,7 +301,7 @@ defmodule Boombox.Pipeline do
287301
@spec create_input(Boombox.input(), Membrane.Pipeline.CallbackContext.t(), State.t()) ::
288302
Ready.t() | Wait.t()
289303
defp create_input({:webrtc, signaling}, _ctx, state) do
290-
Boombox.WebRTC.create_input(signaling, state.output)
304+
Boombox.WebRTC.create_input(signaling, state.output, state)
291305
end
292306

293307
defp create_input({:mp4, location, opts}, _ctx, _state) do
@@ -307,13 +321,13 @@ defmodule Boombox.Pipeline do
307321
end
308322

309323
@spec create_output(Boombox.output(), Membrane.Pipeline.CallbackContext.t(), State.t()) ::
310-
Ready.t() | Wait.t()
311-
defp create_output({:webrtc, signaling}, _ctx, _state) do
312-
Boombox.WebRTC.create_output(signaling)
324+
{Ready.t() | Wait.t(), State.t()}
325+
defp create_output({:webrtc, signaling}, _ctx, state) do
326+
Boombox.WebRTC.create_output(signaling, state)
313327
end
314328

315-
defp create_output(_output, _ctx, _state) do
316-
%Ready{}
329+
defp create_output(_output, _ctx, state) do
330+
{%Ready{}, state}
317331
end
318332

319333
@spec link_output(
@@ -324,8 +338,13 @@ defmodule Boombox.Pipeline do
324338
State.t()
325339
) ::
326340
Ready.t() | Wait.t()
327-
defp link_output({:webrtc, _signaling}, track_builders, _spec_builder, _ctx, _state) do
328-
Boombox.WebRTC.link_output(track_builders)
341+
defp link_output({:webrtc, _signaling}, track_builders, spec_builder, _ctx, state) do
342+
tracks = [
343+
%{kind: :audio, id: :audio_track},
344+
%{kind: :video, id: :video_tracks}
345+
]
346+
347+
Boombox.WebRTC.link_output(track_builders, spec_builder, tracks, state)
329348
end
330349

331350
defp link_output({:mp4, location}, track_builders, spec_builder, _ctx, _state) do

lib/boombox/rtmp.ex

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,9 @@ defmodule Boombox.RTMP do
4747
child(:rtmp_source, %RTMP.SourceBin{client_ref: client_ref})
4848
|> via_out(:audio)
4949
|> child(:rtmp_in_aac_parser, Membrane.AAC.Parser)
50-
|> child(:rtmp_in_aac_decoder, Membrane.AAC.FDK.Decoder)
5150

5251
track_builders = %{
53-
audio: get_child(:rtmp_in_aac_decoder),
52+
audio: get_child(:rtmp_in_aac_parser),
5453
video: get_child(:rtmp_source) |> via_out(:video)
5554
}
5655

lib/boombox/rtsp.ex

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ defmodule Boombox.RTSP do
5959
spec =
6060
get_child(:rtsp_source)
6161
|> via_out(Membrane.Pad.ref(:output, ssrc))
62-
|> child(Membrane.Debug.Sink)
6362

6463
{spec, track_builders}
6564

@@ -82,7 +81,6 @@ defmodule Boombox.RTSP do
8281
get_child(:rtsp_source)
8382
|> via_out(Membrane.Pad.ref(:output, ssrc))
8483
|> child(:rtsp_in_aac_parser, Membrane.AAC.Parser)
85-
|> child(:rtsp_in_aac_decoder, Membrane.AAC.FDK.Decoder)
8684

8785
{[], Map.put(track_builders, :audio, audio_spec)}
8886

lib/boombox/transcoder.ex

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
defmodule Boombox.Transcoder do
2+
@moduledoc false
3+
use Membrane.Bin
4+
5+
require __MODULE__.Audio
6+
require __MODULE__.Video
7+
8+
alias __MODULE__.{Audio, ForwardingFilter, Video}
9+
alias Membrane.{AAC, Funnel, H264, H265, Opus, RawAudio, RawVideo, RemoteStream, VP8}
10+
11+
@type stream_format ::
12+
H264.t()
13+
| H265.t()
14+
| VP8.t()
15+
| RawVideo.t()
16+
| AAC.t()
17+
| Opus.t()
18+
| RemoteStream.t()
19+
| RawAudio.t()
20+
21+
@type stream_format_module :: H264 | H265 | VP8 | RawVideo | AAC | Opus | RawAudio
22+
23+
@type stream_format_resolver :: (stream_format() -> stream_format() | stream_format_module())
24+
25+
def_input_pad :input,
26+
accepted_format: format when Audio.is_audio_format(format) or Video.is_video_format(format)
27+
28+
def_output_pad :output,
29+
accepted_format: format when Audio.is_audio_format(format) or Video.is_video_format(format)
30+
31+
def_options output_stream_format: [
32+
spec: stream_format() | stream_format_module() | stream_format_resolver()
33+
]
34+
35+
@impl true
36+
def handle_init(_ctx, opts) do
37+
spec = [
38+
bin_input()
39+
|> child(:forwarding_filter, ForwardingFilter),
40+
child(:output_funnel, Funnel)
41+
|> bin_output()
42+
]
43+
44+
state =
45+
opts
46+
|> Map.from_struct()
47+
|> Map.put(:input_stream_format, nil)
48+
49+
{[spec: spec], state}
50+
end
51+
52+
@impl true
53+
def handle_child_notification({:stream_format, format}, :forwarding_filter, _ctx, state) do
54+
state =
55+
%{state | input_stream_format: format}
56+
|> resolve_output_stream_format()
57+
58+
spec =
59+
get_child(:forwarding_filter)
60+
|> plug_transcoding(format, state.output_stream_format)
61+
|> get_child(:output_funnel)
62+
63+
{[spec: spec], state}
64+
end
65+
66+
@impl true
67+
def handle_child_notification(_notification, _element, _ctx, state) do
68+
{[], state}
69+
end
70+
71+
defp resolve_output_stream_format(state) do
72+
case state.output_stream_format do
73+
format when is_struct(format) ->
74+
state
75+
76+
module when is_atom(module) ->
77+
%{state | output_stream_format: struct(module)}
78+
79+
resolver when is_function(resolver) ->
80+
%{state | output_stream_format: resolver.(state.input_stream_format)}
81+
|> resolve_output_stream_format()
82+
end
83+
end
84+
85+
defp plug_transcoding(builder, input_format, output_format)
86+
when Audio.is_audio_format(input_format) do
87+
builder |> Audio.plug_audio_transcoding(input_format, output_format)
88+
end
89+
90+
defp plug_transcoding(builder, input_format, output_format)
91+
when Video.is_video_format(input_format) do
92+
builder |> Video.plug_video_transcoding(input_format, output_format)
93+
end
94+
end

0 commit comments

Comments
 (0)