Skip to content

Commit 26fb191

Browse files
authored
Support OPUS (#24)
1 parent 5ee7381 commit 26fb191

File tree

13 files changed

+269
-99
lines changed

13 files changed

+269
-99
lines changed

c_src/membrane_agora_plugin/sink.cpp

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,10 @@ UNIFEX_TERM create(UnifexEnv *env, char *appId, char *token, char *channelId,
2424
scfg.enableVideo = true;
2525
scfg.useStringUid = false;
2626
if (state->service->initialize(scfg) != agora::ERR_OK) {
27-
AG_LOG(ERROR, "Failed to initialize service");
27+
AG_LOG(ERROR, "Failed to initialize Agora service in sink");
2828
unifex_release_state(env, state);
29-
return create_result_error(env, "Failed to initialize service");
29+
return create_result_error(env,
30+
"Failed to initialize Agora service in sink");
3031
}
3132

3233
// connection configuration
@@ -151,12 +152,21 @@ UNIFEX_TERM update_audio_stream_format(UnifexEnv *env, int sampleRate,
151152
}
152153

153154
UNIFEX_TERM write_audio_data(UnifexEnv *env, UnifexPayload *payload,
154-
SinkState *state) {
155+
CodecAudio codec, SinkState *state) {
155156
agora::rtc::EncodedAudioFrameInfo audioFrameInfo;
156157
audioFrameInfo.sampleRateHz = state->sampleRate;
157158
audioFrameInfo.numberOfChannels = state->numberOfChannels;
158159
audioFrameInfo.samplesPerChannel = state->samplesPerChannelPerFrame;
159-
audioFrameInfo.codec = agora::rtc::AUDIO_CODEC_TYPE::AUDIO_CODEC_AACLC;
160+
161+
if (codec == CODEC_AUDIO_AAC) {
162+
audioFrameInfo.codec = agora::rtc::AUDIO_CODEC_TYPE::AUDIO_CODEC_AACLC;
163+
} else if (codec == CODEC_AUDIO_OPUS) {
164+
audioFrameInfo.codec = agora::rtc::AUDIO_CODEC_TYPE::AUDIO_CODEC_OPUS;
165+
} else {
166+
AG_LOG(WARNING, "Audio codec passed to sink is neither AAC nor Opus, but "
167+
"only these two values are supported for now.");
168+
}
169+
160170
if (state->audioEncodedFrameSender->sendEncodedAudioFrame(
161171
reinterpret_cast<uint8_t *>(payload->data), payload->size,
162172
audioFrameInfo) != true) {
@@ -186,13 +196,13 @@ void handle_destroy_state(UnifexEnv *env, SinkState *state) {
186196
AG_LOG(ERROR, "Failed to disconnect from Agora channel!");
187197
return;
188198
}
189-
AG_LOG(INFO, "Disconnected from Agora channel successfully");
199+
AG_LOG(INFO, "[Sink] Disconnected from Agora channel successfully");
190200
state->connection = NULL;
191201
}
192202

193203
if (state->service) {
194204
state->service->release();
195-
AG_LOG(INFO, "Agora service released successfully");
205+
AG_LOG(INFO, "[Sink] Agora service released successfully");
196206
state->service = NULL;
197207
}
198208
}

c_src/membrane_agora_plugin/sink.spec.exs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ module(Membrane.Agora.Sink.Native)
22

33
state_type("SinkState")
44

5+
type codec_audio :: :aac | :opus
6+
57
spec(
68
create(
79
app_id :: string,
@@ -22,7 +24,7 @@ spec(
2224
)
2325

2426
spec(
25-
write_audio_data(payload, state) ::
27+
write_audio_data(payload, codec :: codec_audio, state) ::
2628
(:ok :: label) | {:error :: label, reason :: atom}
2729
)
2830

c_src/membrane_agora_plugin/source.cpp

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@ UNIFEX_TERM create(UnifexEnv *env, char *appId, char *token, char *channelId,
1919
scfg.enableVideo = true;
2020
scfg.useStringUid = false;
2121
if (state->service->initialize(scfg) != agora::ERR_OK) {
22-
AG_LOG(ERROR, "Failed to initialize service");
22+
AG_LOG(ERROR, "Failed to initialize Agora service in source");
23+
state->service = NULL;
2324
unifex_release_state(env, state);
24-
return create_result_error(env, "Failed to initialize service");
25+
return create_result_error(env,
26+
"Failed to initialize Agora service in source");
2527
}
2628

2729
// connection configuration
@@ -77,33 +79,44 @@ UNIFEX_TERM create(UnifexEnv *env, char *appId, char *token, char *channelId,
7779
}
7880

7981
void handle_destroy_state(UnifexEnv *env, SourceState *state) {
80-
state->connection->unregisterObserver(state->connObserver.get());
81-
state->connObserver.reset();
82+
UNUSED(env);
83+
if (state->connection) {
84+
state->connection->unregisterObserver(state->connObserver.get());
85+
state->connObserver.reset();
86+
}
8287

83-
state->connection->getLocalUser()->unregisterLocalUserObserver(
84-
state->localUserObserver.get());
85-
state->connection->getLocalUser()->unregisterVideoEncodedFrameObserver(
86-
state->videoEncodedFrameObserver.get());
87-
state->connection->getLocalUser()->unregisterAudioFrameObserver(
88-
state->audioFrameObserver.get());
88+
if (state->connection && state->localUserObserver)
89+
state->connection->getLocalUser()->unregisterLocalUserObserver(
90+
state->localUserObserver.get());
8991

90-
state->localUserObserver.reset();
91-
state->videoEncodedFrameObserver.reset();
92-
state->audioFrameObserver.reset();
92+
if (state->connection && state->videoEncodedFrameObserver)
93+
state->connection->getLocalUser()->unregisterVideoEncodedFrameObserver(
94+
state->videoEncodedFrameObserver.get());
95+
if (state->connection && state->audioFrameObserver)
96+
state->connection->getLocalUser()->unregisterAudioFrameObserver(
97+
state->audioFrameObserver.get());
98+
99+
if (state->localUserObserver)
100+
state->localUserObserver.reset();
101+
102+
if (state->videoEncodedFrameObserver)
103+
state->videoEncodedFrameObserver.reset();
104+
105+
if (state->audioFrameObserver)
106+
state->audioFrameObserver.reset();
93107

94-
UNUSED(env);
95108
if (state->connection) {
96109
if (state->connection->disconnect()) {
97110
AG_LOG(ERROR, "Failed to disconnect from Agora channel!");
98111
return;
99112
}
100-
AG_LOG(INFO, "Disconnected from Agora channel successfully");
113+
AG_LOG(INFO, "[Source] Disconnected from Agora channel successfully");
101114
state->connection = NULL;
102115
}
103116

104117
if (state->service) {
105118
state->service->release();
106-
AG_LOG(INFO, "Agora service released successfully");
119+
AG_LOG(INFO, "[Source] Agora service released successfully");
107120
state->service = NULL;
108121
}
109122
}

lib/agora/sink.ex

Lines changed: 97 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ defmodule Membrane.Agora.Sink do
88
require Membrane.Logger
99
require Membrane.Pad, as: Pad
1010

11+
alias Membrane.{AAC, Buffer, Opus}
1112
alias Membrane.Agora.Sink.Native
1213

1314
def_input_pad :video,
@@ -17,8 +18,22 @@ defmodule Membrane.Agora.Sink do
1718

1819
def_input_pad :audio,
1920
availability: :on_request,
20-
accepted_format: Membrane.AAC,
21-
flow_control: :auto
21+
accepted_format: any_of(Membrane.AAC, Membrane.Opus),
22+
flow_control: :auto,
23+
options: [
24+
sample_rate: [
25+
spec: pos_integer(),
26+
default: 48_000,
27+
description: """
28+
Sample rate of the audio stream going through :audio pad.
29+
30+
Used only if the audio codec is `Membrane.Opus`. If the audio codec is
31+
`Membrane.AAC`, sample rate value will be passed in the stream format.
32+
33+
Defaults to 48 000.
34+
"""
35+
]
36+
]
2237

2338
def_options app_id: [
2439
spec: String.t(),
@@ -55,7 +70,9 @@ defmodule Membrane.Agora.Sink do
5570
token: opts.token,
5671
channel_name: opts.channel_name,
5772
user_id: opts.user_id,
58-
native_state: nil
73+
native_state: nil,
74+
last_frame_duration: nil,
75+
opus_queue: []
5976
}
6077

6178
{[], state}
@@ -86,8 +103,8 @@ defmodule Membrane.Agora.Sink do
86103
_e in UndefinedFunctionError ->
87104
reraise(
88105
"""
89-
Couldn't setup NIF. Perhaps you have forgotten to set LD_LIBRARY_PATH:
90-
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:#{Path.expand("#{__ENV__.file}/../../../agora_sdk")}
106+
Couldn't setup NIF. Perhaps you have forgotten to set LD_LIBRARY_PATH: \
107+
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:#{Path.expand("#{__ENV__.file}/../../../agora_sdk")} \
91108
""",
92109
__STACKTRACE__
93110
)
@@ -113,18 +130,24 @@ defmodule Membrane.Agora.Sink do
113130
end
114131

115132
@impl true
116-
def handle_stream_format(Pad.ref(:audio, _id), stream_format, _ctx, state) do
133+
def handle_stream_format(Pad.ref(:audio, _id), %Membrane.AAC{} = aac, _ctx, state) do
117134
{:ok, native_state} =
118135
Native.update_audio_stream_format(
119-
stream_format.sample_rate,
120-
stream_format.channels,
121-
stream_format.samples_per_frame,
136+
aac.sample_rate,
137+
aac.channels,
138+
aac.samples_per_frame,
122139
state.native_state
123140
)
124141

125142
{[], %{state | native_state: native_state}}
126143
end
127144

145+
@impl true
146+
def handle_stream_format(Pad.ref(:audio, _id), %Opus{}, _ctx, state) do
147+
# when audio codec is Opus, Native.update_audio_stream_format/4 is called in handle_buffer/4
148+
{[], state}
149+
end
150+
128151
@impl true
129152
def handle_buffer(Pad.ref(:video, _id), buffer, _ctx, state) do
130153
:ok =
@@ -138,8 +161,71 @@ defmodule Membrane.Agora.Sink do
138161
end
139162

140163
@impl true
141-
def handle_buffer(Pad.ref(:audio, _id), buffer, _ctx, state) do
142-
:ok = Native.write_audio_data(buffer.payload, state.native_state)
164+
def handle_buffer(Pad.ref(:audio, _id) = pad, buffer, ctx, state) do
165+
state =
166+
case ctx.pads[pad].stream_format do
167+
%Opus{} -> handle_opus_buffer(pad, buffer, ctx, state)
168+
%AAC{} -> handle_aac_buffer(buffer, state)
169+
end
170+
143171
{[], state}
144172
end
173+
174+
defp handle_opus_buffer(pad, buffer, ctx, state) do
175+
opus_queue =
176+
case state.opus_queue do
177+
[%Buffer{} = previous] ->
178+
previous_metadata =
179+
previous.metadata
180+
|> Map.put(:duration, buffer.pts - previous.pts)
181+
182+
previous = %{previous | metadata: previous_metadata}
183+
[previous, buffer]
184+
185+
[] ->
186+
[buffer]
187+
end
188+
189+
{buffers_with_duration, opus_queue} =
190+
opus_queue
191+
|> Enum.split_while(&is_map_key(&1.metadata, :duration))
192+
193+
state =
194+
buffers_with_duration
195+
|> Enum.reduce(state, fn buffer, state ->
196+
state =
197+
if buffer.metadata.duration != state.last_frame_duration do
198+
update_frame_duration(buffer.metadata.duration, pad, ctx, state)
199+
else
200+
state
201+
end
202+
203+
:ok = Native.write_audio_data(buffer.payload, :opus, state.native_state)
204+
state
205+
end)
206+
207+
%{state | opus_queue: opus_queue}
208+
end
209+
210+
defp handle_aac_buffer(buffer, state) do
211+
:ok = Native.write_audio_data(buffer.payload, :aac, state.native_state)
212+
state
213+
end
214+
215+
defp update_frame_duration(frame_duration, pad, ctx, state) do
216+
pad_data = ctx.pads[pad]
217+
218+
sample_rate = pad_data.options.sample_rate
219+
samples_per_frame = (frame_duration * sample_rate) |> div(1000)
220+
221+
{:ok, native_state} =
222+
Native.update_audio_stream_format(
223+
sample_rate,
224+
pad_data.stream_format.channels,
225+
samples_per_frame,
226+
state.native_state
227+
)
228+
229+
%{state | native_state: native_state, last_frame_duration: frame_duration}
230+
end
145231
end

lib/agora/sink_native.ex

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,14 @@
11
defmodule Membrane.Agora.Sink.Native do
22
@moduledoc false
33

4-
if match?(%{os: "linux", architecture: "x86_64"}, Bundlex.get_target()) do
4+
target = Bundlex.get_target()
5+
6+
if match?(%{os: "linux", architecture: "x86_64"}, target) do
57
use Unifex.Loader
8+
else
9+
IO.warn("""
10+
Agora SDK used by #{inspect(__MODULE__)} works only on linux with architecture x86_64, while \
11+
you are now on #{inspect(target.os)} with architecture #{inspect(target.architecture)}.
12+
""")
613
end
714
end

lib/agora/source.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,8 @@ defmodule Membrane.Agora.Source do
8787
_e in UndefinedFunctionError ->
8888
reraise(
8989
"""
90-
Couldn't setup NIF. Perhaps you have forgotten to set LD_LIBRARY_PATH:
91-
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:#{Path.expand("#{__ENV__.file}/../../../agora_sdk")}
90+
Couldn't setup NIF. Perhaps you have forgotten to set LD_LIBRARY_PATH: \
91+
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:#{Path.expand("#{__ENV__.file}/../../../agora_sdk")} \
9292
""",
9393
__STACKTRACE__
9494
)

lib/agora/source_native.ex

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,14 @@
11
defmodule Membrane.Agora.Source.Native do
22
@moduledoc false
33

4-
if match?(%{os: "linux", architecture: "x86_64"}, Bundlex.get_target()) do
4+
target = Bundlex.get_target()
5+
6+
if match?(%{os: "linux", architecture: "x86_64"}, target) do
57
use Unifex.Loader
8+
else
9+
IO.warn("""
10+
Agora SDK used by #{inspect(__MODULE__)} works only on linux with architecture x86_64, while \
11+
you are now on #{inspect(target.os)} with architecture #{inspect(target.architecture)}.
12+
""")
613
end
714
end

mix.exs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,14 @@ defmodule Membrane.Agora.Mixfile do
4141
{:membrane_core, "~> 1.0"},
4242
{:membrane_h264_format, "~> 0.6.1"},
4343
{:membrane_aac_format, "~> 0.8.0"},
44+
{:membrane_opus_format, "~> 0.3.0"},
4445
{:membrane_raw_audio_format, "~> 0.12.0"},
46+
{:membrane_ogg_plugin, "~> 0.5.0"},
4547
{:unifex, "~> 1.1"},
4648
{:membrane_file_plugin, "~> 0.16.0", only: :test},
4749
{:membrane_h26x_plugin, "~> 0.10.0", only: :test},
4850
{:membrane_aac_plugin, "~> 0.18.1", only: :test},
51+
{:membrane_opus_plugin, "~> 0.20.4", only: :test},
4952
{:membrane_realtimer_plugin, "~> 0.9.0", only: :test},
5053
{:ex_doc, ">= 0.0.0", only: :dev, runtime: false},
5154
{:dialyxir, ">= 0.0.0", only: :dev, runtime: false},

0 commit comments

Comments
 (0)