Skip to content

Commit 2df5104

Browse files
authored
RTP input (#50)
* Working rtp input * Add comprehensive options parsing * Update readme * Add an example
1 parent 0c3f491 commit 2df5104

File tree

10 files changed

+331
-29
lines changed

10 files changed

+331
-29
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ For more examples, see [examples.livemd](examples.livemd).
7272
| WebRTC | `{:webrtc, signaling}` | `{:webrtc, signaling}` |
7373
| RTMP | `"rtmp://*"` | _not supported_ |
7474
| RTSP | `"rtsp://*"` | _not supported_ |
75+
| RTP | `{:rtp, opts}` | _not yet supported_ |
7576
| HLS | _not supported_ | `"*.m3u8"` |
7677
| `Enumerable.t()` | `{:stream, opts}` | `{:stream, opts}` |
7778

examples.livemd

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@ unless File.exists?("#{input_dir}/bun.mp4") do
3535
File.write!("#{input_dir}/bun.mp4", bbb_mp4)
3636
end
3737

38+
unless File.exists?("#{input_dir}/bun.mkv") do
39+
%{status: 200, body: bbb_mkv} = Req.get!("#{samples_url}/big-buck-bunny/bun33s.mkv")
40+
File.write!("#{input_dir}/bun.mkv", bbb_mkv)
41+
end
42+
3843
assets_url =
3944
"https://raw.githubusercontent.com/membraneframework/boombox/master/boombox_examples_data"
4045

@@ -528,6 +533,34 @@ Boombox.run(input: "rtsp://localhost:#{rtsp_port}/", output: "#{out_dir}/index.m
528533

529534
<!-- livebook:{"branch_parent_index":0} -->
530535

536+
## Receive RTP, broadcast via HLS
537+
538+
To receive the stream, visit http://localhost:1234/hls.html after running the cell below
539+
540+
```elixir
541+
rtp_port = 50001
542+
t =
543+
Task.async(fn ->
544+
Boombox.run(
545+
input: {:rtp, port: rtp_port, track_configs: [audio: [encoding: :OPUS], video: [encoding: :H264]]},
546+
output: "#{out_dir}/index.m3u8"
547+
)
548+
end)
549+
550+
{_output, 0} =
551+
System.shell("""
552+
ffmpeg -re -i #{input_dir}/bun.mkv \
553+
-map 0:v:0 -c:v copy -payload_type 96 -f rtp rtp://127.0.0.1:#{rtp_port} \
554+
-map 0:a:0 -c:a copy -payload_type 120 -f rtp rtp://127.0.0.1:#{rtp_port}
555+
""")
556+
557+
Process.sleep(200)
558+
559+
Task.shutdown(t)
560+
```
561+
562+
<!-- livebook:{"branch_parent_index":0} -->
563+
531564
## Stream MP4 via WebRTC
532565

533566
To receive the stream, visit http://localhost:1234/webrtc_to_browser.html after running the cell below.

lib/boombox.ex

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,15 @@ defmodule Boombox do
44
55
See `run/1` for details and [examples.livemd](examples.livemd) for examples.
66
"""
7-
87
require Membrane.Time
98

9+
alias Membrane.RTP
10+
1011
@type webrtc_signaling :: Membrane.WebRTC.SignalingChannel.t() | String.t()
11-
@type in_stream_opts :: [audio: :binary | boolean(), video: :image | boolean()]
12+
@type in_stream_opts :: [
13+
{:audio, :binary | boolean()}
14+
| {:video, :image | boolean()}
15+
]
1216
@type out_stream_opts :: [
1317
{:audio, :binary | boolean()}
1418
| {:video, :image | boolean()}
@@ -17,12 +21,46 @@ defmodule Boombox do
1721
| {:audio_channels, Membrane.RawAudio.channels_t()}
1822
]
1923

24+
@typedoc """
25+
Some encodings can/must be accompanied with encoding specific parameters:
26+
* AAC:
27+
- bitrate_mode - has to be provided. Defines how the RTP stream was payloaded and should be depayloaded.
28+
- audio_specific_config - has to be provided. Contains crucial information about the stream and has to be obtained from a side channel.
29+
* H264 and H265:
30+
- vpss (H265 only), ppss, spss - optional. Parameter sets, could be obtained from a side channel. They contain information about the encoded stream.
31+
"""
32+
@type rtp_encoding_specific_params ::
33+
{:AAC, [bitrate_mode: RTP.AAC.Utils.mode(), audio_specific_config: binary()]}
34+
| {:H264, [ppss: [binary()], spss: [binary()]]}
35+
| {:H265, [vpss: [binary()], ppss: [binary()], spss: [binary()]]}
36+
37+
@typedoc """
38+
For each media type the following parameters are specified:
39+
* encoding - has to be provided, some encodings require additional parameters, see `rtp_encoding_specific_params/0`.
40+
* payload_type, clock rate - most likely should be provided, if not, then an unofficial default will be used.
41+
"""
42+
@type rtp_track_config :: [
43+
{:encoding, RTP.encoding_name() | rtp_encoding_specific_params()}
44+
| {:payload_type, RTP.payload_type()}
45+
| {:clock_rate, RTP.clock_rate()}
46+
]
47+
48+
@typedoc """
49+
In order to configure RTP input both a receiving port and media configurations must be provided.
50+
At least one media type needs to be configured.
51+
"""
52+
@type in_rtp_opts :: [
53+
{:port, :inet.port_number()}
54+
| {:track_configs, [audio: rtp_track_config(), video: rtp_track_config()]}
55+
]
56+
2057
@type input ::
2158
(path_or_uri :: String.t())
2259
| {:mp4, location :: String.t(), transport: :file | :http}
2360
| {:webrtc, webrtc_signaling()}
2461
| {:rtmp, (uri :: String.t()) | (client_handler :: pid)}
2562
| {:rtsp, url :: String.t()}
63+
| {:rtp, in_rtp_opts()}
2664
| {:stream, in_stream_opts()}
2765

2866
@type output ::
@@ -149,6 +187,9 @@ defmodule Boombox do
149187
{:rtsp, location} when direction == :input and is_binary(location) ->
150188
value
151189

190+
{:rtp, opts} ->
191+
if Keyword.keyword?(opts), do: value
192+
152193
{:stream, opts} ->
153194
if Keyword.keyword?(opts), do: value
154195

lib/boombox/hls.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ defmodule Boombox.HLS do
4343
Enum.map(track_builders, fn
4444
{:audio, builder} ->
4545
builder
46-
|> child(:mp4_audio_transcoder, %Boombox.Transcoder{
46+
|> child(:hls_audio_transcoder, %Boombox.Transcoder{
4747
output_stream_format: Membrane.AAC
4848
})
4949
|> via_in(Pad.ref(:input, :audio),

lib/boombox/pipeline.ex

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ defmodule Boombox.Pipeline do
9393
track_builders: Boombox.Pipeline.track_builders() | nil,
9494
last_result: Boombox.Pipeline.Ready.t() | Boombox.Pipeline.Wait.t() | nil,
9595
eos_info: term(),
96-
rtsp_state: Boombox.RTSP.rtsp_state() | nil,
96+
rtsp_state: Boombox.RTSP.state() | nil,
9797
parent: pid(),
9898
output_webrtc_state: Boombox.WebRTC.output_webrtc_state() | nil
9999
}
@@ -316,6 +316,10 @@ defmodule Boombox.Pipeline do
316316
Boombox.RTSP.create_input(uri)
317317
end
318318

319+
defp create_input({:rtp, params}, _ctx, _state) do
320+
Boombox.RTP.create_input(params)
321+
end
322+
319323
defp create_input({:stream, params}, _ctx, state) do
320324
Boombox.ElixirStream.create_input(state.parent, params)
321325
end

lib/boombox/rtp.ex

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
defmodule Boombox.RTP do
2+
@moduledoc false
3+
import Membrane.ChildrenSpec
4+
5+
require Membrane.Pad
6+
7+
alias Boombox.Pipeline.Ready
8+
alias Membrane.RTP
9+
10+
@required_opts [:port, :track_configs]
11+
@required_encoding_specific_params %{
12+
AAC: [bitrate_mode: [require?: true], audio_specific_config: [require?: true]],
13+
OPUS: [],
14+
H264: [ppss: [require?: false], spss: [require?: false]],
15+
H265: [ppss: [require?: false], spss: [require?: false]]
16+
}
17+
18+
@type parsed_encoding_specific_params ::
19+
%{bitrate_mode: RTP.AAC.Utils.mode(), audio_specific_config: binary()}
20+
| %{optional(:ppss) => [binary()], optional(:spss) => [binary()]}
21+
| %{
22+
optional(:vpss) => [binary()],
23+
optional(:ppss) => [binary()],
24+
optional(:spss) => [binary()]
25+
}
26+
| %{}
27+
28+
@type parsed_track_config :: %{
29+
encoding_name: RTP.encoding_name(),
30+
encoding_specific_params: parsed_encoding_specific_params(),
31+
payload_type: RTP.payload_type(),
32+
clock_rate: RTP.clock_rate()
33+
}
34+
35+
@type parsed_in_opts :: %{
36+
port: :inet.port_number(),
37+
track_configs: %{audio: parsed_track_config(), video: parsed_track_config()}
38+
}
39+
40+
@spec create_input(Boombox.in_rtp_opts()) :: Ready.t()
41+
def create_input(opts) do
42+
parsed_options = validate_and_parse_options(opts)
43+
payload_type_mapping = get_payload_type_mapping(parsed_options.track_configs)
44+
45+
spec =
46+
child(:udp_source, %Membrane.UDP.Source{local_port_no: opts[:port]})
47+
|> child(:rtp_demuxer, %Membrane.RTP.Demuxer{payload_type_mapping: payload_type_mapping})
48+
49+
track_builders =
50+
Map.new(parsed_options.track_configs, fn {media_type, track_config} ->
51+
{depayloader, parser} =
52+
case track_config.encoding_name do
53+
:H264 ->
54+
ppss = Map.get(track_config.encoding_specific_params, :ppss, [])
55+
spss = Map.get(track_config.encoding_specific_params, :spss, [])
56+
{Membrane.RTP.H264.Depayloader, %Membrane.H264.Parser{ppss: ppss, spss: spss}}
57+
58+
:AAC ->
59+
audio_specific_config = track_config.encoding_specific_params.audio_specific_config
60+
bitrate_mode = track_config.encoding_specific_params.bitrate_mode
61+
62+
{%Membrane.RTP.AAC.Depayloader{mode: bitrate_mode},
63+
%Membrane.AAC.Parser{audio_specific_config: audio_specific_config}}
64+
65+
:OPUS ->
66+
{Membrane.RTP.Opus.Depayloader, Membrane.Opus.Parser}
67+
68+
:H265 ->
69+
vpss = Map.get(track_config.encoding_specific_params, :vpss, [])
70+
ppss = Map.get(track_config.encoding_specific_params, :ppss, [])
71+
spss = Map.get(track_config.encoding_specific_params, :spss, [])
72+
73+
{Membrane.RTP.H265.Depayloader,
74+
%Membrane.H265.Parser{vpss: vpss, ppss: ppss, spss: spss}}
75+
end
76+
77+
spec =
78+
get_child(:rtp_demuxer)
79+
|> via_out(:output, options: [stream_id: {:encoding_name, track_config.encoding_name}])
80+
|> child({:jitter_buffer, track_config.encoding_name}, %Membrane.RTP.JitterBuffer{
81+
clock_rate: track_config.clock_rate
82+
})
83+
|> child({:rtp_depayloader, track_config.encoding_name}, depayloader)
84+
|> child({:rtp_in_parser, track_config.encoding_name}, parser)
85+
86+
{media_type, spec}
87+
end)
88+
89+
%Ready{spec_builder: spec, track_builders: track_builders}
90+
end
91+
92+
@spec validate_and_parse_options(Boombox.in_rtp_opts()) :: parsed_in_opts()
93+
defp validate_and_parse_options(opts) do
94+
Enum.each(@required_opts, fn required_option ->
95+
unless Keyword.has_key?(opts, required_option) do
96+
raise "Required option #{inspect(required_option)} not present in passed RTP options"
97+
end
98+
end)
99+
100+
if opts[:track_configs] == [] do
101+
raise "No RTP media configured"
102+
end
103+
104+
parsed_track_configs =
105+
Map.new(opts[:track_configs], fn {media_type, track_config} ->
106+
{media_type, validate_and_parse_track_config!(track_config)}
107+
end)
108+
109+
%{port: opts[:port], track_configs: parsed_track_configs}
110+
end
111+
112+
@spec validate_and_parse_track_config!(Boombox.rtp_track_config()) :: parsed_track_config()
113+
defp validate_and_parse_track_config!(track_config) do
114+
{encoding_name, encoding_specific_params} =
115+
validate_and_parse_encoding!(track_config[:encoding])
116+
117+
track_config = Keyword.put(track_config, :encoding_name, encoding_name)
118+
119+
%{payload_type: payload_type, clock_rate: clock_rate} =
120+
RTP.PayloadFormat.resolve(track_config)
121+
122+
if payload_type == nil do
123+
raise "payload_type for encoding #{inspect(encoding_name)} not provided with no default value registered"
124+
end
125+
126+
if clock_rate == nil do
127+
raise "clock_rate for encoding #{inspect(encoding_name)} and payload_type #{inspect(payload_type)} not provided with no default value registered"
128+
end
129+
130+
%{
131+
encoding_name: encoding_name,
132+
encoding_specific_params: encoding_specific_params,
133+
payload_type: payload_type,
134+
clock_rate: clock_rate
135+
}
136+
end
137+
138+
@spec validate_and_parse_encoding!(RTP.encoding_name() | Boombox.rtp_encoding_specific_params()) ::
139+
{RTP.encoding_name(), %{}} | parsed_encoding_specific_params()
140+
defp validate_and_parse_encoding!(encoding) do
141+
case encoding do
142+
nil ->
143+
raise "Encoding name not provided"
144+
145+
encoding when is_atom(encoding) ->
146+
validate_and_parse_encoding!({encoding, []})
147+
148+
{encoding, encoding_params} when is_atom(encoding) ->
149+
field_specs = Map.get(@required_encoding_specific_params, encoding, [])
150+
{:ok, encoding_params} = Bunch.Config.parse(encoding_params, field_specs)
151+
{encoding, encoding_params}
152+
end
153+
end
154+
155+
@spec get_payload_type_mapping(%{audio: parsed_track_config(), video: parsed_track_config()}) ::
156+
RTP.PayloadFormat.payload_type_mapping()
157+
defp get_payload_type_mapping(track_configs) do
158+
Map.new(track_configs, fn {_media_type, track_config} ->
159+
{track_config.payload_type,
160+
%{encoding_name: track_config.encoding_name, clock_rate: track_config.clock_rate}}
161+
end)
162+
end
163+
end

lib/boombox/rtsp.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ defmodule Boombox.RTSP do
88
alias Membrane.{RTP, RTSP}
99
alias Boombox.Pipeline.{Ready, State, Wait}
1010

11-
@type rtsp_state :: %{
11+
@type state :: %{
1212
set_up_tracks: %{
1313
optional(:audio) => Membrane.RTSP.Source.track(),
1414
optional(:video) => Membrane.RTSP.Source.track()
@@ -43,7 +43,7 @@ defmodule Boombox.RTSP do
4343
{%Wait{}, %{state | rtsp_state: rtsp_state}}
4444
end
4545

46-
@spec handle_input_track(RTP.ssrc_t(), RTSP.Source.track(), State.t()) ::
46+
@spec handle_input_track(RTP.ssrc(), RTSP.Source.track(), State.t()) ::
4747
{Ready.t() | Wait.t(), State.t()}
4848
def handle_input_track(ssrc, track, state) do
4949
track_builders = state.rtsp_state.track_builders

mix.exs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ defmodule Boombox.Mixfile do
4646
defp deps do
4747
[
4848
{:membrane_core, "~> 1.1"},
49-
{:membrane_webrtc_plugin, "~> 0.23.0"},
49+
{:membrane_webrtc_plugin, "~> 0.23.1"},
5050
{:membrane_opus_plugin, "~> 0.20.3"},
5151
{:membrane_aac_plugin, "~> 0.19.0"},
5252
{:membrane_aac_fdk_plugin, "~> 0.18.0"},
@@ -58,12 +58,17 @@ defmodule Boombox.Mixfile do
5858
{:membrane_realtimer_plugin, "~> 0.9.0"},
5959
{:membrane_http_adaptive_stream_plugin, "~> 0.18.5"},
6060
{:membrane_rtmp_plugin, "~> 0.27.2"},
61-
{:membrane_rtsp_plugin, "~> 0.5.0"},
62-
{:membrane_rtp_plugin, "~> 0.29.0"},
61+
{:membrane_rtsp_plugin, "~> 0.6.1"},
62+
{:membrane_rtp_plugin, "~> 0.30.0"},
63+
{:membrane_rtp_format, "~> 0.10.0"},
64+
{:membrane_rtp_aac_plugin, "~> 0.9.0"},
65+
{:membrane_rtp_h264_plugin, "~> 0.20.0"},
66+
{:membrane_rtp_opus_plugin, "~> 0.10.0"},
67+
{:membrane_rtp_h265_plugin, "~> 0.5.2"},
6368
{:membrane_ffmpeg_swresample_plugin, "~> 0.20.0"},
6469
{:membrane_hackney_plugin, "~> 0.11.0"},
6570
{:membrane_ffmpeg_swscale_plugin, "~> 0.16.2"},
66-
{:membrane_simple_rtsp_server, "~> 0.1.0", only: :test},
71+
{:membrane_simple_rtsp_server, "~> 0.1.3"},
6772
{:image, "~> 0.54.0"},
6873
{:burrito, "~> 1.0", runtime: burrito?(), optional: true},
6974
{:ex_doc, ">= 0.0.0", only: :dev, runtime: false},

0 commit comments

Comments
 (0)