Skip to content

Commit 7045f9b

Browse files
author
kidp330
committed
Update 06_OrderingBuffer.md
1 parent 3f71657 commit 7045f9b

File tree

1 file changed

+62
-52
lines changed

1 file changed

+62
-52
lines changed

basic_pipeline/06_OrderingBuffer.md

Lines changed: 62 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -4,84 +4,88 @@ In this chapter we will deal with the next [element](../glossary/glossary.md#ele
44
As stated in the [chapter about the system architecture](02_SystemArchitecture.md), this element is responsible for ordering the incoming [packets](../glossary/glossary.md#packet), based on their sequence id.
55
Because Ordering Buffer is a filtering element, we need to specify both the input and the output [pads](../glossary/glossary.md#pad):
66

7-
**_`lib/elements/OrderingBuffer.ex`_**
7+
**_`lib/elements/ordering_buffer.ex`_**
88

99
```elixir
1010
defmodule Basic.Elements.OrderingBuffer do
1111
use Membrane.Filter
1212
alias Basic.Formats.Packet
1313

14-
def_input_pad(:input, demand_unit: :buffers, caps: {Packet, type: :custom_packets})
14+
def_input_pad :input,
15+
flow_control: :manual,
16+
demand_unit: :buffers,
17+
accepted_format: %Packet{type: :custom_packets}
1518

16-
def_output_pad(:output, caps: {Packet, type: :custom_packets})
17-
...
19+
def_output_pad :output,
20+
flow_control: :manual,
21+
accepted_format: %Packet{type: :custom_packets}
22+
...
1823
end
1924
```
2025

21-
Note the caps specification definition there - we expect `Basic.Formats.Packet` of type `:custom_packets` to be sent on the input pad, and the same type of packets to be sent through the output pad.
26+
Note the format specification definition there - we expect `Basic.Formats.Packet` of type `:custom_packets` to be sent on the input pad, and the same type of packets to be sent through the output pad.
2227
In the next step let's specify how we want the state of our element to look like:
2328

24-
**_`lib/elements/OrderingBuffer.ex`_**
29+
**_`lib/elements/ordering_buffer.ex`_**
2530

2631
```elixir
2732
defmodule Basic.Elements.OrderingBuffer do
28-
...
33+
...
2934
@impl true
30-
def handle_init(_options) do
31-
{:ok,
32-
%{
33-
ordered_packets: [],
34-
last_sent_seq_id: 0
35-
}
36-
}
35+
def handle_init(_context, _options) do
36+
{[],
37+
%{
38+
ordered_packets: [],
39+
last_sent_seq_id: 0
40+
}}
3741
end
38-
...
42+
...
3943
end
4044
```
4145

4246
If you don't remember what is the purpose of the Ordering Buffer, please refer to the [2nd chapter](02_SystemArchitecture.md).
4347
We will need to hold a list of ordered packets, as well as a sequence id of the packet, which most recently was sent through the output pad (we need to know if there are some packets missing between the last sent packet and the first packet in our ordered list).
4448

45-
Handling demand is quite straightforward:
49+
Handling demands is quite straightforward:
4650

47-
**_`lib/elements/OrderingBuffer.ex`_**
51+
**_`lib/elements/ordering_buffer.ex`_**
4852

4953
```elixir
5054
defmodule Basic.Elements.OrderingBuffer do
5155
...
52-
@impl true
53-
def handle_demand(_ref, size, _unit, _ctx, state) do
54-
{ {:ok, demand: {Pad.ref(:input), size} }, state}
55-
end
56+
@impl true
57+
def handle_demand(:output, size, _unit, _context, state) do
58+
{[demand: {:input, size}], state}
59+
end
5660
...
5761
end
5862
```
5963

6064
We simply send the `:demand` on the `:input` pad once we receive a demand on the `:output` pad. One packet on input corresponds to one packet on output so for each 1 unit of demand we send 1 unit of demand to the `:input` pad.
6165

62-
Now we can go to the main part of the Ordering Buffer implementation - the `handle_process/4` callback.
66+
Now we can go to the main part of the Ordering Buffer implementation - the `handle_buffer/4` callback.
6367
The purpose of this callback is to process the incoming buffer. It gets called once a new buffer is available and waiting to be processed.
6468

65-
**_`lib/elements/OrderingBuffer.ex`_**
69+
**_`lib/elements/ordering_buffer.ex`_**
6670

6771
```elixir
6872
defmodule Basic.Elements.OrderingBuffer do
69-
...
73+
...
7074
@impl true
71-
def handle_process(:input, buffer, _context, state) do
75+
def handle_buffer(:input, buffer, _context, state) do
7276
packet = unzip_packet(buffer.payload)
7377
ordered_packets = [packet | state.ordered_packets] |> Enum.sort()
7478
state = %{state | ordered_packets: ordered_packets}
75-
{last_seq_id, _} = Enum.at(ordered_packets, 0)
76-
...
79+
[{last_seq_id, _} | _] = ordered_packets
80+
...
7781
end
7882

7983
defp unzip_packet(packet) do
8084
regex = ~r/^\[seq\:(?<seq_id>\d+)\](?<data>.*)$/
8185
%{"data" => data, "seq_id" => seq_id} = Regex.named_captures(regex, packet)
8286
{String.to_integer(seq_id), %Membrane.Buffer{payload: data} }
8387
end
84-
...
88+
...
8589
end
8690
```
8791

@@ -118,64 +122,70 @@ The result of `Regex.named_captures/2` applied to that regex description and the
118122
{"seq_id"=>7, "data"=>"[frameid:2][timestamp:3]data"}
119123
```
120124

121-
Once we unzip the header of the packet in the `handle_process/4` callback, we can put the incoming packet in the `ordered_packets` list and sort that list. Due to the fact, that elements of this list are tuples, whose first element is a sequence id (a value that is unique), the list will be sorted based on the sequence id.
125+
Once we unzip the header of the packet in the `handle_buffer/4` callback, we can put the incoming packet in the `ordered_packets` list and sort that list. Due to the fact, that elements of this list are tuples, whose first element is a sequence id (a value that is unique), the list will be sorted based on the sequence id.
122126
We also get the sequence id of the first element in the updated `ordered_packets` list.
123127

124-
Here comes the rest of the `handle_process/4` definition:
128+
Here comes the rest of the `handle_buffer/4` definition:
125129

126-
**_`lib/elements/OrderingBuffer.ex`_**
130+
**_`lib/elements/ordering_buffer.ex`_**
127131

128132
```elixir
129133
defmodule Basic.Elements.OrderingBuffer do
130-
...
131-
def handle_process(:input, buffer, _context, state) do
132-
...
134+
...
135+
def handle_buffer(:input, buffer, _context, state) do
136+
...
133137
if state.last_sent_seq_id + 1 == last_seq_id do
134-
{reversed_ready_packets_sequence, ordered_packets} = get_ready_packets_sequence(ordered_packets, [])
135-
[{last_sent_seq_id, _} | _] = reversed_ready_packets_sequence
138+
{ready_packets_sequence, ordered_packets_left} =
139+
get_ready_packets_sequence(ordered_packets, [])
140+
141+
{last_sent_seq_id, _} = List.last(ready_packets_sequence)
136142

137143
state = %{
138144
state
139-
| ordered_packets: ordered_packets,
145+
| ordered_packets: ordered_packets_left,
140146
last_sent_seq_id: last_sent_seq_id
141147
}
142-
buffers = Enum.reverse(reversed_ready_packets_sequence) |> Enum.map(fn {_seq_id, data} -> data end)
143148

144-
{ {:ok, buffer: {:output, buffers} }, state}
149+
ready_buffers = Enum.map(ready_packets_sequence, &elem(&1, 1))
150+
151+
{[buffer: {:output, ready_buffers}], state}
145152
else
146-
{ {:ok, redemand: :output}, state}
153+
{[redemand: :output], state}
147154
end
148155
end
149-
...
156+
...
150157
end
151158
```
152159

153160
We need to distinguish between two situations: the currently processed packet can have a sequence id which is subsequent to the sequence id of the last sent packet or there might be some packets not yet delivered to us, with sequence ids in between the last sent sequence id and the sequence id of a currently processed packet. In the second case, we should store the packet and wait for the next packets to arrive. We will accomplish that using [`redemands` mechanism](../glossary/glossary.md#redemands), which will be explained in detail in the next chapter.
154161
However, in the first situation, we need to get the ready packet's sequence - that means, a consistent batch of packets from the `:ordered_packets`. This can be done in the following way:
155162

156-
**_`lib/elements/OrderingBuffer.ex`_**
163+
**_`lib/elements/ordering_buffer.ex`_**
157164

158165
```elixir
159166
defmodule Basic.Elements.OrderingBuffer do
160-
...
161-
defp get_ready_packets_sequence([], acc) do
162-
{acc, []}
167+
...
168+
defp get_ready_packets_sequence([first_packet | ordered_rest], []) do
169+
get_ready_packets_sequence(ordered_rest, [first_packet])
163170
end
164171

165172
defp get_ready_packets_sequence(
166-
[{first_id, _first_data} = first_seq | [{second_id, second_data} | rest]], acc)
167-
when first_id + 1 == second_id do
168-
get_ready_packets_sequence([{second_id, second_data} | rest], [first_seq | acc])
173+
[next_seq = {next_id, _} | ordered_rest],
174+
[{last_id, _} | _] = ready_sequence
175+
)
176+
when next_id == last_id + 1 do
177+
get_ready_packets_sequence(ordered_rest, [next_seq | ready_sequence])
169178
end
170179

171-
defp get_ready_packets_sequence([first_seq | rest], acc) do
172-
{[first_seq | acc], rest}
173-
end
180+
defp get_ready_packets_sequence(ordered_packets, ready_sequence) do
181+
{Enum.reverse(ready_sequence), ordered_packets}
182+
end
183+
...
174184
end
175185
```
176186

177187
Note the order of the definitions, since we are taking advantage of the pattern matching mechanism!
178-
The algorithm implemented in the snippet above is really simple - we are recursively taking the next packet out of the `:ordered_packets` buffer until it becomes empty or there is a missing packet (`first_id + 1 == second_id`) between the last taken packet and the next packet in the buffer.
188+
The algorithm implemented in the snippet above is really simple - we are recursively taking the next packet out of the `:ordered_packets` buffer until it becomes empty or there is a missing packet (`next_id == last_id + 1`) between the last taken packet and the next packet in the buffer.
179189
Once we have a consistent batch of packets, we can update the state (both the`:ordered_packets` and the `:last_sent_seq_id` need to be updated) and output the ready packets by defining the `:buffer` action.
180190

181191
Test the `OrderingBuffer`:

0 commit comments

Comments
 (0)