Skip to content

Commit

Permalink
Cache output of enumerate_devices
Browse files Browse the repository at this point in the history
  • Loading branch information
mutexlox-signal authored Dec 3, 2024
1 parent 5613fda commit 74619b6
Show file tree
Hide file tree
Showing 2 changed files with 225 additions and 77 deletions.
230 changes: 172 additions & 58 deletions src/rust/src/webrtc/audio_device_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,17 @@
use crate::webrtc;
use crate::webrtc::audio_device_module_utils::{copy_and_truncate_string, DeviceCollectionWrapper};
use crate::webrtc::ffi::audio_device_module::RffiAudioTransport;
use anyhow::anyhow;
use anyhow::{anyhow, bail};
use cubeb::{Context, DeviceId, DeviceType, MonoFrame, Stream, StreamPrefs};
use cubeb_core::{log_enabled, set_logging, LogLevel};
use lazy_static::lazy_static;
use regex::Regex;
use std::collections::{HashMap, VecDeque};
use std::ffi::{c_uchar, c_void, CStr, CString};
use std::sync::{Arc, Mutex, RwLock};
use std::ffi::{c_uchar, c_void, CStr};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex, RwLock,
};
use std::time::{Duration, Instant};
#[cfg(target_os = "windows")]
use windows::Win32::System::Com;
Expand Down Expand Up @@ -65,12 +68,22 @@ pub struct AudioDeviceModule {
audio_transport: Arc<Mutex<RffiAudioTransport>>,
cubeb_ctx: Option<Context>,
initialized: bool,
// Note that the DeviceIds must not outlive the cubeb_ctx.
playout_device: Option<DeviceId>,
recording_device: Option<DeviceId>,
// Note that the streams must not outlive the cubeb_ctx.
output_stream: Option<Stream<Frame>>,
input_stream: Option<Stream<Frame>>,
playing: bool,
recording: bool,
// Note that the caches must not outlive the cubeb_ctx.
input_device_cache: Option<DeviceCollectionWrapper>,
output_device_cache: Option<DeviceCollectionWrapper>,
// Flags to track whether we need to refresh caches.
// As these are shared with threads in cubeb, we create them once at ADM init
// and never free them.
pending_input_device_refresh: &'static AtomicBool,
pending_output_device_refresh: &'static AtomicBool,
}

impl Default for AudioDeviceModule {
Expand All @@ -87,6 +100,12 @@ impl Default for AudioDeviceModule {
input_stream: None,
playing: false,
recording: false,
input_device_cache: None,
output_device_cache: None,
// Start these both as true to request a cache refresh, and leak them for reasons
// mentioned at the struct declaration site.
pending_input_device_refresh: Box::leak(Box::new(AtomicBool::new(true))),
pending_output_device_refresh: Box::leak(Box::new(AtomicBool::new(true))),
}
}
}
Expand All @@ -108,7 +127,7 @@ const ADM_MAX_DEVICE_NAME_SIZE: usize = 128;
const ADM_MAX_GUID_SIZE: usize = 128;

/// Arbitrary string to uniquely identify ringrtc for creating the cubeb object.
const ADM_CONTEXT: &str = "ringrtc";
const ADM_CONTEXT: &CStr = c"ringrtc";

const SAMPLE_FREQUENCY: u32 = 48_000;
// Target sample latency. The actual sample latency will
Expand Down Expand Up @@ -213,6 +232,34 @@ impl AudioDeviceModule {
0
}

/// Safety: Must be called with a valid |flag| pointer. (NULL is okay.)
unsafe extern "C" fn device_changed(_ctx: *mut cubeb::ffi::cubeb, flag: *mut c_void) {
// Flag that an update is needed; this will be processed on the next enumerate_devices call.
if let Some(b) = (flag as *mut AtomicBool).as_ref() {
b.store(true, Ordering::SeqCst)
}
}

fn register_device_collection_changed(
&mut self,
device_type: DeviceType,
flag: &'static AtomicBool,
) -> anyhow::Result<()> {
let ctx = match &self.cubeb_ctx {
Some(ctx) => ctx,
None => bail!("Cannot register device changed callback without a ctx"),
};
unsafe {
// Safety: |callback| will remain a valid pointer for the lifetime of the program,
// as will |flag| (since it's static)
Ok(ctx.register_device_collection_changed(
device_type,
Some(AudioDeviceModule::device_changed),
flag.as_ptr() as *mut c_void,
)?)
}
}

