Skip to content

Commit 35a2fde

Browse files
authored
[Recognizer] Use jitter buffer (#58)
1 parent 0e34297 commit 35a2fde

File tree

3 files changed

+47
-24
lines changed

3 files changed

+47
-24
lines changed

recognizer/lib/recognizer/room.ex

+41-19
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,11 @@ defmodule Recognizer.Room do
66
require Logger
77

88
alias ExWebRTC.{ICECandidate, PeerConnection, RTPCodecParameters, SessionDescription}
9-
alias ExWebRTC.RTP.Depayloader
9+
alias ExWebRTC.RTP.{Depayloader, JitterBuffer}
1010

1111
@max_session_time_s Application.compile_env!(:recognizer, :max_session_time_s)
1212
@session_time_timer_interval_ms 1_000
13+
@jitter_buffer_latency_ms 50
1314

1415
@video_codecs [
1516
%RTPCodecParameters{
@@ -53,6 +54,7 @@ defmodule Recognizer.Room do
5354
video_track: nil,
5455
video_depayloader: video_depayloader,
5556
video_decoder: Xav.Decoder.new(:vp8),
57+
video_buffer: JitterBuffer.new(latency: @jitter_buffer_latency_ms),
5658
audio_track: nil,
5759
session_start_time: System.monotonic_time(:millisecond)
5860
}}
@@ -133,24 +135,9 @@ defmodule Recognizer.Room do
133135
{:ex_webrtc, _pc, {:rtp, track_id, nil, packet}},
134136
%{video_track: %{id: track_id}} = state
135137
) do
136-
{frame, depayloader} = Depayloader.depayload(state.video_depayloader, packet)
137-
state = %{state | video_depayloader: depayloader}
138-
139-
with true <- is_nil(state.task),
140-
false <- is_nil(frame),
141-
{:ok, frame} <- Xav.Decoder.decode(state.video_decoder, frame) do
142-
tensor = Xav.Frame.to_nx(frame)
143-
task = Task.async(fn -> Nx.Serving.batched_run(Recognizer.VideoServing, tensor) end)
144-
state = %{state | task: task}
145-
{:noreply, state}
146-
else
147-
other when other in [:ok, true, false] ->
148-
{:noreply, state}
149-
150-
{:error, :no_keyframe} ->
151-
Logger.warning("Couldn't decode video frame - missing keyframe!")
152-
{:noreply, state}
153-
end
138+
state.video_buffer
139+
|> JitterBuffer.insert(packet)
140+
|> handle_jitter_buffer_result(state)
154141
end
155142

156143
@impl true
@@ -162,6 +149,13 @@ defmodule Recognizer.Room do
162149
{:noreply, state}
163150
end
164151

152+
@impl true
153+
def handle_info(:jitter_buffer_timer, state) do
154+
state.video_buffer
155+
|> JitterBuffer.handle_timeout()
156+
|> handle_jitter_buffer_result(state)
157+
end
158+
165159
@impl true
166160
def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
167161
if pid != state.channel do
@@ -207,4 +201,32 @@ defmodule Recognizer.Room do
207201
def handle_info(_msg, state) do
208202
{:noreply, state}
209203
end
204+
205+
defp handle_jitter_buffer_result({packets, timer, buffer}, state) do
206+
unless is_nil(timer), do: Process.send_after(self(), :jitter_buffer_timer, timer)
207+
208+
state = Enum.reduce(packets, state, fn packet, state -> handle_packet(packet, state) end)
209+
210+
{:noreply, %{state | video_buffer: buffer}}
211+
end
212+
213+
defp handle_packet(packet, state) do
214+
{frame, depayloader} = Depayloader.depayload(state.video_depayloader, packet)
215+
state = %{state | video_depayloader: depayloader}
216+
217+
with true <- is_nil(state.task),
218+
false <- is_nil(frame),
219+
{:ok, frame} <- Xav.Decoder.decode(state.video_decoder, frame) do
220+
tensor = Xav.Frame.to_nx(frame)
221+
task = Task.async(fn -> Nx.Serving.batched_run(Recognizer.VideoServing, tensor) end)
222+
%{state | task: task}
223+
else
224+
other when other in [:ok, true, false] ->
225+
state
226+
227+
{:error, :no_keyframe} ->
228+
Logger.warning("Couldn't decode video frame - missing keyframe!")
229+
state
230+
end
231+
end
210232
end

recognizer/mix.exs

+2-1
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ defmodule Recognizer.MixProject do
5858
{:jason, "~> 1.2"},
5959
{:dns_cluster, "~> 0.1.1"},
6060
{:plug_cowboy, "~> 2.5"},
61-
{:ex_webrtc, "~> 0.5.0"},
61+
# {:ex_webrtc, "~> 0.5.0"},
62+
{:ex_webrtc, github: "elixir-webrtc/ex_webrtc", override: true},
6263
{:ex_webrtc_dashboard, "~> 0.5.0"},
6364
{:xav, "~> 0.5.0"},
6465
{:bumblebee, "~> 0.5.3"},

