Skip to content

Commit bc3a6b9

Browse files
author
kidp330
committed
Update 08_Depayloader.md
1 parent 7045f9b commit bc3a6b9

File tree

1 file changed

+89
-68
lines changed

1 file changed

+89
-68
lines changed

basic_pipeline/08_Depayloader.md

+89-68
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
1-
# Depeyloader
2-
1+
# Depayloader
32
Since we have [packets](../glossary/glossary.md#packet) put in order by the [Ordering Buffer](../basic_pipeline/06_OrderingBuffer.md), we can assemble them into the original [frames](../glossary/glossary.md#frame).
43
The Depayloader is an element responsible for this task. Specifically speaking, it unpacks the payload from the packets -
54
and that is why it's called 'depayloader'.
6-
Let's create a new module in the `lib/elements/Depayloader.ex` file:
5+
Let's create a new module in the `lib/elements/depayloader.ex` file:
76

8-
**_`lib/elements/Depayloader.ex`_**
7+
**_`lib/elements/depayloader.ex`_**
98

109
```elixir
1110
defmodule Basic.Elements.Depayloader do
@@ -18,138 +17,160 @@ end
1817

1918
What input data do we expect? Of course in `Basic.Format.Packet` format!
2019

21-
**_`lib/elements/Depayloader.ex`_**
20+
**_`lib/elements/depayloader.ex`_**
2221

2322
```elixir
2423
defmodule Basic.Elements.Depayloader do
25-
def_input_pad(:input, demand_unit: :buffers, caps: {Packet, type: :custom_packets})
24+
...
25+
def_input_pad :input,
26+
flow_control: :manual,
27+
demand_unit: :buffers,
28+
accepted_format: %Packet{type: :custom_packets}
2629
...
2730
end
2831
```
2932

3033
However, our element will process that input data in a way that will change the format - on output, there will be frames instead of packets!
3134
We need to specify it while defining the `:output` pad:
3235

33-
**_`lib/elements/Depayloader.ex`_**
36+
**_`lib/elements/depayloader.ex`_**
3437

3538
```elixir
3639
defmodule Basic.Elements.Depayloader do
3740
...
38-
def_output_pad(:output, caps: {Frame, encoding: :utf8})
41+
def_output_pad :output,
42+
flow_control: :manual,
43+
accepted_format: %Frame{encoding: :utf8}
3944
...
4045
end
4146
```
4247

4348
We will also need a parameter describing how many packets should we request once we receive a demand for a frame:
4449

45-
**_`lib/elements/Depayloader.ex`_**
50+
**_`lib/elements/depayloader.ex`_**
4651

4752
```elixir
4853
defmodule Basic.Elements.Depayloader do
4954
...
50-
def_options(
51-
packets_per_frame: [
52-
type: :integer,
53-
spec: pos_integer,
54-
description:
55-
"Positive integer, describing how many packets form a single frame. Used to demand the proper number of packets while assembling the frame."
56-
]
57-
)
55+
def_options packets_per_frame: [
56+
spec: pos_integer,
57+
description:
58+
"Positive integer, describing how many packets form a single frame. Used to demand the proper number of packets while assembling the frame."
59+
]
5860
...
5961
end
6062
```
6163

62-
In the `handle_init/1` callback we are simply saving the value of that parameter in the state of our element:
64+
In the `handle_init/2` callback we are simply saving the value of that parameter in the state of our element:
6365

64-
**_`lib/elements/Depayloader.ex`_**
66+
**_`lib/elements/depayloader.ex`_**
6567

6668
```elixir
67-
@impl true
68-
def handle_init(options) do
69-
{:ok,
70-
%{
71-
frame: [],
72-
packets_per_frame: options.packets_per_frame
73-
} }
69+
defmodule Basic.Elements.Depayloader do
70+
...
71+
@impl true
72+
def handle_init(_context, options) do
73+
{[],
74+
%{
75+
frame: [],
76+
packets_per_frame: options.packets_per_frame
77+
}}
78+
end
79+
...
7480
end
7581
```
7682

7783
Within the state, we will also hold a (potentially not complete) `:frame` - a list of packets, which form a particular frame. We will aggregate the packets in the `:frame` until the moment the frame is complete.
7884

79-
As noted in the [chapter dedicated to the caps](04_Caps.md), since we are changing the type of data within the element, we cannot rely on the default implementation of the `handle_caps/4` callback. We need to explicitly send the updated version of caps:
85+
As noted in the [chapter dedicated to stream formats](04_StreamFormat.md), since we are changing the type of data within the element, we cannot rely on the default implementation of the `handle_stream_format/4` callback. We need to explicitly send the updated version of the format:
8086

81-
**_`lib/elements/Depayloader.ex`_**
87+
**_`lib/elements/depayloader.ex`_**
8288

8389
```elixir
84-
@impl true
85-
def handle_caps(_pad, _caps, _context, state) do
86-
caps = %Frame{encoding: :utf8}
87-
{ {:ok, caps: {:output, caps} }, state}
90+
defmodule Basic.Elements.Depayloader do
91+
...
92+
@impl true
93+
def handle_stream_format(:input, _stream_format, _context, state) do
94+
{[stream_format: {:output, %Frame{encoding: :utf8}}], state}
95+
end
96+
...
8897
end
8998
```
9099

91-
As in most elements, the `handle_demand/5` implementation is quite easy - what we do is simply to make a demand on our `:input` pad once we receive a demand on the `:output` pad. However, since we are expected to produce a frame (which is formed from a particular number of packets) on the `:output` pad, we need to request a particular number of packets on the `:input` pad - that is why we have defined the `:packets_per_frame` option and now we will be making usage of it. In case we would have been asked to produce 10 frames, and each frame would have been made out of 5 packets, then we would need to ask for 10\*5 = 50 packets on the `:input`.
100+
As in most elements, the `handle_demand/5` implementation is quite easy - what we do is simply make a demand on our `:input` pad once we receive a demand on the `:output` pad. However, since we are expected to produce a frame (which is formed from a particular number of packets) on the `:output` pad, we need to request a particular number of packets on the `:input` pad - that is why we have defined the `:packets_per_frame` option and now we will be making use of it. In case we would have been asked to produce 10 frames, and each frame would have been made out of 5 packets, then we would need to ask for 10\*5 = 50 packets on the `:input`.
92101

93-
**_`lib/elements/Depayloader.ex`_**
102+
**_`lib/elements/depayloader.ex`_**
94103

95104
```elixir
96-
@impl true
97-
def handle_demand(_ref, size, _unit, _ctx, state) do
98-
{ {:ok, demand: {Pad.ref(:input), size * state.packets_per_frame} }, state}
105+
defmodule Basic.Elements.Depayloader do
106+
...
107+
@impl true
108+
def handle_demand(:output, size, :buffers, _context, state) do
109+
{[demand: {:input, size * state.packets_per_frame}], state}
110+
end
111+
...
99112
end
100113
```
101114

102-
There is nothing left apart from processing the input data - that is - the packets. Since the packets are coming in order, we can simply hold them in the `:frame` list until all the packets forming that frame will be there. As you might remember, each packet has a frame id in its header, which can be followed by a 'b' or 'e' character, indicating the type of the packet (the one begging a frame or the one ending the frame). We will use information about the type to find a moment in which we should produce a frame out of the packets list.
115+
There is nothing left apart from processing the input data - that is - the packets. Since the packets are coming in order, we can simply hold them in the `:frame` list until all the packets forming that frame will be there. As you might remember, each packet has a frame id in its header, which can be followed by a 'b' or 'e' character, indicating the type of the packet (the one beginning a frame or the one ending the frame). We will use information about the type to find a moment in which we should produce a frame out of the packets list.
103116

104-
**_`lib/elements/Depayloader.ex`_**
117+
**_`lib/elements/depayloader.ex`_**
105118

106119
```elixir
107-
@impl true
108-
def handle_process(_ref, buffer, _ctx, state) do
109-
packet = buffer.payload
120+
defmodule Basic.Elements.Depayloader do
121+
...
122+
@impl true
123+
def handle_buffer(:input, buffer, _context, state) do
124+
packet = buffer.payload
110125

111-
regex =
112-
~r/^\[frameid\:(?<frame_id>\d+(?<type>[s|e]*))\]\[timestamp\:(?<timestamp>\d+)\](?<data>.*)$/
126+
regex =
127+
~r/^\[frameid\:(?<frame_id>\d+(?<type>[s|e]*))\]\[timestamp\:(?<timestamp>\d+)\](?<data>.*)$/
113128

114-
%{"data" => data, "frame_id" => _frame_id, "type" => type, "timestamp" => timestamp} =
115-
Regex.named_captures(regex, packet)
129+
%{"data" => data, "frame_id" => _frame_id, "type" => type, "timestamp" => timestamp} =
130+
Regex.named_captures(regex, packet)
116131

117-
frame = [data | state.frame]
132+
frame = [data | state.frame]
118133
...
119134
end
120135
```
121136

122137
Once again we are taking advantage of the `Regex.named_captures`.
123138
Once we fetch the interesting values of the header's parameters, we can update the `:frame`.
124139

125-
**_`lib/elements/Depayloader.ex`_**
140+
**_`lib/elements/depayloader.ex`_**
126141

127142
```elixir
128-
@impl true
129-
def handle_process(_ref, buffer, _ctx, state) do
130-
...
131-
case type do
132-
"e" ->
133-
frame = prepare_frame(frame)
134-
state = Map.put(state, :frame, [])
135-
buffer = %Membrane.Buffer{payload: frame, pts: String.to_integer(timestamp)}
136-
{ {:ok, [buffer: {:output, buffer}]}, state}
137-
138-
_ ->
139-
state = Map.put(state, :frame, frame)
140-
{:ok, state}
141-
end
143+
defmodule Basic.Elements.Depayloader do
144+
...
145+
@impl true
146+
def handle_buffer(:input, buffer, _context, state) do
147+
...
148+
if type == "e" do
149+
buffer = %Membrane.Buffer{
150+
payload: prepare_frame(frame),
151+
pts: String.to_integer(timestamp)
152+
}
153+
154+
{[buffer: {:output, buffer}], %{state | frame: []}}
155+
else
156+
{[], %{state | frame: frame}}
157+
end
158+
...
142159
end
143160
```
144161

145162
Now, depending on the type of frame, we perform different actions.
146-
If we have the 'ending' packet, we are making the `:buffer` action with the frame made out of the packets (that's where `prepare_frame/1` function comes in handy), and clear the `:frame` buffer. Here is how can the `prepare_frame/1` function be implemented:
163+
If we have the 'ending' packet, we are making the `:buffer` action with the frame made out of the packets (that's where `prepare_frame/1` function comes in handy), and clear the `:frame` buffer. Here is how the `prepare_frame/1` function can be implemented:
147164

148-
**_`lib/elements/Depayloader.ex`_**
165+
**_`lib/elements/depayloader.ex`_**
149166

150167
```elixir
151-
defp prepare_frame(frame) do
152-
frame |> Enum.reverse() |> Enum.join("")
168+
defmodule Basic.Elements.Depayloader do
169+
...
170+
defp prepare_frame(frame) do
171+
frame |> Enum.reverse() |> Enum.join("")
172+
end
173+
...
153174
end
154175
```
155176

@@ -161,5 +182,5 @@ Test the `Depayloader`:
161182
mix test test/elements/depayloader_test.exs
162183
```
163184

164-
With the [`Source`](../glossary/glossary.md#source), [`OrderingBuffer`](../glossary/glossary.md#jitter-buffer--ordering-buffer) and [`Depayloader`](../glossary/glossary.md#payloader-and-depayloader) elements ready we are able to read packets from file, order them based on their sequence id and assemble them back into frames.
165-
In the next chapter we will be dealing with [`Mixer`](../glossary/glossary.md#mixer) which will merge two message streams in order to create complete conversation.
185+
With the [`Source`](../glossary/glossary.md#source), [`OrderingBuffer`](../glossary/glossary.md#jitter-buffer--ordering-buffer) and [`Depayloader`](../glossary/glossary.md#payloader-and-depayloader) elements ready we are able to read packets from file, order them based on their sequence ID and assemble them back into frames.
186+
In the next chapter we will be dealing with the [`Mixer`](../glossary/glossary.md#mixer) which will merge two message streams in order to create complete conversation.

0 commit comments

Comments
 (0)