// Main initialization and termination
pub fn init(&mut self) -> i32 {
// Don't bother re-initializing.
Expand All @@ -238,22 +285,41 @@ impl AudioDeviceModule {
return -1;
}
}
let ctx_name = CString::new(ADM_CONTEXT).unwrap();
match Context::init(Some(ctx_name.as_c_str()), None) {
match Context::init(Some(ADM_CONTEXT), None) {
Ok(ctx) => {
info!(
"Successfully initialized cubeb backend {}",
ctx.backend_id()
);
self.cubeb_ctx = Some(ctx);
self.initialized = true;
0
}
Err(e) => {
error!("Failed to initialize: {}", e);
-1
return -1;
}
}
if let Err(e) = self.register_device_collection_changed(
DeviceType::INPUT,
self.pending_input_device_refresh,
) {
error!("Failed to register input device callback: {}", e);
return -1;
}
if let Err(e) = self.register_device_collection_changed(
DeviceType::OUTPUT,
self.pending_output_device_refresh,
) {
error!("Failed to register input device callback: {}", e);
return -1;
}
// Caches are not set up, so request a refresh.
self.pending_input_device_refresh
.store(true, Ordering::SeqCst);
self.pending_output_device_refresh
.store(true, Ordering::SeqCst);
self.initialized = true;
0
}

