diff --git a/examples/sink_id.rs b/examples/sink_id.rs
index a5458976..835c9c25 100644
--- a/examples/sink_id.rs
+++ b/examples/sink_id.rs
@@ -25,5 +25,10 @@ fn main() {
     osc.connect(&context.destination());
     osc.start();
 
-    std::thread::sleep(std::time::Duration::from_secs(4));
+    loop {
+        println!("Choose output device, enter the 'device_id' and press <Enter>:");
+        let sink_id = std::io::stdin().lines().next().unwrap().unwrap();
+        context.set_sink_id_sync(sink_id).unwrap();
+        println!("Playing beep for sink {:?}", context.sink_id());
+    }
 }
diff --git a/examples/toy_webrtc.rs b/examples/toy_webrtc.rs
index 37106882..3d242b6a 100644
--- a/examples/toy_webrtc.rs
+++ b/examples/toy_webrtc.rs
@@ -79,7 +79,7 @@ fn run_server() -> std::io::Result<()> {
         };
         */
 
-        socket.send_to(buf, &src)?;
+        socket.send_to(buf, src)?;
     }
 }
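The reworked example reads a `device_id` from stdin. A hypothetical companion snippet (not part of this patch) that lists the candidate ids before switching; `enumerate_devices`, `device_id`, `set_sink_id_sync` and `sink_id` are the calls exercised by this diff, everything else is illustrative:

```rust
use web_audio_api::context::AudioContext;

fn main() {
    let context = AudioContext::default();

    // `enumerate_devices` is also what `set_sink_id_sync` uses internally to
    // validate the requested sink
    let devices = web_audio_api::enumerate_devices();
    for device in &devices {
        println!("{:?}", device.device_id());
    }

    // switch playback to the last reported device, if any
    if let Some(device) = devices.last() {
        context
            .set_sink_id_sync(device.device_id().to_string())
            .unwrap();
        println!("now playing on {:?}", context.sink_id());
    }
}
```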
diff --git a/src/context/concrete_base.rs b/src/context/concrete_base.rs
index 01b014af..aac66250 100644
--- a/src/context/concrete_base.rs
+++ b/src/context/concrete_base.rs
@@ -12,9 +12,9 @@ use crate::spatial::AudioListenerParams;
 
 use crate::AudioListener;
 
-use crossbeam_channel::Sender;
+use crossbeam_channel::{SendError, Sender};
 use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
-use std::sync::{Arc, Mutex};
+use std::sync::{Arc, Mutex, RwLock, RwLockWriteGuard};
 
 /// The struct that corresponds to the Javascript `BaseAudioContext` object.
 ///
@@ -51,7 +51,7 @@ struct ConcreteBaseAudioContextInner {
     /// destination node's current channel count
     destination_channel_config: ChannelConfig,
     /// message channel from control to render thread
-    render_channel: Sender<ControlMessage>,
+    render_channel: RwLock<Sender<ControlMessage>>,
     /// control messages that cannot be sent immediately
     queued_messages: Mutex<Vec<ControlMessage>>,
     /// number of frames played
@@ -104,7 +104,7 @@ impl BaseAudioContext for ConcreteBaseAudioContext {
                 self.inner.queued_audio_listener_msgs.lock().unwrap();
             queued_audio_listener_msgs.push(message);
         } else {
-            self.inner.render_channel.send(message).unwrap();
+            self.send_control_msg(message).unwrap();
             self.resolve_queued_control_msgs(id);
         }
@@ -124,7 +124,7 @@ impl ConcreteBaseAudioContext {
         let base_inner = ConcreteBaseAudioContextInner {
             sample_rate,
             max_channel_count,
-            render_channel,
+            render_channel: RwLock::new(render_channel),
            queued_messages: Mutex::new(Vec::new()),
             node_id_inc: AtomicU64::new(0),
             destination_channel_config: ChannelConfigOptions::default().into(),
@@ -188,6 +188,17 @@ impl ConcreteBaseAudioContext {
         base
     }
 
+    pub(crate) fn send_control_msg(
+        &self,
+        msg: ControlMessage,
+    ) -> Result<(), SendError<ControlMessage>> {
+        self.inner.render_channel.read().unwrap().send(msg)
+    }
+
+    pub(crate) fn lock_control_msg_sender(&self) -> RwLockWriteGuard<Sender<ControlMessage>> {
+        self.inner.render_channel.write().unwrap()
+    }
+
     /// Inform render thread that the control thread `AudioNode` no longer has any handles
     pub(super) fn mark_node_dropped(&self, id: u64) {
         // do not drop magic nodes
@@ -199,7 +210,7 @@ impl ConcreteBaseAudioContext {
 
             // Sending the message will fail when the render thread has already shut down.
             // This is fine
-            let _r = self.inner.render_channel.send(message);
+            let _r = self.send_control_msg(message);
         }
     }
@@ -211,7 +222,7 @@ impl ConcreteBaseAudioContext {
 
         // Sending the message will fail when the render thread has already shut down.
         // This is fine
-        let _r = self.inner.render_channel.send(message);
+        let _r = self.send_control_msg(message);
     }
 
     /// `ChannelConfig` of the `AudioDestinationNode`
@@ -283,7 +294,7 @@ impl ConcreteBaseAudioContext {
         while i < queued.len() {
             if matches!(&queued[i], ControlMessage::ConnectNode {to, ..} if *to == id) {
                 let m = queued.remove(i);
-                self.inner.render_channel.send(m).unwrap();
+                self.send_control_msg(m).unwrap();
             } else {
                 i += 1;
             }
@@ -304,7 +315,7 @@ impl ConcreteBaseAudioContext {
             output,
             input,
         };
-        self.inner.render_channel.send(message).unwrap();
+        self.send_control_msg(message).unwrap();
     }
 
     /// Schedule a connection of an `AudioParam` to the `AudioNode` it belongs to
@@ -326,13 +337,13 @@ impl ConcreteBaseAudioContext {
             from: from.0,
             to: to.0,
         };
-        self.inner.render_channel.send(message).unwrap();
+        self.send_control_msg(message).unwrap();
     }
 
     /// Disconnects all outgoing connections from the audio node.
     pub(crate) fn disconnect(&self, from: &AudioNodeId) {
         let message = ControlMessage::DisconnectAll { from: from.0 };
-        self.inner.render_channel.send(message).unwrap();
+        self.send_control_msg(message).unwrap();
     }
 
     /// Pass an `AudioParam::AudioParamEvent` to the render thread
@@ -348,7 +359,7 @@ impl ConcreteBaseAudioContext {
             to: to.clone(),
             event,
         };
-        self.inner.render_channel.send(message).unwrap();
+        self.send_control_msg(message).unwrap();
     }
 
     /// Attach the 9 `AudioListener` coordinates to a `PannerNode`
@@ -370,7 +381,7 @@ impl ConcreteBaseAudioContext {
         let mut released = false;
         while let Some(message) = queued_audio_listener_msgs.pop() {
             // add the AudioListenerRenderer to the graph
-            self.inner.render_channel.send(message).unwrap();
+            self.send_control_msg(message).unwrap();
             released = true;
         }
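Note on the `RwLock` introduced here: ordinary control messages only take the read lock, so concurrent senders do not contend, while `lock_control_msg_sender` hands `set_sink_id_sync` the write guard, which stalls every sender for the duration of the backend swap. A minimal sketch of that idiom, with a stand-in message type:

```rust
use crossbeam_channel::{unbounded, Receiver, Sender};
use std::sync::RwLock;

// stand-in for the crate's `ControlMessage`
type Msg = u64;

struct Control {
    channel: RwLock<Sender<Msg>>,
}

impl Control {
    // counterpart of `send_control_msg`: a read guard, so any number of
    // threads may send concurrently
    fn send(&self, msg: Msg) {
        self.channel.read().unwrap().send(msg).unwrap();
    }

    // counterpart of `lock_control_msg_sender`: while the write guard is
    // held, every `send` above blocks, so no message can race with a
    // render-thread swap
    fn with_senders_paused<R>(&self, f: impl FnOnce() -> R) -> R {
        let _guard = self.channel.write().unwrap();
        f()
    }
}

fn main() {
    let (tx, rx): (Sender<Msg>, Receiver<Msg>) = unbounded();
    let control = Control { channel: RwLock::new(tx) };
    control.send(1);
    control.with_senders_paused(|| assert_eq!(rx.try_iter().count(), 1));
}
```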
diff --git a/src/context/mod.rs b/src/context/mod.rs
index d06ab434..831b5745 100644
--- a/src/context/mod.rs
+++ b/src/context/mod.rs
@@ -42,7 +42,7 @@ impl From<&AudioParamId> for NodeIndex {
 }
 
 /// Describes the current state of the `AudioContext`
-#[derive(Debug, Copy, Clone)]
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 pub enum AudioContextState {
     /// This context is currently suspended (context time is not proceeding,
     /// audio hardware may be powered down/released).
diff --git a/src/context/offline.rs b/src/context/offline.rs
index 65c48200..bc8c3741 100644
--- a/src/context/offline.rs
+++ b/src/context/offline.rs
@@ -59,12 +59,17 @@ impl OfflineAudioContext {
     /// * `length` - length of the rendering audio buffer
     /// * `sample_rate` - output sample rate
     #[must_use]
+    #[allow(clippy::missing_panics_doc)]
     pub fn new(number_of_channels: usize, length: usize, sample_rate: f32) -> Self {
         assert_valid_sample_rate(sample_rate);
 
         // communication channel to the render thread
         let (sender, receiver) = crossbeam_channel::unbounded();
 
+        let graph = crate::render::graph::Graph::new();
+        let message = crate::message::ControlMessage::Startup { graph };
+        sender.send(message).unwrap();
+
         // track number of frames - synced from render thread to control thread
         let frames_played = Arc::new(AtomicU64::new(0));
         let frames_played_clone = frames_played.clone();
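Both context flavours now boot the render side the same way: the graph is built on the control thread and posted as a `Startup` message before the render thread processes its first quantum. Because the channel exists before the consumer, the message cannot be missed. Toy model of that ordering (stand-in `Graph`, single-variant message enum):

```rust
use crossbeam_channel::unbounded;

#[derive(Debug)]
struct Graph;

enum ControlMessage {
    Startup { graph: Graph },
}

fn main() {
    let (sender, receiver) = unbounded();

    // queue the graph before any render thread exists
    sender.send(ControlMessage::Startup { graph: Graph }).unwrap();

    // ...later, the render thread's first pass over its control messages:
    match receiver.try_recv().unwrap() {
        ControlMessage::Startup { graph } => println!("render thread now owns {graph:?}"),
    }
}
```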
diff --git a/src/context/online.rs b/src/context/online.rs
index 9c6ec95a..3151e28b 100644
--- a/src/context/online.rs
+++ b/src/context/online.rs
@@ -1,12 +1,13 @@
 //! The `AudioContext` type and constructor options
 use crate::context::{AudioContextState, BaseAudioContext, ConcreteBaseAudioContext};
-use crate::io::{self, AudioBackend};
+use crate::io::{self, AudioBackendManager, ControlThreadInit, RenderThreadInit};
 use crate::media::{MediaElement, MediaStream};
+use crate::message::ControlMessage;
 use crate::node::{self, ChannelConfigOptions};
 use crate::AudioRenderCapacity;
 
-use std::sync::atomic::AtomicU64;
-use std::sync::Arc;
+use std::error::Error;
+use std::sync::Mutex;
 
 /// Identify the type of playback, which affects tradeoffs
 /// between audio output latency and power consumption
@@ -57,9 +58,11 @@ pub struct AudioContext {
     /// represents the underlying `BaseAudioContext`
     base: ConcreteBaseAudioContext,
     /// audio backend (play/pause functionality)
-    backend: Box<dyn AudioBackend>,
+    backend_manager: Mutex<Box<dyn AudioBackendManager>>,
     /// Provider for rendering performance metrics
     render_capacity: AudioRenderCapacity,
+    /// Initializer for the render thread (when restart is required)
+    render_thread_init: RenderThreadInit,
 }
 
 impl BaseAudioContext for AudioContext {
@@ -80,7 +83,7 @@ impl AudioContext {
     /// This will play live audio on the default output device.
     ///
     /// ```no_run
-    /// use web_audio_api::context::{AudioContext, AudioContextLatencyCategory, AudioContextOptions};
+    /// use web_audio_api::context::{AudioContext, AudioContextOptions};
     ///
     /// // Request a sample rate of 44.1 kHz and default latency (buffer size 128, if available)
     /// let opts = AudioContextOptions {
@@ -94,33 +97,55 @@ impl AudioContext {
     /// // Alternatively, use the default constructor to get the best settings for your hardware
     /// // let context = AudioContext::default();
     /// ```
+    ///
+    /// # Panics
+    ///
+    /// The `AudioContext` constructor will panic when an invalid `sinkId` is provided in the
+    /// `AudioContextOptions`. In a future version, a `try_new` constructor will be introduced that
+    /// will never panic.
     #[allow(clippy::needless_pass_by_value)]
     #[must_use]
     pub fn new(options: AudioContextOptions) -> Self {
-        // track number of frames - synced from render thread to control thread
-        let frames_played = Arc::new(AtomicU64::new(0));
-        let frames_played_clone = frames_played.clone();
+        if let Some(sink_id) = &options.sink_id {
+            if !crate::enumerate_devices()
+                .into_iter()
+                .any(|d| d.device_id() == sink_id)
+            {
+                panic!("NotFoundError: invalid sinkId");
+            }
+        }
 
-        // select backend based on cargo features
-        let (backend, sender, cap_recv) = io::build_output(options, frames_played_clone);
+        let (control_thread_init, render_thread_init) = io::thread_init();
+        let backend = io::build_output(options, render_thread_init.clone());
+
+        let ControlThreadInit {
+            frames_played,
+            ctrl_msg_send,
+            load_value_recv,
+        } = control_thread_init;
+
+        let graph = crate::render::graph::Graph::new();
+        let message = crate::message::ControlMessage::Startup { graph };
+        ctrl_msg_send.send(message).unwrap();
 
         let base = ConcreteBaseAudioContext::new(
             backend.sample_rate(),
             backend.number_of_channels(),
             frames_played,
-            sender,
+            ctrl_msg_send,
             false,
         );
         base.set_state(AudioContextState::Running);
 
         // setup AudioRenderCapacity for this context
         let base_clone = base.clone();
-        let render_capacity = AudioRenderCapacity::new(base_clone, cap_recv);
+        let render_capacity = AudioRenderCapacity::new(base_clone, load_value_recv);
 
         Self {
             base,
-            backend,
+            backend_manager: Mutex::new(backend),
             render_capacity,
+            render_thread_init,
         }
     }
@@ -131,7 +156,7 @@ impl AudioContext {
     // it to the audio subsystem, so this value is zero. (see Gecko)
     #[allow(clippy::unused_self)]
     #[must_use]
-    pub const fn base_latency(&self) -> f64 {
+    pub fn base_latency(&self) -> f64 {
         0.
     }
 
@@ -140,15 +165,102 @@ impl AudioContext {
     /// the time at which the first sample in the buffer is actually processed
     /// by the audio output device.
     #[must_use]
+    #[allow(clippy::missing_panics_doc)]
     pub fn output_latency(&self) -> f64 {
-        self.backend.output_latency()
+        self.backend_manager.lock().unwrap().output_latency()
     }
 
     /// Identifier or the information of the current audio output device.
     ///
     /// The initial value is `None`, which means the default audio output device.
-    pub fn sink_id(&self) -> Option<&str> {
-        self.backend.sink_id()
+    #[allow(clippy::missing_panics_doc)]
+    pub fn sink_id(&self) -> Option<String> {
+        self.backend_manager
+            .lock()
+            .unwrap()
+            .sink_id()
+            .map(str::to_string)
+    }
+
+    /// Update the current audio output device.
+    ///
+    /// This function operates synchronously and might block the current thread. An async version
+    /// is currently not implemented.
+    #[allow(clippy::needless_collect, clippy::missing_panics_doc)]
+    pub fn set_sink_id_sync(&self, sink_id: String) -> Result<(), Box<dyn Error>> {
+        if self.sink_id().as_deref() == Some(&sink_id) {
+            return Ok(()); // sink is already active
+        }
+
+        if !crate::enumerate_devices()
+            .into_iter()
+            .any(|d| d.device_id() == sink_id)
+        {
+            Err("NotFoundError: invalid sinkId")?;
+        }
+
+        let mut backend_manager_guard = self.backend_manager.lock().unwrap();
+        let state = self.state();
+        if state == AudioContextState::Closed {
+            return Ok(());
+        }
+
+        // Acquire exclusive lock on ctrl msg sender
+        let ctrl_msg_send = self.base.lock_control_msg_sender();
+
+        // Flush out the ctrl msg receiver, cache any pending messages
+        let mut pending_msgs: Vec<_> = self.render_thread_init.ctrl_msg_recv.try_iter().collect();
+
+        // Acquire the active audio graph from the current render thread, shutting it down
+        let graph = if matches!(pending_msgs.get(0), Some(ControlMessage::Startup { .. })) {
+            // Handle the edge case where the previous backend was suspended for its entire lifetime.
+            // In this case, the `Startup` control message was never processed.
+            let msg = pending_msgs.remove(0);
+            match msg {
+                ControlMessage::Startup { graph } => graph,
+                _ => unreachable!(),
+            }
+        } else {
+            // Acquire the audio graph from the current render thread, shutting it down
+            let (graph_send, graph_recv) = crossbeam_channel::bounded(1);
+            let message = ControlMessage::Shutdown { sender: graph_send };
+            ctrl_msg_send.send(message).unwrap();
+            if state == AudioContextState::Suspended {
+                // We must wake up the render thread to be able to handle the shutdown.
+                // No new audio will be produced because it will receive the shutdown command first.
+                backend_manager_guard.resume();
+            }
+            graph_recv.recv().unwrap()
+        };
+
+        // hotswap the backend
+        let options = AudioContextOptions {
+            sample_rate: Some(self.sample_rate()),
+            latency_hint: AudioContextLatencyCategory::default(), // todo reuse existing setting
+            sink_id: Some(sink_id),
+        };
+        *backend_manager_guard = io::build_output(options, self.render_thread_init.clone());
+
+        // if the previous backend state was suspended, suspend the new one before shipping the graph
+        if state == AudioContextState::Suspended {
+            backend_manager_guard.suspend();
+        }
+
+        // send the audio graph to the new render thread
+        let message = ControlMessage::Startup { graph };
+        ctrl_msg_send.send(message).unwrap();
+
+        self.base().set_state(AudioContextState::Running);
+
+        // flush the cached msgs
+        pending_msgs
+            .into_iter()
+            .for_each(|m| self.base().send_control_msg(m).unwrap());
+
+        // explicitly release the lock to prevent concurrent render threads
+        drop(backend_manager_guard);
+
+        Ok(())
    }
 
     /// Suspends the progression of time in the audio context.
@@ -167,7 +279,7 @@ impl AudioContext {
     /// * For a `BackendSpecificError`
     #[allow(clippy::missing_const_for_fn, clippy::unused_self)]
     pub fn suspend_sync(&self) {
-        if self.backend.suspend() {
+        if self.backend_manager.lock().unwrap().suspend() {
             self.base().set_state(AudioContextState::Suspended);
         }
     }
@@ -186,7 +298,7 @@ impl AudioContext {
     /// * For a `BackendSpecificError`
     #[allow(clippy::missing_const_for_fn, clippy::unused_self)]
     pub fn resume_sync(&self) {
-        if self.backend.resume() {
+        if self.backend_manager.lock().unwrap().resume() {
             self.base().set_state(AudioContextState::Running);
         }
     }
@@ -204,7 +316,7 @@ impl AudioContext {
     /// Will panic when this function is called multiple times
     #[allow(clippy::missing_const_for_fn, clippy::unused_self)]
     pub fn close_sync(&self) {
-        self.backend.close();
+        self.backend_manager.lock().unwrap().close();
         self.base().set_state(AudioContextState::Closed);
     }
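The core of the hotswap above is a one-shot handback of the graph. A self-contained sketch of that `Shutdown` round-trip; `Graph` and the message enum are stand-ins for the types added in `src/message.rs` further down:

```rust
use crossbeam_channel::{bounded, unbounded, Sender};
use std::thread;

struct Graph(Vec<u64>); // stand-in for the real audio graph

enum ControlMessage {
    Shutdown { sender: Sender<Graph> },
}

fn main() {
    let (ctrl_send, ctrl_recv) = unbounded();

    // a fake "render thread" that owns the graph until told to give it up
    let render = thread::spawn(move || {
        let graph = Graph(vec![1, 2, 3]);
        match ctrl_recv.recv().unwrap() {
            ControlMessage::Shutdown { sender } => {
                let _ = sender.send(graph); // render side is done now
            }
        }
    });

    // control side: same pattern as the `Shutdown` branch above
    let (graph_send, graph_recv) = bounded(1);
    ctrl_send.send(ControlMessage::Shutdown { sender: graph_send }).unwrap();
    let graph = graph_recv.recv().unwrap();
    render.join().unwrap();

    // `graph` can now be shipped to a fresh render thread via `Startup`
    assert_eq!(graph.0, vec![1, 2, 3]);
}
```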
diff --git a/src/io/backend_cpal.rs b/src/io/backend_cpal.rs
index 55a09d03..5e65adf8 100644
--- a/src/io/backend_cpal.rs
+++ b/src/io/backend_cpal.rs
@@ -1,11 +1,9 @@
 //! Audio IO management API
 use std::convert::TryFrom;
-use std::sync::atomic::AtomicU64;
 use std::sync::Arc;
 use std::sync::Mutex;
 
-use crate::message::ControlMessage;
-use crate::{AtomicF64, AudioRenderCapacityLoad, RENDER_QUANTUM_SIZE};
+use crate::{AtomicF64, RENDER_QUANTUM_SIZE};
 
 use cpal::{
     traits::{DeviceTrait, HostTrait, StreamTrait},
@@ -13,13 +11,13 @@ use cpal::{
     Stream, StreamConfig, SupportedBufferSize,
 };
 
-use super::{AudioBackend, MediaDeviceInfo, MediaDeviceInfoKind};
+use super::{AudioBackendManager, MediaDeviceInfo, MediaDeviceInfoKind, RenderThreadInit};
 use crate::buffer::AudioBuffer;
-use crate::context::{AudioContextLatencyCategory, AudioContextOptions};
+use crate::context::AudioContextOptions;
 use crate::media::MicrophoneRender;
 use crate::render::RenderThread;
 
-use crossbeam_channel::{Receiver, Sender};
+use crossbeam_channel::Receiver;
 
 mod private {
     use super::*;
@@ -78,21 +76,20 @@ pub struct CpalBackend {
     sink_id: Option<String>,
 }
 
-impl AudioBackend for CpalBackend {
-    fn build_output(
-        options: AudioContextOptions,
-        frames_played: Arc<AtomicU64>,
-    ) -> (
-        Self,
-        Sender<ControlMessage>,
-        Receiver<AudioRenderCapacityLoad>,
-    )
+impl AudioBackendManager for CpalBackend {
+    fn build_output(options: AudioContextOptions, render_thread_init: RenderThreadInit) -> Self
     where
         Self: Sized,
     {
         let host = cpal::default_host();
         log::info!("Host: {:?}", host.id());
 
+        let RenderThreadInit {
+            frames_played,
+            ctrl_msg_recv,
+            load_value_send,
+        } = render_thread_init;
+
         let device = match &options.sink_id {
             None => host
                 .default_output_device()
@@ -106,37 +103,96 @@ impl AudioBackend for CpalBackend {
 
         log::info!("Output device: {:?}", device.name());
 
-        let mut builder = StreamConfigsBuilder::new(device);
+        let supported = device
+            .default_output_config()
+            .expect("error while querying configs");
+        let mut preferred: StreamConfig = supported.clone().into();
 
         // set specific sample rate if requested
         if let Some(sample_rate) = options.sample_rate {
-            builder.with_sample_rate(sample_rate);
+            crate::assert_valid_sample_rate(sample_rate);
+            preferred.sample_rate.0 = sample_rate as u32;
         }
 
         // always try to set a decent buffer size
-        builder.with_latency_hint(options.latency_hint);
+        let buffer_size = super::buffer_size_for_latency_category(
+            options.latency_hint,
+            preferred.sample_rate.0 as f32,
+        ) as u32;
 
-        let configs = builder.build();
+        let clamped_buffer_size: u32 = match supported.buffer_size() {
+            SupportedBufferSize::Unknown => buffer_size,
+            SupportedBufferSize::Range { min, max } => buffer_size.clamp(*min, *max),
+        };
+
+        preferred.buffer_size = cpal::BufferSize::Fixed(clamped_buffer_size);
 
         let output_latency = Arc::new(AtomicF64::new(0.));
 
-        let streamer = OutputStreamer::new(configs, frames_played, output_latency.clone())
-            .spawn()
-            .or_fallback()
-            .play();
+        let number_of_channels = usize::from(preferred.channels);
+        let mut sample_rate = preferred.sample_rate.0 as f32;
 
-        let (stream, config, sender, cap_receiver) = streamer.get_output_stream();
-        let number_of_channels = usize::from(config.channels);
-        let sample_rate = config.sample_rate.0 as f32;
+        let renderer = RenderThread::new(
+            sample_rate,
+            preferred.channels as usize,
+            ctrl_msg_recv.clone(),
+            frames_played.clone(),
+            Some(load_value_send.clone()),
+        );
 
-        let backend = CpalBackend {
+        log::debug!(
+            "Attempt output stream with preferred config: {:?}",
+            &preferred
+        );
+        let spawned = spawn_output_stream(
+            &device,
+            supported.sample_format(),
+            &preferred,
+            renderer,
+            output_latency.clone(),
+        );
+
+        let stream = match spawned {
+            Ok(stream) => {
+                log::debug!("Output stream set up successfully");
+                stream
+            }
+            Err(e) => {
+                log::warn!("Output stream build failed with preferred config: {}", e);
+                sample_rate = supported.sample_rate().0 as f32;
+                let supported_config: StreamConfig = supported.clone().into();
+                log::debug!(
+                    "Attempt output stream with fallback config: {:?}",
+                    &supported_config
+                );
+
+                let renderer = RenderThread::new(
+                    sample_rate,
+                    supported_config.channels as usize,
+                    ctrl_msg_recv,
+                    frames_played,
+                    Some(load_value_send),
+                );
+
+                let spawned = spawn_output_stream(
+                    &device,
+                    supported.sample_format(),
+                    &supported_config,
+                    renderer,
+                    output_latency.clone(),
+                );
+                spawned.expect("OutputStream build failed with default config")
+            }
+        };
+
+        stream.play().expect("Stream refused to play");
+
+        CpalBackend {
             stream: ThreadSafeClosableStream::new(stream),
             output_latency,
             sample_rate,
             number_of_channels,
             sink_id: options.sink_id,
-        };
-
-        (backend, sender, cap_receiver)
+        }
     }
 
     fn build_input(options: AudioContextOptions) -> (Self, Receiver<AudioBuffer>)
@@ -252,7 +308,7 @@ impl AudioBackend for CpalBackend {
         self.sink_id.as_deref()
     }
 
-    fn boxed_clone(&self) -> Box<dyn AudioBackend> {
+    fn boxed_clone(&self) -> Box<dyn AudioBackendManager> {
         Box::new(self.clone())
    }
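The deletion hunk below removes the old `StreamConfigsBuilder` / `OutputStreamer` / `OrFallback` plumbing; its retry behaviour is now inlined in `build_output` above. The control flow reduces to this generic shape, where `open` stands in for `spawn_output_stream`:

```rust
fn open_with_fallback<C, S, E: std::fmt::Display>(
    preferred: C,
    fallback: C,
    mut open: impl FnMut(&C) -> Result<S, E>,
) -> S {
    match open(&preferred) {
        Ok(stream) => stream,
        Err(e) => {
            // same policy as above: log, retry once with the device-default
            // config, and panic if that also fails
            eprintln!("preferred config failed ({e}), falling back");
            open(&fallback).expect("fallback config failed too")
        }
    }
}

fn main() {
    // toy usage: "opening" succeeds only for the fallback value
    let stream = open_with_fallback(44_100u32, 48_000u32, |rate| {
        if *rate == 48_000 { Ok(*rate) } else { Err("unsupported rate") }
    });
    assert_eq!(stream, 48_000);
}
```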
log::debug!("Output stream set up successfully"); + stream + } + Err(e) => { + log::warn!("Output stream build failed with prefered config: {}", e); + sample_rate = supported.sample_rate().0 as f32; + let supported_config: StreamConfig = supported.clone().into(); + log::debug!( + "Attempt output stream with fallback config: {:?}", + &supported_config + ); + + let renderer = RenderThread::new( + sample_rate, + supported_config.channels as usize, + ctrl_msg_recv, + frames_played, + Some(load_value_send), + ); + + let spawned = spawn_output_stream( + &device, + supported.sample_format(), + &supported_config, + renderer, + output_latency.clone(), + ); + spawned.expect("OutputStream build failed with default config") + } + }; + + stream.play().expect("Stream refused to play"); + + CpalBackend { stream: ThreadSafeClosableStream::new(stream), output_latency, sample_rate, number_of_channels, sink_id: options.sink_id, - }; - - (backend, sender, cap_receiver) + } } fn build_input(options: AudioContextOptions) -> (Self, Receiver) @@ -252,7 +308,7 @@ impl AudioBackend for CpalBackend { self.sink_id.as_deref() } - fn boxed_clone(&self) -> Box { + fn boxed_clone(&self) -> Box { Box::new(self.clone()) } @@ -360,268 +416,3 @@ fn spawn_input_stream( } } } - -/// This struct helps to build `StreamConfigs` -struct StreamConfigsBuilder { - // This is not a dead code, this field is used by OutputStreamer - #[allow(dead_code)] - /// The device on which the stream will be build - device: cpal::Device, - /// the device supported config from wich all the other configs are derived - supported: cpal::SupportedStreamConfig, - /// the prefered config is a primary config optionnaly modified by the user options `AudioContextOptions` - prefered: cpal::StreamConfig, -} - -impl StreamConfigsBuilder { - /// creates the `StreamConfigBuilder` - fn new(device: cpal::Device) -> Self { - let supported = device - .default_output_config() - .expect("error while querying configs"); - - Self { - device, - supported: supported.clone(), - prefered: supported.into(), - } - } - - /// set preferred sample rate - fn with_sample_rate(&mut self, sample_rate: f32) { - crate::assert_valid_sample_rate(sample_rate); - self.prefered.sample_rate.0 = sample_rate as u32; - } - - /// define requested hardware buffer size - #[allow(clippy::needless_pass_by_value)] - fn with_latency_hint(&mut self, latency_hint: AudioContextLatencyCategory) { - let buffer_size = super::buffer_size_for_latency_category( - latency_hint, - self.prefered.sample_rate.0 as f32, - ) as u32; - - let clamped_buffer_size: u32 = match self.supported.buffer_size() { - SupportedBufferSize::Unknown => buffer_size, - SupportedBufferSize::Range { min, max } => buffer_size.clamp(*min, *max), - }; - - self.prefered.buffer_size = cpal::BufferSize::Fixed(clamped_buffer_size); - } - - /// builds `StreamConfigs` - fn build(self) -> StreamConfigs { - StreamConfigs::new(self) - } -} - -/// `StreamConfigs` contains configs data -/// required to build an output stream on -/// a prefered config or a fallback config in case of failure -struct StreamConfigs { - /// the requested sample format of the output stream - sample_format: cpal::SampleFormat, - /// the prefered config of the output stream - prefered: cpal::StreamConfig, - /// in case of failure to build the stream with `prefered` - /// a fallback config is used to spawn the stream - fallback: cpal::StreamConfig, -} - -impl StreamConfigs { - /// creates a stream configs with the data prepared by the builder - fn new(builder: 
-        let StreamConfigsBuilder {
-            supported,
-            prefered,
-            ..
-        } = builder;
-
-        let sample_format = supported.sample_format();
-
-        Self {
-            sample_format,
-            prefered,
-            fallback: supported.into(),
-        }
-    }
-}
-
-/// `OutputStreamer` is used to spawn an output stream
-struct OutputStreamer {
-    /// The audio device on which the output stream is broadcast
-    device: cpal::Device,
-    /// The configs on which the output stream can be build
-    configs: StreamConfigs,
-    /// `frames_played` act as a time reference when processing
-    frames_played: Arc<AtomicU64>,
-    /// delay between render and actual system audio output
-    output_latency: Arc<AtomicF64>,
-    /// communication channel between control and render thread (sender part)
-    sender: Option<Sender<ControlMessage>>,
-    /// communication channel for render load values
-    cap_receiver: Option<Receiver<AudioRenderCapacityLoad>>,
-    /// the output stream
-    stream: Option<Stream>,
-    /// a flag to know if the output stream has been build with prefered config
-    /// or fallback config
-    falled_back: bool,
-}
-
-impl OutputStreamer {
-    /// creates an `OutputStreamer`
-    fn new(
-        configs: StreamConfigs,
-        frames_played: Arc<AtomicU64>,
-        output_latency: Arc<AtomicF64>,
-    ) -> Self {
-        let host = cpal::default_host();
-        let device = host
-            .default_output_device()
-            .expect("no output device available");
-
-        Self {
-            device,
-            configs,
-            frames_played,
-            output_latency,
-            sender: None,
-            cap_receiver: None,
-            stream: None,
-            falled_back: false,
-        }
-    }
-
-    /// spawns the output stram with prefered config
-    fn spawn(mut self) -> Result<Self, Self> {
-        // try with prefered config
-        let config = &self.configs.prefered;
-
-        // Creates the render thread
-        let sample_rate = config.sample_rate.0 as f32;
-
-        // communication channel for ctrl msgs to the render thread
-        let (sender, receiver) = crossbeam_channel::unbounded();
-        // communication channel for render load values
-        let (cap_sender, cap_receiver) = crossbeam_channel::bounded(1);
-
-        self.sender = Some(sender);
-        self.cap_receiver = Some(cap_receiver);
-
-        // spawn the render thread
-        let renderer = RenderThread::new(
-            sample_rate,
-            config.channels as usize,
-            receiver,
-            self.frames_played.clone(),
-            Some(cap_sender),
-        );
-
-        log::debug!("Attempt output stream with prefered config: {:?}", &config);
-        let spawned = spawn_output_stream(
-            &self.device,
-            self.configs.sample_format,
-            config,
-            renderer,
-            self.output_latency.clone(),
-        );
-
-        match spawned {
-            Ok(stream) => {
-                log::debug!("Output stream set up successfully");
-                self.stream = Some(stream);
-                Ok(self)
-            }
-            Err(e) => {
-                log::warn!("Output stream build failed with prefered config: {}", e);
-                Err(self)
-            }
-        }
-    }
-
-    /// playes the output stream
-    fn play(self) -> Self {
-        self.stream
-            .as_ref()
-            .expect("Stream needs to exist to be played")
-            .play()
-            .expect("Stream refused to play");
-        self
-    }
-
-    /// returns the output stream infos
-    fn get_output_stream(
-        self,
-    ) -> (
-        Stream,
-        StreamConfig,
-        Sender<ControlMessage>,
-        Receiver<AudioRenderCapacityLoad>,
-    ) {
-        if self.falled_back {
-            (
-                self.stream.unwrap(),
-                self.configs.fallback,
-                self.sender.unwrap(),
-                self.cap_receiver.unwrap(),
-            )
-        } else {
-            (
-                self.stream.unwrap(),
-                self.configs.prefered,
-                self.sender.unwrap(),
-                self.cap_receiver.unwrap(),
-            )
-        }
-    }
-}
-
-/// adds a fallback path to `OutputStreamer`
-trait OrFallback {
-    /// falls back if previous attempt failed
-    fn or_fallback(self) -> OutputStreamer;
-}
-
-impl OrFallback for Result<OutputStreamer, OutputStreamer> {
-    fn or_fallback(self) -> OutputStreamer {
-        match self {
-            Ok(streamer) => streamer,
-            Err(mut streamer) => {
-                // try with fallback config
-                streamer.falled_back = true;
-                let config = &streamer.configs.fallback;
-
-                // Creates the renderer thread
-                let sample_rate = config.sample_rate.0 as f32;
-
-                // communication channel to the render thread
-                let (sender, receiver) = crossbeam_channel::unbounded();
-                // communication channel for render load values
-                let (cap_sender, cap_receiver) = crossbeam_channel::bounded(1);
-
-                streamer.sender = Some(sender);
-                streamer.cap_receiver = Some(cap_receiver);
-
-                // spawn the render thread
-                let renderer = RenderThread::new(
-                    sample_rate,
-                    config.channels as usize,
-                    receiver,
-                    streamer.frames_played.clone(),
-                    Some(cap_sender),
-                );
-
-                let spawned = spawn_output_stream(
-                    &streamer.device,
-                    streamer.configs.sample_format,
-                    config,
-                    renderer,
-                    streamer.output_latency.clone(),
-                );
-                let stream = spawned.expect("OutputStream build failed with default config");
-                streamer.stream = Some(stream);
-                streamer
-            }
-        }
-    }
-}
diff --git a/src/io/backend_cubeb.rs b/src/io/backend_cubeb.rs
index e1c40e48..d59f802f 100644
--- a/src/io/backend_cubeb.rs
+++ b/src/io/backend_cubeb.rs
@@ -1,18 +1,16 @@
-use std::sync::atomic::AtomicU64;
 use std::sync::Arc;
 
-use super::{AudioBackend, MediaDeviceInfo, MediaDeviceInfoKind};
+use super::{AudioBackendManager, MediaDeviceInfo, MediaDeviceInfoKind, RenderThreadInit};
 use crate::buffer::AudioBuffer;
 use crate::context::AudioContextOptions;
 use crate::media::MicrophoneRender;
-use crate::message::ControlMessage;
 use crate::render::RenderThread;
-use crate::{AudioRenderCapacityLoad, RENDER_QUANTUM_SIZE};
+use crate::RENDER_QUANTUM_SIZE;
 
 use cubeb::{Context, DeviceId, DeviceType, StereoFrame, Stream, StreamParams};
 
-use crossbeam_channel::{Receiver, Sender};
+use crossbeam_channel::Receiver;
 
 // erase type of `Frame` in cubeb `Stream`
 pub struct BoxedStream(Box);
@@ -142,18 +140,17 @@ pub struct CubebBackend {
     sink_id: Option<String>,
 }
 
-impl AudioBackend for CubebBackend {
-    fn build_output(
-        options: AudioContextOptions,
-        frames_played: Arc<AtomicU64>,
-    ) -> (
-        Self,
-        Sender<ControlMessage>,
-        Receiver<AudioRenderCapacityLoad>,
-    )
+impl AudioBackendManager for CubebBackend {
+    fn build_output(options: AudioContextOptions, render_thread_init: RenderThreadInit) -> Self
     where
         Self: Sized,
     {
+        let RenderThreadInit {
+            frames_played,
+            ctrl_msg_recv,
+            load_value_send,
+        } = render_thread_init;
+
         // Set up cubeb context
         let ctx = Context::init(None, None).unwrap();
@@ -175,15 +172,12 @@ impl AudioBackend for CubebBackend {
             _ => cubeb::ChannelLayout::UNDEFINED, // TODO, does this work?
         };
 
-        // Set up render thread
-        let (sender, receiver) = crossbeam_channel::unbounded();
-        let (cap_sender, cap_receiver) = crossbeam_channel::bounded(1);
         let renderer = RenderThread::new(
             sample_rate,
             number_of_channels,
-            receiver,
+            ctrl_msg_recv,
             frames_played,
-            Some(cap_sender),
+            Some(load_value_send),
         );
 
         let params = cubeb::StreamParamsBuilder::new()
@@ -254,7 +248,7 @@ impl AudioBackend for CubebBackend {
 
         backend.resume();
 
-        (backend, sender, cap_receiver)
+        backend
     }
 
     fn build_input(options: AudioContextOptions) -> (Self, Receiver<AudioBuffer>)
@@ -363,7 +357,7 @@ impl AudioBackend for CubebBackend {
         self.sink_id.as_deref()
     }
 
-    fn boxed_clone(&self) -> Box<dyn AudioBackend> {
+    fn boxed_clone(&self) -> Box<dyn AudioBackendManager> {
         Box::new(self.clone())
     }
diff --git a/src/io/mod.rs b/src/io/mod.rs
index 7bd8e6bb..32f37d16 100644
--- a/src/io/mod.rs
+++ b/src/io/mod.rs
@@ -34,24 +34,57 @@ pub fn enumerate_devices() -> Vec<MediaDeviceInfo> {
     panic!("No audio backend available, enable the 'cpal' or 'cubeb' feature")
 }
 
+#[derive(Clone, Debug)]
+pub(crate) struct RenderThreadInit {
+    pub frames_played: Arc<AtomicU64>,
+    pub ctrl_msg_recv: Receiver<ControlMessage>,
+    pub load_value_send: Sender<AudioRenderCapacityLoad>,
+}
+
+#[derive(Debug)]
+pub(crate) struct ControlThreadInit {
+    pub frames_played: Arc<AtomicU64>,
+    pub ctrl_msg_send: Sender<ControlMessage>,
+    pub load_value_recv: Receiver<AudioRenderCapacityLoad>,
+}
+
+pub(crate) fn thread_init() -> (ControlThreadInit, RenderThreadInit) {
+    // track number of frames - synced from render thread to control thread
+    let frames_played = Arc::new(AtomicU64::new(0));
+    // communication channel for ctrl msgs to the render thread
+    let (ctrl_msg_send, ctrl_msg_recv) = crossbeam_channel::unbounded();
+    // communication channel for render load values
+    let (load_value_send, load_value_recv) = crossbeam_channel::bounded(1);
+
+    let control_thread_init = ControlThreadInit {
+        frames_played: frames_played.clone(),
+        ctrl_msg_send,
+        load_value_recv,
+    };
+
+    let render_thread_init = RenderThreadInit {
+        frames_played,
+        ctrl_msg_recv,
+        load_value_send,
+    };
+
+    (control_thread_init, render_thread_init)
+}
+
 /// Set up an output stream (speakers) based on the selected features (cubeb/cpal/none)
 pub(crate) fn build_output(
     options: AudioContextOptions,
-    frames_played: Arc<AtomicU64>,
-) -> (
-    Box<dyn AudioBackend>,
-    Sender<ControlMessage>,
-    Receiver<AudioRenderCapacityLoad>,
-) {
+    render_thread_init: RenderThreadInit,
+) -> Box<dyn AudioBackendManager> {
     #[cfg(feature = "cubeb")]
     {
-        let (b, s, r) = backend_cubeb::CubebBackend::build_output(options, frames_played);
-        (Box::new(b), s, r)
+        let backend = backend_cubeb::CubebBackend::build_output(options, render_thread_init);
+        Box::new(backend)
     }
     #[cfg(all(not(feature = "cubeb"), feature = "cpal"))]
     {
-        let (b, s, r) = backend_cpal::CpalBackend::build_output(options, frames_played);
-        (Box::new(b), s, r)
+        let backend = backend_cpal::CpalBackend::build_output(options, render_thread_init);
+        Box::new(backend)
     }
     #[cfg(all(not(feature = "cubeb"), not(feature = "cpal")))]
     {
@@ -63,7 +96,7 @@ pub(crate) fn build_output(
 #[cfg(any(feature = "cubeb", feature = "cpal"))]
 pub(crate) fn build_input(
     options: AudioContextOptions,
-) -> (Box<dyn AudioBackend>, Receiver<AudioBuffer>) {
+) -> (Box<dyn AudioBackendManager>, Receiver<AudioBuffer>) {
     #[cfg(feature = "cubeb")]
     {
         let (b, r) = backend_cubeb::CubebBackend::build_input(options);
@@ -81,16 +114,9 @@ pub(crate) fn build_input(
 }
 
 /// Interface for audio backends
-pub(crate) trait AudioBackend: Send + Sync + 'static {
+pub(crate) trait AudioBackendManager: Send + Sync + 'static {
     /// Setup a new output stream (speakers)
-    fn build_output(
-        options: AudioContextOptions,
-        frames_played: Arc<AtomicU64>,
-    ) -> (
-        Self,
-        Sender<ControlMessage>,
-        Receiver<AudioRenderCapacityLoad>,
-    )
+    fn build_output(options: AudioContextOptions, render_thread_ctor: RenderThreadInit) -> Self
     where
         Self: Sized;
 
@@ -124,7 +150,7 @@ pub(crate) trait AudioBackend: Send + Sync + 'static {
     fn sink_id(&self) -> Option<&str>;
 
     /// Clone the stream reference
-    fn boxed_clone(&self) -> Box<dyn AudioBackend>;
+    fn boxed_clone(&self) -> Box<dyn AudioBackendManager>;
 
     fn enumerate_devices() -> Vec<MediaDeviceInfo>
     where
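`thread_init` splits one set of channels into a control half and a render half; because `RenderThreadInit` is `Clone`, a replacement backend reuses the very same receiver and frame counter, which is what lets queued messages and `currentTime` survive a device switch. A reduced illustration with plain types instead of the crate-private structs:

```rust
use crossbeam_channel::unbounded;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

fn main() {
    let frames_played = Arc::new(AtomicU64::new(0));
    let (ctrl_msg_send, ctrl_msg_recv) = unbounded::<&'static str>();

    // the control half keeps the sender, the render half the receiver
    let render_half = (frames_played.clone(), ctrl_msg_recv);

    // a message queued while no render thread is listening is not lost
    ctrl_msg_send.send("connect").unwrap();

    // a replacement backend receives a clone of the same render half, so
    // queued messages and the frame counter survive the device switch
    let (frames, recv) = (render_half.0.clone(), render_half.1.clone());
    frames.fetch_add(128, Ordering::SeqCst);
    assert_eq!(recv.recv().unwrap(), "connect");
    assert_eq!(frames_played.load(Ordering::SeqCst), 128);
}
```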
diff --git a/src/media/mic.rs b/src/media/mic.rs
index fef898e5..272c291d 100644
--- a/src/media/mic.rs
+++ b/src/media/mic.rs
@@ -9,7 +9,7 @@ use crate::context::AudioContextOptions;
 use crossbeam_channel::Sender;
 
 use crate::buffer::ChannelData;
-use crate::io::{self, AudioBackend};
+use crate::io::{self, AudioBackendManager};
 
 use crossbeam_channel::{Receiver, TryRecvError};
 
@@ -58,7 +58,7 @@ pub struct Microphone {
     receiver: Receiver<AudioBuffer>,
     number_of_channels: usize,
     sample_rate: f32,
-    backend: Box<dyn AudioBackend>,
+    backend: Box<dyn AudioBackendManager>,
 }
 
 impl Microphone {
@@ -138,7 +138,7 @@ pub struct MicrophoneStream {
     number_of_channels: usize,
     sample_rate: f32,
 
-    _stream: Box<dyn AudioBackend>,
+    _stream: Box<dyn AudioBackendManager>,
 }
 
 impl Iterator for MicrophoneStream {
diff --git a/src/message.rs b/src/message.rs
index 29ba4987..7dbbf0f3 100644
--- a/src/message.rs
+++ b/src/message.rs
@@ -2,6 +2,7 @@
 
 use crate::node::ChannelConfig;
 use crate::param::AudioParamEvent;
+use crate::render::graph::Graph;
 use crate::render::AudioProcessor;
 
 use crossbeam_channel::Sender;
@@ -42,4 +43,10 @@ pub(crate) enum ControlMessage {
 
     /// Mark node as a cycle breaker (DelayNode only)
     MarkCycleBreaker { id: u64 },
+
+    /// Shut down and recycle the audio graph
+    Shutdown { sender: Sender<Graph> },
+
+    /// Start rendering with given audio graph
+    Startup { graph: Graph },
 }
diff --git a/src/render/mod.rs b/src/render/mod.rs
index 6067540a..9da9b790 100644
--- a/src/render/mod.rs
+++ b/src/render/mod.rs
@@ -6,7 +6,7 @@ use std::fmt::Debug;
 pub(crate) struct NodeIndex(pub u64);
 
 // private mods
-mod graph;
+pub(crate) mod graph;
 
 // pub(crate) mods
 mod thread;
diff --git a/src/render/thread.rs b/src/render/thread.rs
index 715db7be..96eb76ff 100644
--- a/src/render/thread.rs
+++ b/src/render/thread.rs
@@ -17,11 +17,11 @@ use super::graph::Graph;
 
 /// Operations running off the system-level audio callback
 pub(crate) struct RenderThread {
-    graph: Graph,
+    graph: Option<Graph>,
     sample_rate: f32,
     number_of_channels: usize,
     frames_played: Arc<AtomicU64>,
-    receiver: Receiver<ControlMessage>,
+    receiver: Option<Receiver<ControlMessage>>,
     buffer_offset: Option<(usize, AudioRenderQuantum)>,
     load_value_sender: Option<Sender<AudioRenderCapacityLoad>>,
 }
@@ -32,6 +32,8 @@ pub(crate) struct RenderThread {
 // we can neither move the RenderThread object into the render thread, nor can we initialize the
 // Rc's in that thread.
 #[allow(clippy::non_send_fields_in_send_ty)]
+unsafe impl Send for Graph {}
+unsafe impl Sync for Graph {}
 unsafe impl Send for RenderThread {}
 unsafe impl Sync for RenderThread {}
 
@@ -44,18 +46,23 @@ impl RenderThread {
         load_value_sender: Option<Sender<AudioRenderCapacityLoad>>,
     ) -> Self {
         Self {
-            graph: Graph::new(),
+            graph: None,
             sample_rate,
             number_of_channels,
             frames_played,
-            receiver,
+            receiver: Some(receiver),
             buffer_offset: None,
             load_value_sender,
         }
     }
 
     fn handle_control_messages(&mut self) {
-        for msg in self.receiver.try_iter() {
+        let receiver = match &self.receiver {
+            None => return,
+            Some(receiver) => receiver,
+        };
+
+        for msg in receiver.try_iter() {
             use ControlMessage::*;
 
             match msg {
@@ -66,8 +73,13 @@ impl RenderThread {
                     outputs,
                     channel_config,
                 } => {
-                    self.graph
-                        .add_node(NodeIndex(id), node, inputs, outputs, channel_config);
+                    self.graph.as_mut().unwrap().add_node(
+                        NodeIndex(id),
+                        node,
+                        inputs,
+                        outputs,
+                        channel_config,
+                    );
                 }
                 ConnectNode {
                     from,
@@ -76,22 +88,44 @@ impl RenderThread {
                     input,
                 } => {
                     self.graph
+                        .as_mut()
+                        .unwrap()
                         .add_edge((NodeIndex(from), output), (NodeIndex(to), input));
                 }
                 DisconnectNode { from, to } => {
-                    self.graph.remove_edge(NodeIndex(from), NodeIndex(to));
+                    self.graph
+                        .as_mut()
+                        .unwrap()
+                        .remove_edge(NodeIndex(from), NodeIndex(to));
                 }
                 DisconnectAll { from } => {
-                    self.graph.remove_edges_from(NodeIndex(from));
+                    self.graph
+                        .as_mut()
+                        .unwrap()
+                        .remove_edges_from(NodeIndex(from));
                 }
                 FreeWhenFinished { id } => {
-                    self.graph.mark_free_when_finished(NodeIndex(id));
+                    self.graph
+                        .as_mut()
+                        .unwrap()
+                        .mark_free_when_finished(NodeIndex(id));
                 }
                 AudioParamEvent { to, event } => {
                     to.send(event).expect("Audioparam disappeared unexpectedly")
                 }
                 MarkCycleBreaker { id } => {
-                    self.graph.mark_cycle_breaker(NodeIndex(id));
+                    self.graph
+                        .as_mut()
+                        .unwrap()
+                        .mark_cycle_breaker(NodeIndex(id));
+                }
+                Shutdown { sender } => {
+                    let _ = sender.send(self.graph.take().unwrap());
+                    self.receiver = None;
+                    return; // no further handling of ctrl msgs
+                }
+                Startup { graph } => {
+                    self.graph = Some(graph);
                 }
             }
         }
@@ -127,7 +161,7 @@ impl RenderThread {
             };
 
             // render audio graph
-            let rendered = self.graph.render(&scope);
+            let rendered = self.graph.as_mut().unwrap().render(&scope);
 
             buf.extend_alloc(&rendered);
         }
@@ -188,14 +222,20 @@ impl RenderThread {
             buffer = next;
         }
 
+        // handle addition/removal of nodes/edges
+        self.handle_control_messages();
+
+        // if the thread is still booting, or shutting down, fill with silence
+        if self.graph.is_none() {
+            buffer.fill(crate::Sample::from(&0.));
+            return;
+        }
+
         // The audio graph is rendered in chunks of RENDER_QUANTUM_SIZE frames. But some audio backends
         // may not be able to emit chunks of this size.
         let chunk_size = RENDER_QUANTUM_SIZE * self.number_of_channels;
 
         for data in buffer.chunks_mut(chunk_size) {
-            // handle addition/removal of nodes/edges
-            self.handle_control_messages();
-
             // update time
             let current_frame = self
                 .frames_played
@@ -209,7 +249,7 @@ impl RenderThread {
             };
 
             // render audio graph
-            let mut rendered = self.graph.render(&scope);
+            let mut rendered = self.graph.as_mut().unwrap().render(&scope);
 
             // online AudioContext allows channel count to be less than no of hardware channels
             if rendered.number_of_channels() != self.number_of_channels {
@@ -232,6 +272,9 @@ impl RenderThread {
             debug_assert!(channel_offset < RENDER_QUANTUM_SIZE);
             self.buffer_offset = Some((channel_offset, rendered));
         }
+
+        // handle addition/removal of nodes/edges
+        self.handle_control_messages();
     }
 }
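With `graph: Option<Graph>`, the render thread is explicit about its lifecycle: silence while empty, `Startup` installs a graph, `Shutdown` moves it out for reuse in the next thread. The same state machine in miniature (stand-in types, no actual audio):

```rust
struct Graph;

struct Renderer {
    graph: Option<Graph>,
}

impl Renderer {
    fn render(&mut self, buffer: &mut [f32]) {
        match self.graph.as_mut() {
            // still booting, or shutting down: emit silence
            None => buffer.fill(0.),
            Some(_graph) => { /* render a quantum from the graph */ }
        }
    }

    fn startup(&mut self, graph: Graph) {
        self.graph = Some(graph);
    }

    fn shutdown(&mut self) -> Option<Graph> {
        self.graph.take()
    }
}

fn main() {
    let mut r = Renderer { graph: None };
    let mut buf = [1.0f32; 4];
    r.render(&mut buf); // silence while no graph is installed
    assert_eq!(buf, [0.0; 4]);
    r.startup(Graph);
    let graph = r.shutdown().expect("graph moves out intact");
    r.startup(graph); // ...or into a brand-new Renderer
}
```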