Skip to content

Commit 8b8c77e

Browse files
committed
Fix bugs in handling opus in agora sink
1 parent c4dd318 commit 8b8c77e

File tree

10 files changed

+96
-78
lines changed

10 files changed

+96
-78
lines changed

c_src/membrane_agora_plugin/sink.cpp

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ UNIFEX_TERM create(UnifexEnv *env, char *appId, char *token, char *channelId,
2424
scfg.enableVideo = true;
2525
scfg.useStringUid = false;
2626
if (state->service->initialize(scfg) != agora::ERR_OK) {
27-
AG_LOG(ERROR, "Failed to initialize service");
27+
AG_LOG(ERROR, "Failed to initialize sink service");
2828
unifex_release_state(env, state);
2929
return create_result_error(env, "Failed to initialize service");
3030
}
@@ -159,17 +159,10 @@ UNIFEX_TERM write_audio_data(UnifexEnv *env, UnifexPayload *payload,
159159

160160
if (codec == CODEC_AUDIO_AAC) {
161161
audioFrameInfo.codec = agora::rtc::AUDIO_CODEC_TYPE::AUDIO_CODEC_AACLC;
162-
printf("\nDUPA NIF AAC\n\n");
163-
fflush(stdout);
164162
} else if (codec == CODEC_AUDIO_OPUS) {
165163
audioFrameInfo.codec = agora::rtc::AUDIO_CODEC_TYPE::AUDIO_CODEC_OPUS;
166-
printf("\nDUPA NIF OPUS\n\n");
167-
fflush(stdout);
168164
}
169165

170-
printf("\nDUPA NIF AFTER\n\n");
171-
fflush(stdout);
172-
173166
if (state->audioEncodedFrameSender->sendEncodedAudioFrame(
174167
reinterpret_cast<uint8_t *>(payload->data), payload->size,
175168
audioFrameInfo) != true) {

c_src/membrane_agora_plugin/source.cpp

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ UNIFEX_TERM create(UnifexEnv *env, char *appId, char *token, char *channelId,
1919
scfg.enableVideo = true;
2020
scfg.useStringUid = false;
2121
if (state->service->initialize(scfg) != agora::ERR_OK) {
22-
AG_LOG(ERROR, "Failed to initialize service");
22+
AG_LOG(ERROR, "Failed to initialize source service");
23+
state->service = NULL;
2324
unifex_release_state(env, state);
2425
return create_result_error(env, "Failed to initialize service");
2526
}
@@ -77,21 +78,32 @@ UNIFEX_TERM create(UnifexEnv *env, char *appId, char *token, char *channelId,
7778
}
7879

7980
void handle_destroy_state(UnifexEnv *env, SourceState *state) {
80-
state->connection->unregisterObserver(state->connObserver.get());
81-
state->connObserver.reset();
82-
83-
state->connection->getLocalUser()->unregisterLocalUserObserver(
84-
state->localUserObserver.get());
85-
state->connection->getLocalUser()->unregisterVideoEncodedFrameObserver(
86-
state->videoEncodedFrameObserver.get());
87-
state->connection->getLocalUser()->unregisterAudioFrameObserver(
88-
state->audioFrameObserver.get());
89-
90-
state->localUserObserver.reset();
91-
state->videoEncodedFrameObserver.reset();
92-
state->audioFrameObserver.reset();
93-
9481
UNUSED(env);
82+
if (state->connection) {
83+
state->connection->unregisterObserver(state->connObserver.get());
84+
state->connObserver.reset();
85+
}
86+
87+
if (state->connection && state->localUserObserver)
88+
state->connection->getLocalUser()->unregisterLocalUserObserver(
89+
state->localUserObserver.get());
90+
91+
if (state->connection && state->videoEncodedFrameObserver)
92+
state->connection->getLocalUser()->unregisterVideoEncodedFrameObserver(
93+
state->videoEncodedFrameObserver.get());
94+
if (state->connection && state->audioFrameObserver)
95+
state->connection->getLocalUser()->unregisterAudioFrameObserver(
96+
state->audioFrameObserver.get());
97+
98+
if (state->localUserObserver)
99+
state->localUserObserver.reset();
100+
101+
if (state->videoEncodedFrameObserver)
102+
state->videoEncodedFrameObserver.reset();
103+
104+
if (state->audioFrameObserver)
105+
state->audioFrameObserver.reset();
106+
95107
if (state->connection) {
96108
if (state->connection->disconnect()) {
97109
AG_LOG(ERROR, "Failed to disconnect from Agora channel!");

lib/agora/agora_sink.ex

Lines changed: 36 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ defmodule Membrane.Agora.Sink do
77

88
require Membrane.Pad, as: Pad
99

10+
alias Membrane.{AAC, Opus}
1011
alias Membrane.Agora.Sink.Native
1112

1213
def_input_pad :video,
@@ -17,7 +18,10 @@ defmodule Membrane.Agora.Sink do
1718
def_input_pad :audio,
1819
availability: :on_request,
1920
accepted_format: any_of(Membrane.AAC, Membrane.Opus),
20-
flow_control: :auto
21+
flow_control: :auto,
22+
options: [
23+
sample_rate: [default: nil]
24+
]
2125

2226
def_options app_id: [
2327
spec: String.t(),
@@ -54,7 +58,8 @@ defmodule Membrane.Agora.Sink do
5458
token: opts.token,
5559
channel_name: opts.channel_name,
5660
user_id: opts.user_id,
57-
native_state: nil
61+
native_state: nil,
62+
last_frame_duration: nil
5863
}
5964

6065
{[], state}
@@ -109,37 +114,13 @@ defmodule Membrane.Agora.Sink do
109114
end
110115

111116
@impl true
112-
def handle_stream_format(Pad.ref(:audio, _id) = pad, %Membrane.Opus{} = opus, ctx, state) do
113-
{:ok, native_state} =
114-
case ctx.pads[pad].options do
115-
%{frame_duration: frame_duration, sample_rate: sample_rate} ->
116-
samples_per_frame =
117-
(Membrane.Time.as_milliseconds(frame_duration, :round) * sample_rate)
118-
|> div(1000)
119-
120-
IO.inspect({sample_rate, opus.channels, samples_per_frame},
121-
label: "HANDLE STREAM FORMAT"
122-
)
123-
124-
Native.update_audio_stream_format(
125-
sample_rate,
126-
opus.channels,
127-
samples_per_frame,
128-
state.native_state
129-
)
130-
131-
pad_options ->
132-
raise """
133-
Pad options has to have :samples_per_frame and :sample_rate options when the stream format is \
134-
Membrane.Opus, but pad #{inspect(pad)} has options #{inspect(pad_options)}
135-
"""
136-
end
137-
138-
{[], %{state | native_state: native_state}}
117+
def handle_stream_format(Pad.ref(:audio, _id), %Opus{}, ctx, state) do
118+
# audio stream format will be updated in handle_buffer/4
119+
{[], state}
139120
end
140121

141122
@impl true
142-
def handle_buffer(Pad.ref(:video, _id), buffer, _ctx, state) do
123+
def handle_buffer(Pad.ref(:video, _id) = pad, buffer, ctx, state) do
143124
:ok =
144125
Native.write_video_data(
145126
buffer.payload,
@@ -154,13 +135,35 @@ defmodule Membrane.Agora.Sink do
154135
def handle_buffer(Pad.ref(:audio, _id) = pad, buffer, ctx, state) do
155136
stream_format =
156137
case ctx.pads[pad].stream_format do
157-
%Membrane.Opus{} -> :opus
158-
%Membrane.AAC{} -> :aac
138+
%Opus{} -> :opus
139+
%AAC{} -> :aac
159140
end
160141

161-
{stream_format, buffer} |> IO.inspect(label: "HANDLE BUFFER")
142+
state =
143+
if stream_format == :opus and buffer.metadata.duration != state.last_frame_duration do
144+
update_frame_duration(buffer.metadata.duration, pad, ctx, state)
145+
else
146+
state
147+
end
162148

163149
:ok = Native.write_audio_data(buffer.payload, stream_format, state.native_state)
164150
{[], state}
165151
end
152+
153+
defp update_frame_duration(frame_duration, pad, ctx, state) do
154+
pad_data = ctx.pads[pad]
155+
156+
sample_rate = pad_data.options.sample_rate
157+
samples_per_frame = (frame_duration * sample_rate) |> div(1000)
158+
159+
{:ok, native_state} =
160+
Native.update_audio_stream_format(
161+
sample_rate,
162+
pad_data.stream_format.channels,
163+
samples_per_frame,
164+
state.native_state
165+
)
166+
167+
%{state | native_state: native_state, last_frame_duration: frame_duration}
168+
end
166169
end

mix.exs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ defmodule Membrane.Agora.Mixfile do
4343
{:membrane_aac_format, "~> 0.8.0"},
4444
{:membrane_opus_format, "~> 0.3.0"},
4545
{:membrane_raw_audio_format, "~> 0.12.0"},
46+
{:membrane_ogg_plugin, "~> 0.5.0"},
4647
{:unifex, "~> 1.1"},
4748
{:membrane_file_plugin, "~> 0.16.0", only: :test},
4849
{:membrane_h26x_plugin, "~> 0.10.0", only: :test},

mix.lock

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@
55
"bundlex": {:hex, :bundlex, "1.5.4", "3726acd463f4d31894a59bbc177c17f3b574634a524212f13469f41c4834a1d9", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:qex, "~> 0.5", [hex: :qex, repo: "hexpm", optional: false]}, {:req, ">= 0.4.0", [hex: :req, repo: "hexpm", optional: false]}, {:zarex, "~> 1.0", [hex: :zarex, repo: "hexpm", optional: false]}], "hexpm", "e745726606a560275182a8ac1c8ebd5e11a659bb7460d8abf30f397e59b4c5d2"},
66
"bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"},
77
"coerce": {:hex, :coerce, "1.0.1", "211c27386315dc2894ac11bc1f413a0e38505d808153367bd5c6e75a4003d096", [:mix], [], "hexpm", "b44a691700f7a1a15b4b7e2ff1fa30bebd669929ac8aa43cffe9e2f8bf051cf1"},
8+
"crc": {:hex, :crc, "0.10.5", "ee12a7c056ac498ef2ea985ecdc9fa53c1bfb4e53a484d9f17ff94803707dfd8", [:mix, :rebar3], [{:elixir_make, "~> 0.6", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm", "3e673b6495a9525c5c641585af1accba59a1eb33de697bedf341e247012c2c7f"},
89
"credo": {:hex, :credo, "1.7.10", "6e64fe59be8da5e30a1b96273b247b5cf1cc9e336b5fd66302a64b25749ad44d", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "71fbc9a6b8be21d993deca85bf151df023a3097b01e09a2809d460348561d8cd"},
910
"dialyxir": {:hex, :dialyxir, "1.4.4", "fb3ce8741edeaea59c9ae84d5cec75da00fa89fe401c72d6e047d11a61f65f70", [:mix], [{:erlex, ">= 0.2.7", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "cd6111e8017ccd563e65621a4d9a4a1c5cd333df30cebc7face8029cacb4eff6"},
1011
"earmark_parser": {:hex, :earmark_parser, "1.4.41", "ab34711c9dc6212dda44fcd20ecb87ac3f3fce6f0ca2f28d4a00e4154f8cd599", [:mix], [], "hexpm", "a81a04c7e34b6617c2792e291b5a2e57ab316365c2644ddc553bb9ed863ebefa"},
12+
"elixir_make": {:hex, :elixir_make, "0.9.0", "6484b3cd8c0cee58f09f05ecaf1a140a8c97670671a6a0e7ab4dc326c3109726", [:mix], [], "hexpm", "db23d4fd8b757462ad02f8aa73431a426fe6671c80b200d9710caf3d1dd0ffdb"},
1113
"elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"},
1214
"erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"},
1315
"ex_doc": {:hex, :ex_doc, "0.34.2", "13eedf3844ccdce25cfd837b99bea9ad92c4e511233199440488d217c92571e8", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "5ce5f16b41208a50106afed3de6a2ed34f4acfd65715b82a0b84b49d995f95c1"},
@@ -26,6 +28,7 @@
2628
"membrane_h264_format": {:hex, :membrane_h264_format, "0.6.1", "44836cd9de0abe989b146df1e114507787efc0cf0da2368f17a10c47b4e0738c", [:mix], [], "hexpm", "4b79be56465a876d2eac2c3af99e115374bbdc03eb1dea4f696ee9a8033cd4b0"},
2729
"membrane_h265_format": {:hex, :membrane_h265_format, "0.2.0", "1903c072cf7b0980c4d0c117ab61a2cd33e88782b696290de29570a7fab34819", [:mix], [], "hexpm", "6df418bdf242c0d9f7dbf2e5aea4c2d182e34ac9ad5a8b8cef2610c290002e83"},
2830
"membrane_h26x_plugin": {:hex, :membrane_h26x_plugin, "0.10.2", "caf2790d8c107df35f8d456b45f4e09fb9c56ce6c7669a3a03f7d59972e6ed82", [:mix], [{:bunch, "~> 1.4", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.0", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_h265_format, "~> 0.2.0", [hex: :membrane_h265_format, repo: "hexpm", optional: false]}], "hexpm", "becf1ac4a589adecd850137ccd61a33058f686083a514a7e39fcd721bcf9fb2e"},
31+
"membrane_ogg_plugin": {:hex, :membrane_ogg_plugin, "0.5.0", "1e910cee8f311e186f8ee26cd4ab06bde7e9d0072eb04eb1abb890372463308f", [:mix], [{:crc, "~> 0.10", [hex: :crc, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_opus_format, "~> 0.3.0", [hex: :membrane_opus_format, repo: "hexpm", optional: false]}], "hexpm", "dd009453c1e01c4da0e6afc23f0037d5cf6b1d3fcdf79959a61768588e725186"},
2932
"membrane_opus_format": {:hex, :membrane_opus_format, "0.3.0", "3804d9916058b7cfa2baa0131a644d8186198d64f52d592ae09e0942513cb4c2", [:mix], [], "hexpm", "8fc89c97be50de23ded15f2050fe603dcce732566fe6fdd15a2de01cb6b81afe"},
3033
"membrane_opus_plugin": {:hex, :membrane_opus_plugin, "0.20.4", "0b018ab0cdd02e6fd1f27590a7382a81c0e2849212add0d76b2465385a7cfea7", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.2", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_common_c, "~> 0.16.0", [hex: :membrane_common_c, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_opus_format, "~> 0.3.0", [hex: :membrane_opus_format, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:membrane_raw_audio_format, "~> 0.12.0", [hex: :membrane_raw_audio_format, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "a3c81fc2648887b601448f6dcaf195063a30585d7f0d3190dff2c5316d759af8"},
3134
"membrane_precompiled_dependency_provider": {:hex, :membrane_precompiled_dependency_provider, "0.1.2", "8af73b7dc15ba55c9f5fbfc0453d4a8edfb007ade54b56c37d626be0d1189aba", [:mix], [{:bundlex, "~> 1.4", [hex: :bundlex, repo: "hexpm", optional: false]}], "hexpm", "7fe3e07361510445a29bee95336adde667c4162b76b7f4c8af3aeb3415292023"},

test/fixtures/in_audio.opus

-296 KB
Binary file not shown.

test/fixtures/in_audio_opus.ogg

238 KB
Binary file not shown.

test/integration_test.exs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ defmodule Membrane.Agora.IntegrationTest do
33
import Membrane.Testing.Assertions
44
alias Membrane.Agora.Support.{ReceiverPipeline, SenderPipeline}
55

6-
for audio_codec <- [:opus] do
6+
for audio_codec <- [:opus, :aac] do
77
@tag :tmp_dir
8+
@tag audio_codec
89
test "if the data is sent to Agora properly when audio codec is #{audio_codec}", %{
910
tmp_dir: dir
1011
} do
@@ -14,9 +15,15 @@ defmodule Membrane.Agora.IntegrationTest do
1415
input_video = "test/fixtures/in_video.h264"
1516
output_video = "#{dir}/video.h264"
1617
reference_video = input_video
18+
audio_codec = unquote(audio_codec)
1719

18-
input_audio = "test/fixtures/in_audio.#{unquote(audio_codec)}"
19-
output_audio = "#{dir}/audio_#{unquote(audio_codec)}.pcm"
20+
input_audio =
21+
case audio_codec do
22+
:aac -> "test/fixtures/in_audio.aac"
23+
:opus -> "test/fixtures/in_audio_opus.ogg"
24+
end
25+
26+
output_audio = "#{dir}/audio_#{audio_codec}.pcm"
2027
reference_audio = "test/fixtures/in_audio.pcm"
2128

2229
{:ok, _supervisor, receiver_pipeline} =
@@ -31,6 +38,8 @@ defmodule Membrane.Agora.IntegrationTest do
3138
custom_args: [audio: input_audio, video: input_video, framerate: framerate]
3239
)
3340

41+
Process.sleep(3000)
42+
3443
assert_start_of_stream(receiver_pipeline, :video_sink, :input, 10_000)
3544
assert_start_of_stream(receiver_pipeline, :audio_sink)
3645

test/support/sender_pipeline.ex

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,25 @@ defmodule Membrane.Agora.Support.SenderPipeline do
1212
def handle_init(_ctx, opts) do
1313
user_id = "12"
1414

15-
{audio_parser, sink_audio_options} =
15+
{plug_audio_parsing, sink_audio_options} =
1616
opts[:audio]
1717
|> String.split(".")
1818
|> List.last()
1919
|> case do
2020
"aac" ->
21-
{Membrane.AAC.Parser, []}
22-
23-
"opus" ->
24-
{Membrane.Opus.Parser,
25-
[
26-
sample_rate: 48_000,
27-
frame_duration: Membrane.Time.milliseconds(20)
28-
]}
21+
{&child(&1, Membrane.AAC.Parser), []}
22+
23+
"ogg" ->
24+
plug_audio_parser =
25+
fn spec ->
26+
spec
27+
|> child(:ogg_demuxer, Membrane.Ogg.Demuxer)
28+
|> child(:opus_parser, %Membrane.Opus.Parser{
29+
generate_best_effort_timestamps?: true
30+
})
31+
end
32+
33+
{plug_audio_parser, [sample_rate: 48_000]}
2934
end
3035

3136
spec =
@@ -43,16 +48,7 @@ defmodule Membrane.Agora.Support.SenderPipeline do
4348
user_id: user_id
4449
}),
4550
child(%Membrane.File.Source{location: opts[:audio]})
46-
|> child(audio_parser)
47-
|> case do
48-
spec when audio_parser == Membrane.AAC.Parser ->
49-
spec
50-
51-
spec when audio_parser == Membrane.Opus.Parser ->
52-
spec
53-
|> child(Membrane.Opus.Decoder)
54-
|> child(Membrane.Opus.Encoder)
55-
end
51+
|> plug_audio_parsing.()
5652
|> child(Membrane.Realtimer)
5753
|> via_in(:audio, options: sink_audio_options)
5854
|> get_child(:sink)

xdump_config

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
rtc.enable_xdump|1|rtc.enable_xdump_file|1|rtc.enable_xdump_upload|1|

0 commit comments

Comments
 (0)