pub fn backend_name(&self) -> Option<String> {
Expand All @@ -269,9 +335,37 @@ impl AudioDeviceModule {
if self.playing {
self.stop_playout();
}
// Cause these to Drop
// Cause these to Drop.
self.input_stream = None;
self.output_stream = None;
// Ensure these are not reused.
self.playout_device = None;
self.recording_device = None;
self.input_device_cache = None;
self.output_device_cache = None;
if let Some(ctx) = &self.cubeb_ctx {
// Clear callbacks.
unsafe {
// Safety: We are calling this with None, which will unset the callback,
// so passing null is safe.
if let Err(e) = ctx.register_device_collection_changed(
DeviceType::INPUT,
None,
std::ptr::null_mut(),
) {
warn!("failed to reset input callback: {}", e);
}
if let Err(e) = ctx.register_device_collection_changed(
DeviceType::OUTPUT,
None,
std::ptr::null_mut(),
) {
warn!("failed to reset output callback: {}", e);
}
}
}
// Invalidate the ctx (note that any references to it, like `DeviceId`s,
// must have already been dropped).
self.cubeb_ctx = None;
self.initialized = false;
#[cfg(target_os = "windows")]
Expand All @@ -288,15 +382,52 @@ impl AudioDeviceModule {
self.initialized
}

fn refresh_device_cache(&mut self, device_type: DeviceType) -> anyhow::Result<()> {
let ctx = match &self.cubeb_ctx {
Some(ctx) => ctx,
None => bail!("cannot refresh device cache without a ctx"),
};
let devices = ctx.enumerate_devices(device_type)?;
for device in devices.iter() {
info!(
"{:?} device: ({})",
device_type,
AudioDeviceModule::device_str(device)
);
}
let collection = DeviceCollectionWrapper::new(devices);
match device_type {
DeviceType::INPUT => self.input_device_cache = Some(collection),
DeviceType::OUTPUT => self.output_device_cache = Some(collection),
_ => bail!("Bad device type {:?}", device_type),
}
Ok(())
}

fn enumerate_devices(
&self,
&mut self,
device_type: DeviceType,
) -> anyhow::Result<DeviceCollectionWrapper> {
match &self.cubeb_ctx {
Some(ctx) => Ok(DeviceCollectionWrapper::new(
ctx.enumerate_devices(device_type)?,
)),
None => Err(anyhow!("Cannot enumerate devices without a cubeb ctx"))?,
) -> anyhow::Result<&DeviceCollectionWrapper> {
let pending_update = match device_type {
DeviceType::INPUT => self
.pending_input_device_refresh
.swap(false, Ordering::SeqCst),
DeviceType::OUTPUT => self
.pending_output_device_refresh
.swap(false, Ordering::SeqCst),
_ => bail!("Bad device type {:?}", device_type),
};
if pending_update {
self.refresh_device_cache(device_type)?;
}
let collection = match device_type {
DeviceType::INPUT => self.input_device_cache.as_ref(),
DeviceType::OUTPUT => self.output_device_cache.as_ref(),
_ => bail!("Bad device type {:?}", device_type),
};
match collection {
Some(c) => Ok(c),
None => Err(anyhow!("No {:?} collection found", device_type)),
}
}

Expand Down Expand Up @@ -345,7 +476,7 @@ impl AudioDeviceModule {
}

// Device enumeration
pub fn playout_devices(&self) -> i16 {
pub fn playout_devices(&mut self) -> i16 {
match self.enumerate_devices(DeviceType::OUTPUT) {
Ok(device_collection) => device_collection.count().try_into().unwrap_or(-1),
Err(e) => {
Expand All @@ -355,7 +486,7 @@ impl AudioDeviceModule {
}
}

pub fn recording_devices(&self) -> i16 {
pub fn recording_devices(&mut self) -> i16 {
match self.enumerate_devices(DeviceType::INPUT) {
Ok(device_collection) => device_collection.count().try_into().unwrap_or(-1),
Err(e) => {
Expand All @@ -367,12 +498,12 @@ impl AudioDeviceModule {

fn copy_name_and_id(
index: u16,
devices: DeviceCollectionWrapper,
devices: &DeviceCollectionWrapper,
name_out: webrtc::ptr::Borrowed<c_uchar>,
guid_out: webrtc::ptr::Borrowed<c_uchar>,
) -> anyhow::Result<()> {
if let Some(d) = devices.get(index.into()) {
if let Some(name) = d.friendly_name() {
if let Some(name) = &d.friendly_name {
let mut name_copy = name.to_string();
// TODO(mutexlox): Localize these strings.
#[cfg(not(target_os = "windows"))]
Expand All @@ -391,7 +522,7 @@ impl AudioDeviceModule {
} else {
return Err(anyhow!("Could not get device name"));
}
if let Some(id) = d.device_id() {
if let Some(id) = &d.device_id {
copy_and_truncate_string(id, guid_out, ADM_MAX_GUID_SIZE)?;
} else {
return Err(anyhow!("Could not get device ID"));
Expand All @@ -407,7 +538,7 @@ impl AudioDeviceModule {
}

pub fn playout_device_name(
&self,
&mut self,
index: u16,
name_out: webrtc::ptr::Borrowed<c_uchar>,
guid_out: webrtc::ptr::Borrowed<c_uchar>,
Expand All @@ -430,7 +561,7 @@ impl AudioDeviceModule {
}

pub fn recording_device_name(
&self,
&mut self,
index: u16,
name_out: webrtc::ptr::Borrowed<c_uchar>,
guid_out: webrtc::ptr::Borrowed<c_uchar>,
Expand All @@ -455,26 +586,17 @@ impl AudioDeviceModule {
// Device selection
pub fn set_playout_device(&mut self, index: u16) -> i32 {
let device = match self.enumerate_devices(DeviceType::OUTPUT) {
Ok(devices) => {
for device in devices.iter() {
info!(
"Playout device: ({})",
AudioDeviceModule::device_str(device)
Ok(devices) => match devices.get(index as usize) {
Some(device) => device.devid,
None => {
error!(
"Invalid playout device index {} requested (len {})",
index,
devices.count()
);
return -1;
}

match devices.get(index as usize) {
Some(device) => device.devid(),
None => {
error!(
"Invalid device index {} requested (len {})",
index,
devices.count()
);
return -1;
}
}
}
},
Err(e) => {
error!("failed to enumerate devices for playout device: {}", e);
return -1;
Expand All @@ -495,25 +617,17 @@ impl AudioDeviceModule {

pub fn set_recording_device(&mut self, index: u16) -> i32 {
let device = match self.enumerate_devices(DeviceType::INPUT) {
Ok(devices) => {
for device in devices.iter() {
info!(
"Recording device: ({})",
AudioDeviceModule::device_str(device)
Ok(devices) => match devices.get(index as usize) {
Some(device) => device.devid,
None => {
error!(
"Invalid recording device index {} requested (len {})",
index,
devices.count()
);
return -1;
}
match devices.get(index as usize) {
Some(device) => device.devid(),
None => {
error!(
"Invalid device index {} requested (len {})",
index,
devices.count()
);
return -1;
}
}
}
},
Err(e) => {
error!("failed to enumerate devices for playout device: {}", e);
return -1;
Expand Down
Loading

0 comments on commit 74619b6

Please sign in to comment.