recognizer/mix.lock

+4-4
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,14 @@
2020
"erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"},
2121
"esbuild": {:hex, :esbuild, "0.8.1", "0cbf919f0eccb136d2eeef0df49c4acf55336de864e63594adcea3814f3edf41", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "25fc876a67c13cb0a776e7b5d7974851556baeda2085296c14ab48555ea7560f"},
2222
"ex_dtls": {:hex, :ex_dtls, "0.16.0", "3ae38025ccc77f6db573e2e391602fa9bbc02253c137d8d2d59469a66cbe806b", [:mix], [{:bundlex, "~> 1.5.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "2a4e30d74c6ddf95cc5b796423293c06a0da295454c3823819808ff031b4b361"},
23-
"ex_ice": {:hex, :ex_ice, "0.8.1", "4d5c911766ce92e13323b632a55d9ab821092f13fc2ebf236dc233c8c1f9a64c", [:mix], [{:elixir_uuid, "~> 1.0", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}, {:ex_turn, "~> 0.1.0", [hex: :ex_turn, repo: "hexpm", optional: false]}], "hexpm", "8f10134e2eb7e6aebbf8fba0d5fcec56d8f8db3e94c3dde045feb463979c2dda"},
23+
"ex_ice": {:hex, :ex_ice, "0.8.2", "83abcde0d5e7ec613d91908900a4b25af11116e1a1377ed0cfbf75f254e95ac3", [:mix], [{:elixir_uuid, "~> 1.0", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}, {:ex_turn, "~> 0.1.0", [hex: :ex_turn, repo: "hexpm", optional: false]}], "hexpm", "da698387337f3f199d4cae8e3c52aaf2222597b2a482e1fafac07dfc7e0c5aed"},
2424
"ex_libsrtp": {:hex, :ex_libsrtp, "0.7.2", "211bd89c08026943ce71f3e2c0231795b99cee748808ed3ae7b97cd8d2450b6b", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "2e20645d0d739a4ecdcf8d4810a0c198120c8a2f617f2b75b2e2e704d59f492a"},
2525
"ex_rtcp": {:hex, :ex_rtcp, "0.4.0", "f9e515462a9581798ff6413583a25174cfd2101c94a2ebee871cca7639886f0a", [:mix], [], "hexpm", "28956602cf210d692fcdaf3f60ca49681634e1deb28ace41246aee61ee22dc3b"},
2626
"ex_rtp": {:hex, :ex_rtp, "0.4.0", "1f1b5c1440a904706011e3afbb41741f5da309ce251cb986690ce9fd82636658", [:mix], [], "hexpm", "0f72d80d5953a62057270040f0f1ee6f955c08eeae82ac659c038001d7d5a790"},
27-
"ex_sdp": {:hex, :ex_sdp, "1.0.0", "c66cd66d60ad03ff1eecdc6db6a1b8a7b89fec260fcc22e8d6703fc5bbf430a3", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}], "hexpm", "e165dff988b8ab9d93588636aa5f3f683e1f848fc63b78b12382c8fa3dd39216"},
27+
"ex_sdp": {:hex, :ex_sdp, "1.0.1", "1608551d740a1882fe89d2b1df807167de62c44ab5409a795f257069e348ac05", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}], "hexpm", "0b8c53b15f18122feed9b65c1318603bebbe33cbad36efb3995b6e03b1bf27ee"},
2828
"ex_stun": {:hex, :ex_stun, "0.2.0", "feb1fc7db0356406655b2a617805e6c712b93308c8ea2bf0ba1197b1f0866deb", [:mix], [], "hexpm", "1e01ba8290082ccbf37acaa5190d1f69b51edd6de2026a8d6d51368b29d115d0"},
2929
"ex_turn": {:hex, :ex_turn, "0.1.0", "177405aadf3d754567d0d37cf881a83f9cacf8f45314d188633b04c4a9e7c1ec", [:mix], [{:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}], "hexpm", "d677737fb7d45274d5dac19fe3c26b9038b6effbc0a6b3e7417bccc76b6d1cd3"},
30-
"ex_webrtc": {:hex, :ex_webrtc, "0.5.0", "18bb3f5de329edd4410979c2882c971b3c310e33769405a6d160c8b5210ac371", [:mix], [{:crc, "~> 0.10", [hex: :crc, repo: "hexpm", optional: false]}, {:ex_dtls, "~> 0.16.0", [hex: :ex_dtls, repo: "hexpm", optional: false]}, {:ex_ice, "~> 0.8.0", [hex: :ex_ice, repo: "hexpm", optional: false]}, {:ex_libsrtp, "~> 0.7.1", [hex: :ex_libsrtp, repo: "hexpm", optional: false]}, {:ex_rtcp, "~> 0.4.0", [hex: :ex_rtcp, repo: "hexpm", optional: false]}, {:ex_rtp, "~> 0.4.0", [hex: :ex_rtp, repo: "hexpm", optional: false]}, {:ex_sctp, "0.1.0", [hex: :ex_sctp, repo: "hexpm", optional: true]}, {:ex_sdp, "~> 1.0", [hex: :ex_sdp, repo: "hexpm", optional: false]}], "hexpm", "6a701e7925cc627c24faee3fbd362a5b9c5d86262ec72980e2af847e1ad8ec98"},
30+
"ex_webrtc": {:git, "https://github.com/elixir-webrtc/ex_webrtc.git", "e42c09df1ae702126dc407ee35a5760d0f601967", []},
3131
"ex_webrtc_dashboard": {:hex, :ex_webrtc_dashboard, "0.5.0", "05840125fcac9f708318115a5101fa58bd936735dc6f5900fb230af293305df8", [:mix], [{:ex_webrtc, "~> 0.5.0", [hex: :ex_webrtc, repo: "hexpm", optional: false]}, {:phoenix_live_dashboard, "~> 0.8.3", [hex: :phoenix_live_dashboard, repo: "hexpm", optional: false]}], "hexpm", "b0fcef6c23df888f0f094e7f63541e980db175e27b551f2f314e277ac3eb2f29"},
3232
"exla": {:hex, :exla, "0.7.3", "51310270a0976974fc758f7b28ebd6ca8e099b3d6fc78b0d484c808e977cb914", [:make, :mix], [{:elixir_make, "~> 0.6", [hex: :elixir_make, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.0", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:nx, "~> 0.7.1", [hex: :nx, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:xla, "~> 0.6.0", [hex: :xla, repo: "hexpm", optional: false]}], "hexpm", "5b3d5741a24aada21d3b0feb4b99d1fc3c8457f995a63ea16684d8d5678b96ff"},
3333
"file_system": {:hex, :file_system, "1.0.1", "79e8ceaddb0416f8b8cd02a0127bdbababe7bf4a23d2a395b983c1f8b3f73edd", [:mix], [], "hexpm", "4414d1f38863ddf9120720cd976fce5bdde8e91d8283353f0e31850fa89feb9e"},
@@ -59,7 +59,7 @@
5959
"qex": {:hex, :qex, "0.5.1", "0d82c0f008551d24fffb99d97f8299afcb8ea9cf99582b770bd004ed5af63fd6", [:mix], [], "hexpm", "935a39fdaf2445834b95951456559e9dc2063d0a055742c558a99987b38d6bab"},
6060
"ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"},
6161
"req": {:hex, :req, "0.5.6", "8fe1eead4a085510fe3d51ad854ca8f20a622aae46e97b302f499dfb84f726ac", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "cfaa8e720945d46654853de39d368f40362c2641c4b2153c886418914b372185"},
62-
"rustler_precompiled": {:hex, :rustler_precompiled, "0.8.0", "02d218b575d8175e80138557f46bee7af5598f29e9aff8935a6c369c0e6c47a5", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:rustler, "~> 0.23", [hex: :rustler, repo: "hexpm", optional: true]}], "hexpm", "00b1711d8d828200fe931e23bb0e72c2672a3a0ef76740e3c50433afda1965fb"},
62+
"rustler_precompiled": {:hex, :rustler_precompiled, "0.8.1", "8afe0b6f3a9a677ada046cdd23e3f4c6399618b91a6122289324774961281e1e", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:rustler, "~> 0.23", [hex: :rustler, repo: "hexpm", optional: true]}], "hexpm", "90b8c2297bf7959cfa1c927b2881faad7bb0707183124955369991b76177a166"},
6363
"safetensors": {:hex, :safetensors, "0.1.3", "7ff3c22391e213289c713898481d492c9c28a49ab1d0705b72630fb8360426b2", [:mix], [{:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:nx, "~> 0.5", [hex: :nx, repo: "hexpm", optional: false]}], "hexpm", "fe50b53ea59fde4e723dd1a2e31cfdc6013e69343afac84c6be86d6d7c562c14"},
6464
"shmex": {:hex, :shmex, "0.5.1", "81dd209093416bf6608e66882cb7e676089307448a1afd4fc906c1f7e5b94cf4", [:mix], [{:bunch_native, "~> 0.5.0", [hex: :bunch_native, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.0", [hex: :bundlex, repo: "hexpm", optional: false]}], "hexpm", "c29f8286891252f64c4e1dac40b217d960f7d58def597c4e606ff8fbe71ceb80"},
6565
"tailwind": {:hex, :tailwind, "0.2.3", "277f08145d407de49650d0a4685dc062174bdd1ae7731c5f1da86163a24dfcdb", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}], "hexpm", "8e45e7a34a676a7747d04f7913a96c770c85e6be810a1d7f91e713d3a3655b5d"},

0 commit comments

Comments
 (0)