Skip to content

Commit d6980dc

Browse files
committed
Restructure 09_Mixer.md
1 parent f66c2e4 commit d6980dc

File tree

1 file changed

+37
-30
lines changed

1 file changed

+37
-30
lines changed

basic_pipeline/09_Mixer.md

+37-30
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,10 @@ defmodule Basic.Elements.Mixer do
4949
end
5050
```
5151

52-
As you can see in the code snippet above, the `Track` will consist of the `:buffer` field, holding the very last buffer received on the corresponding input pad, and the `:status` fields, indicating the status of the track - `:started`, in case we are still expecting some buffers to come (that means - in case `:end_of_stream` event hasn't been received yet) and `:finished` otherwise.
53-
It's a good practice to provide a type specification for such a custom struct since it makes the code easier to reuse and lets the compiler warn us about some misspellings (for instance in the status field atoms), which cause some hard to spot errors.
52+
As you can see in the code snippet above, the `Track` will consist of the `:buffer` field, holding the very last buffer received on the corresponding input pad, and the `:status` fields, indicating the status of the track - `:started`, in case we are still expecting some buffers to come (that means - in case `:end_of_stream` event hasn't been received yet) and `:finished` otherwise.
53+
54+
It's a good practice to provide a type specification for such a custom struct since it makes the code easier to reuse and lets the compiler warn us about some misspellings (for instance in the status field atoms), which cause some hard to spot errors.
55+
5456
A careful reader might notice, that we are holding only one buffer for each track, instead of a list of all the potentially unprocessed buffers - does it mean that we are losing some of them? Not at all, since we are taking advantage of the elements which have appeared earlier in the [pipeline](../glossary/glossary.md#pipeline) and which provide us with an ordered list of frames on each of the inputs - however, we will need to process each buffer just at the moment it comes on the pad.
5557

5658
The logic we're going to implement can be described in the following three steps:
@@ -79,7 +81,9 @@ end
7981

8082
We have provided a `handle_init/2` callback, which does not expect any options to be passed. We are simply setting up the structure of the element state.
8183
As mentioned previously, we will have a `Track` structure for each of the input pads.
82-
What's interesting is this is where the mixer having exactly two inputs stops being important. The missing functionality can be defined generically without much hassle.
84+
85+
What's interesting is this is where the mixer having exactly two inputs stops being important. The missing functionality can be defined generically without much hassle.
86+
8387
Following on the callbacks implementation, let's continue with the `handle_buffer/4` implementation:
8488

8589
**_`lib/elements/mixer.ex`_**
@@ -100,6 +104,7 @@ end
100104
```
101105

102106
In this callback we update the mixer's state by assigning the incoming buffer to its track. We can be sure no overwriting of an existing buffer happens, which will become more apparent as we delve further into the logic's implementation.
107+
103108
Once the state is updated we gather all buffers that can be sent (might be none) in `get_output_buffers_actions/1` and return the coresponding `buffer` actions. In case any demands should be sent afterwards we also tell the output pad to redemand.
104109

105110
**_`lib/elements/mixer.ex`_**
@@ -125,34 +130,9 @@ defmodule Basic.Elements.Mixer do
125130
end
126131
```
127132

128-
What we did here was similar to the logic defined in `handle_buffer/4` - we have just updated the state of the track (in that case - by setting its status as `:finished`), gather the buffers and send them. The important difference is that in case all inputs have closed, we should forward an `end_of_stream` action instead of a `redemand`, signaling the mixer has finished its processing.
129-
The `has_buffer?/1` function is a private utility that will show up in a few more places in our element's definition and is nothing more than a simple check `track.buffer != nil`.
130-
Let's now implement the `handle_demand/5` callback:
131-
132-
**_`lib/elements/mixer.ex`_**
133-
134-
```elixir
135-
defmodule Basic.Elements.Mixer do
136-
...
137-
def handle_demand(:output, _size, _unit, context, state) do
138-
demand_actions =
139-
state.tracks
140-
|> Enum.reject(&has_buffer?/1)
141-
|> Enum.filter(fn {track_id, track} ->
142-
track.status != :finished and context.pads[track_id].demand == 0
143-
end)
144-
|> Enum.map(fn {track_id, _track} -> {:demand, {track_id, 1}} end)
145-
146-
{demand_actions, state}
147-
end
148-
...
149-
end
150-
```
151-
152-
Since it should be responsible for producing and sending `demand` actions to the corresponding input pads, we accordingly filter tracks for ones that are empty, started, and with no demands pending.
153-
It should also become clearer why in `handle_buffer/4` the receiving track is sure to have an empty buffer ready to be overwritten, since we only send demands to input pads of empty tracks.
154-
All that's left now is to implement the main processing logic, gathering buffers that are ready to be sent:
133+
What we did here was similar to the logic defined in `handle_buffer/4` - we have just updated the state of the track (in that case - by setting its status to `:finished`), gather the buffers and send them. The important difference is that in case all inputs have closed, we should forward an `end_of_stream` action instead of a `redemand`, signaling the mixer has finished its processing.
155134

135+
Let's now implement gathering ready buffers:
156136

157137
**_`lib/elements/mixer.ex`_**
158138

@@ -197,9 +177,36 @@ end
197177
```
198178

199179
The `prepare_buffers/1` function is the most involved here, so let's start with that. We first check whether we can send a buffer at all. The next buffer to send in order will of course be one with lowest `.pts`. We then empty the corresponding track's buffer. There might be more than one buffer ready to send and so we iterate the gathering recursively.
180+
200181
We define `can_send_buffer?` as follows. If there's any `:started` track still waiting on a buffer we cannot send more, since whatever buffers the mixer's currently holding might come after the one that's yet to be received on this track.
182+
201183
Otherwise, if all tracks have finished it can still be the case that some have non-empty buffers. We can happily send all of these in order since it is guaranteed there is no buffer preceding them that we would have to wait for.
202184

185+
All that's left now is to handle redemands.
186+
187+
**_`lib/elements/mixer.ex`_**
188+
189+
```elixir
190+
defmodule Basic.Elements.Mixer do
191+
...
192+
def handle_demand(:output, _size, _unit, context, state) do
193+
demand_actions =
194+
state.tracks
195+
|> Enum.reject(&has_buffer?/1)
196+
|> Enum.filter(fn {track_id, track} ->
197+
track.status != :finished and context.pads[track_id].demand == 0
198+
end)
199+
|> Enum.map(fn {track_id, _track} -> {:demand, {track_id, 1}} end)
200+
201+
{demand_actions, state}
202+
end
203+
...
204+
end
205+
```
206+
207+
Since it should be responsible for producing and sending `demand` actions to the corresponding input pads, we accordingly filter tracks for ones that are empty, started, and with no demands pending.
208+
It should also become clearer why in `handle_buffer/4` the receiving track is sure to have an empty buffer ready to be overwritten, since we only send demands to input pads of empty tracks.
209+
203210
And that's all! the mixer's ready to mix, and ready to be tested:
204211

205212
```console

0 commit comments

Comments
 (0)