Skip to content

Commit 55c2e3b

Browse files
authored
RTP output (#51)
* Create rtp output * Add a working test * Add Opus and H265 support
1 parent 2df5104 commit 55c2e3b

File tree

6 files changed

+289
-88
lines changed

6 files changed

+289
-88
lines changed

lib/boombox.ex

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,20 +24,20 @@ defmodule Boombox do
2424
@typedoc """
2525
Some encodings can/must be accompanied with encoding specific parameters:
2626
* AAC:
27-
- bitrate_mode - has to be provided. Defines how the RTP stream was payloaded and should be depayloaded.
28-
- audio_specific_config - has to be provided. Contains crucial information about the stream and has to be obtained from a side channel.
27+
- bitrate_mode - MUST be provided for both RTP input and output. Defines which mode should be assumed/set when depayloading/payloading.
28+
- audio_specific_config - MUST be provided for RTP input. Contains crucial information about the stream and has to be obtained from a side channel.
2929
* H264 and H265:
30-
- vpss (H265 only), ppss, spss - optional. Parameter sets, could be obtained from a side channel. They contain information about the encoded stream.
30+
- vpss (H265 only), ppss, spss - MAY be provided for RTP input. picture and sequence parameter sets, could be obtained from a side channel. They contain information about the encoded stream.
3131
"""
3232
@type rtp_encoding_specific_params ::
33-
{:AAC, [bitrate_mode: RTP.AAC.Utils.mode(), audio_specific_config: binary()]}
34-
| {:H264, [ppss: [binary()], spss: [binary()]]}
35-
| {:H265, [vpss: [binary()], ppss: [binary()], spss: [binary()]]}
33+
{:AAC, [{:bitrate_mode, RTP.AAC.Utils.mode()} | {:audio_specific_config, binary()}]}
34+
| {:H264, [{:ppss, [binary()]} | {:spss, [binary()]}]}
35+
| {:H265, [{:vpss, [binary()]} | {:ppss, [binary()]} | {:spss, [binary()]}]}
3636

3737
@typedoc """
3838
For each media type the following parameters are specified:
39-
* encoding - has to be provided, some encodings require additional parameters, see `rtp_encoding_specific_params/0`.
40-
* payload_type, clock rate - most likely should be provided, if not, then an unofficial default will be used.
39+
* encoding - MUST be provided for both RTP input and output, some encodings require additional parameters, see `rtp_encoding_specific_params/0`.
40+
* payload_type, clock rate - MAY be provided for both RTP input and output, if not, then an unofficial default will be used.
4141
"""
4242
@type rtp_track_config :: [
4343
{:encoding, RTP.encoding_name() | rtp_encoding_specific_params()}
@@ -51,7 +51,25 @@ defmodule Boombox do
5151
"""
5252
@type in_rtp_opts :: [
5353
{:port, :inet.port_number()}
54-
| {:track_configs, [audio: rtp_track_config(), video: rtp_track_config()]}
54+
| {:track_configs,
55+
[
56+
{:audio, rtp_track_config()}
57+
| {:video, rtp_track_config()}
58+
]}
59+
]
60+
61+
@typedoc """
62+
In order to configure RTP output the destination and media configurations must be provided.
63+
At least one media type needs to be configured.
64+
"""
65+
@type out_rtp_opts :: [
66+
{:address, :inet.ip_address()}
67+
| {:port, :inet.port_number()}
68+
| {:track_configs,
69+
[
70+
{:audio, rtp_track_config()}
71+
| {:video, rtp_track_config()}
72+
]}
5573
]
5674

5775
@type input ::
@@ -68,6 +86,7 @@ defmodule Boombox do
6886
| {:mp4, location :: String.t()}
6987
| {:webrtc, webrtc_signaling()}
7088
| {:hls, location :: String.t()}
89+
| {:rtp, out_rtp_opts()}
7190
| {:stream, out_stream_opts()}
7291

7392
@typep procs :: %{pipeline: pid(), supervisor: pid()}

lib/boombox/pipeline.ex

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,11 @@ defmodule Boombox.Pipeline do
198198
{[terminate: :normal], state}
199199
end
200200

201+
@impl true
202+
def handle_element_end_of_stream(:udp_rtp_sink, :input, _ctx, state) do
203+
{[terminate: :normal], state}
204+
end
205+
201206
@impl true
202207
def handle_element_end_of_stream(:elixir_stream_sink, Pad.ref(:input, id), _ctx, state) do
203208
eos_info = List.delete(state.eos_info, id)
@@ -316,8 +321,8 @@ defmodule Boombox.Pipeline do
316321
Boombox.RTSP.create_input(uri)
317322
end
318323

319-
defp create_input({:rtp, params}, _ctx, _state) do
320-
Boombox.RTP.create_input(params)
324+
defp create_input({:rtp, opts}, _ctx, _state) do
325+
Boombox.RTP.create_input(opts)
321326
end
322327

323328
defp create_input({:stream, params}, _ctx, state) do
@@ -359,6 +364,10 @@ defmodule Boombox.Pipeline do
359364
Boombox.HLS.link_output(location, track_builders, spec_builder)
360365
end
361366

367+
defp link_output({:rtp, opts}, track_builders, spec_builder, _ctx, _state) do
368+
Boombox.RTP.link_output(opts, track_builders, spec_builder)
369+
end
370+
362371
defp link_output({:stream, opts}, track_builders, spec_builder, _ctx, state) do
363372
Boombox.ElixirStream.link_output(state.parent, opts, track_builders, spec_builder)
364373
end

lib/boombox/rtp.ex

Lines changed: 147 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -7,39 +7,72 @@ defmodule Boombox.RTP do
77
alias Boombox.Pipeline.Ready
88
alias Membrane.RTP
99

10-
@required_opts [:port, :track_configs]
10+
@required_opts %{
11+
input: [:port, :track_configs],
12+
output: [:address, :port, :track_configs]
13+
}
14+
1115
@required_encoding_specific_params %{
12-
AAC: [bitrate_mode: [require?: true], audio_specific_config: [require?: true]],
13-
OPUS: [],
14-
H264: [ppss: [require?: false], spss: [require?: false]],
15-
H265: [ppss: [require?: false], spss: [require?: false]]
16+
input: %{
17+
AAC: [bitrate_mode: [require?: true], audio_specific_config: [require?: true]],
18+
H264: [ppss: [require?: false, default: []], spss: [require?: false, default: []]],
19+
H265: [
20+
vpss: [require?: false, default: []],
21+
ppss: [require?: false, default: []],
22+
spss: [require?: false, default: []]
23+
]
24+
},
25+
output: %{
26+
AAC: [bitrate_mode: [require?: true]]
27+
}
1628
}
1729

18-
@type parsed_encoding_specific_params ::
30+
@type parsed_input_encoding_specific_params ::
1931
%{bitrate_mode: RTP.AAC.Utils.mode(), audio_specific_config: binary()}
20-
| %{optional(:ppss) => [binary()], optional(:spss) => [binary()]}
21-
| %{
22-
optional(:vpss) => [binary()],
23-
optional(:ppss) => [binary()],
24-
optional(:spss) => [binary()]
25-
}
32+
| %{:ppss => [binary()], :spss => [binary()]}
33+
| %{:vpss => [binary()], :ppss => [binary()], :spss => [binary()]}
34+
| %{}
35+
36+
@type parsed_output_encoding_specific_params ::
37+
%{bitrate_mode: RTP.AAC.Utils.mode()}
2638
| %{}
2739

28-
@type parsed_track_config :: %{
40+
@type parsed_input_track_config :: %{
41+
encoding_name: RTP.encoding_name(),
42+
encoding_specific_params: parsed_input_encoding_specific_params(),
43+
payload_type: RTP.payload_type(),
44+
clock_rate: RTP.clock_rate()
45+
}
46+
47+
@type parsed_output_track_config :: %{
2948
encoding_name: RTP.encoding_name(),
30-
encoding_specific_params: parsed_encoding_specific_params(),
49+
encoding_specific_params: parsed_output_encoding_specific_params(),
3150
payload_type: RTP.payload_type(),
3251
clock_rate: RTP.clock_rate()
3352
}
3453

3554
@type parsed_in_opts :: %{
3655
port: :inet.port_number(),
37-
track_configs: %{audio: parsed_track_config(), video: parsed_track_config()}
56+
track_configs: %{
57+
optional(:audio) => parsed_input_track_config(),
58+
optional(:video) => parsed_input_track_config()
59+
}
3860
}
3961

62+
@type parsed_out_opts :: %{
63+
address: :inet.ip_address(),
64+
port: :inet.port_number(),
65+
track_configs: %{
66+
optional(:audio) => parsed_output_track_config(),
67+
optional(:video) => parsed_output_track_config()
68+
}
69+
}
70+
71+
@type parsed_track_config :: parsed_input_track_config() | parsed_output_track_config()
72+
4073
@spec create_input(Boombox.in_rtp_opts()) :: Ready.t()
4174
def create_input(opts) do
42-
parsed_options = validate_and_parse_options(opts)
75+
parsed_options = validate_and_parse_options(:input, opts)
4376
payload_type_mapping = get_payload_type_mapping(parsed_options.track_configs)
4477

4578
spec =
@@ -51,8 +84,8 @@ defmodule Boombox.RTP do
5184
{depayloader, parser} =
5285
case track_config.encoding_name do
5386
:H264 ->
54-
ppss = Map.get(track_config.encoding_specific_params, :ppss, [])
55-
spss = Map.get(track_config.encoding_specific_params, :spss, [])
87+
ppss = track_config.encoding_specific_params.ppss
88+
spss = track_config.encoding_specific_params.spss
5689
{Membrane.RTP.H264.Depayloader, %Membrane.H264.Parser{ppss: ppss, spss: spss}}
5790

5891
:AAC ->
@@ -77,21 +110,86 @@ defmodule Boombox.RTP do
77110
spec =
78111
get_child(:rtp_demuxer)
79112
|> via_out(:output, options: [stream_id: {:encoding_name, track_config.encoding_name}])
80-
|> child({:jitter_buffer, track_config.encoding_name}, %Membrane.RTP.JitterBuffer{
113+
|> child({:jitter_buffer, media_type}, %Membrane.RTP.JitterBuffer{
81114
clock_rate: track_config.clock_rate
82115
})
83-
|> child({:rtp_depayloader, track_config.encoding_name}, depayloader)
84-
|> child({:rtp_in_parser, track_config.encoding_name}, parser)
116+
|> child({:rtp_depayloader, media_type}, depayloader)
117+
|> child({:rtp_in_parser, media_type}, parser)
85118

86119
{media_type, spec}
87120
end)
88121

89122
%Ready{spec_builder: spec, track_builders: track_builders}
90123
end
91124

92-
@spec validate_and_parse_options(Boombox.in_rtp_opts()) :: parsed_in_opts()
93-
defp validate_and_parse_options(opts) do
94-
Enum.each(@required_opts, fn required_option ->
125+
@spec link_output(
126+
Boombox.out_rtp_opts(),
127+
Boombox.Pipeline.track_builders(),
128+
Membrane.ChildrenSpec.t()
129+
) :: Ready.t()
130+
def link_output(opts, track_builders, spec_builder) do
131+
parsed_opts = validate_and_parse_options(:output, opts)
132+
133+
spec = [
134+
spec_builder,
135+
child(:rtp_muxer, Membrane.RTP.Muxer)
136+
|> child(:udp_rtp_sink, %Membrane.UDP.Sink{
137+
destination_address: parsed_opts.address,
138+
destination_port_no: parsed_opts.port
139+
}),
140+
Enum.map(track_builders, fn {media_type, builder} ->
141+
track_config = parsed_opts.track_configs[media_type]
142+
143+
{output_stream_format, parser, payloader} =
144+
case track_config.encoding_name do
145+
:H264 ->
146+
{%Membrane.H264{stream_structure: :annexb, alignment: :nalu},
147+
%Membrane.H264.Parser{output_stream_structure: :annexb, output_alignment: :nalu},
148+
Membrane.RTP.H264.Payloader}
149+
150+
:AAC ->
151+
{%Membrane.AAC{encapsulation: :none},
152+
%Membrane.AAC.Parser{out_encapsulation: :none},
153+
%Membrane.RTP.AAC.Payloader{
154+
mode: track_config.encoding_specific_params.bitrate_mode,
155+
frames_per_packet: 1
156+
}}
157+
158+
:OPUS ->
159+
{Membrane.Opus, %Membrane.Opus.Parser{delimitation: :undelimit},
160+
Membrane.RTP.Opus.Payloader}
161+
162+
:H265 ->
163+
{%Membrane.H265{stream_structure: :annexb, alignment: :nalu},
164+
%Membrane.H265.Parser{output_stream_structure: :annexb, output_alignment: :nalu},
165+
Membrane.RTP.H265.Payloader}
166+
end
167+
168+
builder
169+
|> child({:rtp_transcoder, media_type}, %Boombox.Transcoder{
170+
output_stream_format: output_stream_format
171+
})
172+
|> child({:rtp_out_parser, media_type}, parser)
173+
|> child({:rtp_payloader, media_type}, payloader)
174+
|> child({:realtimer, media_type}, Membrane.Realtimer)
175+
|> via_in(:input,
176+
options: [
177+
encoding: track_config.encoding_name,
178+
payload_type: track_config.payload_type,
179+
clock_rate: track_config.clock_rate
180+
]
181+
)
182+
|> get_child(:rtp_muxer)
183+
end)
184+
]
185+
186+
%Ready{actions: [spec: spec]}
187+
end
188+
189+
@spec validate_and_parse_options(:input, Boombox.in_rtp_opts()) :: parsed_in_opts()
190+
@spec validate_and_parse_options(:output, Boombox.out_rtp_opts()) :: parsed_out_opts()
191+
defp validate_and_parse_options(direction, opts) do
192+
Enum.each(@required_opts[direction], fn required_option ->
95193
unless Keyword.has_key?(opts, required_option) do
96194
raise "Required option #{inspect(required_option)} not present in passed RTP options"
97195
end
@@ -103,16 +201,25 @@ defmodule Boombox.RTP do
103201

104202
parsed_track_configs =
105203
Map.new(opts[:track_configs], fn {media_type, track_config} ->
106-
{media_type, validate_and_parse_track_config!(track_config)}
204+
{media_type, validate_and_parse_track_config!(direction, track_config)}
107205
end)
108206

109-
%{port: opts[:port], track_configs: parsed_track_configs}
207+
case direction do
208+
:input ->
209+
%{port: opts[:port], track_configs: parsed_track_configs}
210+
211+
:output ->
212+
%{address: opts[:address], port: opts[:port], track_configs: parsed_track_configs}
213+
end
110214
end
111215

112-
@spec validate_and_parse_track_config!(Boombox.rtp_track_config()) :: parsed_track_config()
113-
defp validate_and_parse_track_config!(track_config) do
216+
@spec validate_and_parse_track_config!(:input, Boombox.rtp_track_config()) ::
217+
parsed_input_track_config()
218+
@spec validate_and_parse_track_config!(:output, Boombox.rtp_track_config()) ::
219+
parsed_output_track_config()
220+
defp validate_and_parse_track_config!(direction, track_config) do
114221
{encoding_name, encoding_specific_params} =
115-
validate_and_parse_encoding!(track_config[:encoding])
222+
validate_and_parse_encoding!(direction, track_config[:encoding])
116223

117224
track_config = Keyword.put(track_config, :encoding_name, encoding_name)
118225

@@ -135,18 +242,24 @@ defmodule Boombox.RTP do
135242
}
136243
end
137244

138-
@spec validate_and_parse_encoding!(RTP.encoding_name() | Boombox.rtp_encoding_specific_params()) ::
139-
{RTP.encoding_name(), %{}} | parsed_encoding_specific_params()
140-
defp validate_and_parse_encoding!(encoding) do
245+
@spec validate_and_parse_encoding!(
246+
:input,
247+
RTP.encoding_name() | Boombox.rtp_encoding_specific_params()
248+
) :: {RTP.encoding_name(), parsed_input_encoding_specific_params()}
249+
@spec validate_and_parse_encoding!(
250+
:output,
251+
RTP.encoding_name() | Boombox.rtp_encoding_specific_params()
252+
) :: {RTP.encoding_name(), parsed_output_encoding_specific_params()}
253+
defp validate_and_parse_encoding!(direction, encoding) do
141254
case encoding do
142255
nil ->
143256
raise "Encoding name not provided"
144257

145258
encoding when is_atom(encoding) ->
146-
validate_and_parse_encoding!({encoding, []})
259+
validate_and_parse_encoding!(direction, {encoding, []})
147260

148261
{encoding, encoding_params} when is_atom(encoding) ->
149-
field_specs = Map.get(@required_encoding_specific_params, encoding, [])
262+
field_specs = Map.get(@required_encoding_specific_params[direction], encoding, [])
150263
{:ok, encoding_params} = Bunch.Config.parse(encoding_params, field_specs)
151264
{encoding, encoding_params}
152265
end

0 commit comments

Comments
 (0)