diff --git a/crates/composer/Cargo.toml b/crates/composer/Cargo.toml index 09383d2..d415a28 100644 --- a/crates/composer/Cargo.toml +++ b/crates/composer/Cargo.toml @@ -8,5 +8,6 @@ bincode = "1" clap = { version = "4.2", features = ["derive"] } color-eyre = "0.6" composer_api = { path = "../composer_api" } +cpal = "0.15" eyre = "0.6" rodio = { version = "0.17", features = ["symphonia-wav"] } diff --git a/crates/composer/src/jukebox.rs b/crates/composer/src/jukebox.rs index e77d7ee..2d5e7dc 100644 --- a/crates/composer/src/jukebox.rs +++ b/crates/composer/src/jukebox.rs @@ -1,7 +1,8 @@ +use crate::sound::AudioOutput; use eyre::{Context, Result}; use rodio::{ source::{Buffered, SamplesConverter}, - Decoder, OutputStreamHandle, Source, + Decoder, Source, }; use std::{collections::HashMap, fs::File, io::BufReader, path::Path}; @@ -48,15 +49,12 @@ impl Jukebox { Ok(Self { samples }) } - pub(crate) fn play(&self, output_stream: &OutputStreamHandle, sample: Sample) -> Result<()> { + pub(crate) fn play(&self, audio_output: &AudioOutput, sample: Sample) { let buffer = self .samples .get(&sample) .expect("programmer error, all possible samples should be loaded"); - output_stream - .play_raw(buffer.clone()) - .with_context(|| format!("playing sample {sample:?}"))?; - Ok(()) + audio_output.play(buffer.clone()); } } diff --git a/crates/composer/src/main.rs b/crates/composer/src/main.rs index d362690..158489c 100644 --- a/crates/composer/src/main.rs +++ b/crates/composer/src/main.rs @@ -1,16 +1,19 @@ #![warn(clippy::all, clippy::clone_on_ref_ptr)] -use crate::jukebox::{Jukebox, Sample}; +use crate::{ + jukebox::{Jukebox, Sample}, + sound::AudioOutput, +}; use clap::Parser; use composer_api::{Event, DEFAULT_SERVER_ADDRESS}; use eyre::{Context, Result}; -use rodio::{OutputStream, OutputStreamHandle}; use std::{ net::UdpSocket, time::{Duration, Instant}, }; mod jukebox; +mod sound; #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] @@ -27,12 +30,12 @@ fn main() -> Result<()> { let socket = UdpSocket::bind(args.address.as_deref().unwrap_or(DEFAULT_SERVER_ADDRESS))?; println!("Listening on {}", socket.local_addr()?); - let (_stream, stream_handle) = OutputStream::try_default()?; + let audio_output = AudioOutput::new()?; let jukebox = Jukebox::new().context("creating jukebox")?; let mut stats = Stats { since: Instant::now(), events: 0, total_bytes: 0 }; loop { - match handle_datagram(&socket, &stream_handle, &jukebox) { + match handle_datagram(&socket, &audio_output, &jukebox) { Ok(bytes_received) => stats.record_event(bytes_received), Err(err) => eprintln!("Could not process datagram. Ignoring and continuing. {:?}", err), } @@ -42,7 +45,7 @@ fn main() -> Result<()> { /// Block until next datagram is received and handle it. Returns its size in bytes. fn handle_datagram( socket: &UdpSocket, - output_stream: &OutputStreamHandle, + audio_output: &AudioOutput, jukebox: &Jukebox, ) -> Result { // Size up to max normal network packet size @@ -59,7 +62,7 @@ fn handle_datagram( // TODO(Pablo): Play a sound that scales with the number of reports. Event::LogStats(_) => todo!(), }; - jukebox.play(output_stream, sample)?; + jukebox.play(audio_output, sample); Ok(number_of_bytes) } diff --git a/crates/composer/src/sound.rs b/crates/composer/src/sound.rs new file mode 100644 index 0000000..b863e19 --- /dev/null +++ b/crates/composer/src/sound.rs @@ -0,0 +1,83 @@ +use cpal::{ + traits::{DeviceTrait, HostTrait}, + SampleFormat, +}; +use eyre::{bail, eyre, Result}; +use rodio::{dynamic_mixer::DynamicMixerController, Source}; +use std::{ + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; + +pub(crate) struct AudioOutput { + mixer_controller: Arc>, + /// UNIX timestamp in nanoseconds, as an atomic variable. + last_callback_timestamp_ns: Arc, + _stream: cpal::Stream, +} + +impl AudioOutput { + pub(crate) fn new() -> Result { + let cpal_device = cpal::default_host() + .default_output_device() + .ok_or_else(|| eyre!("no cpal audio output device found"))?; + let supported_config = cpal_device.default_output_config()?; + let stream_config = supported_config.config(); + println!( + "Using audio device '{}', supported config {:?}, stream config {:?}.", + cpal_device.name()?, + supported_config, + stream_config, + ); + if supported_config.sample_format() != SampleFormat::F32 { + bail!("Only F32 sample format supported for now."); + } + + let (mixer_controller, mut mixer) = + rodio::dynamic_mixer::mixer::(stream_config.channels, stream_config.sample_rate.0); + + let last_callback_timestamp_ns = Arc::new(AtomicU64::new(0)); + + let last_callback_timestamp_ns_alias = Arc::clone(&last_callback_timestamp_ns); + let _stream = cpal_device.build_output_stream::( + &stream_config, + move |data_out, _info| { + // println!("data_callback: date_out.len(): {}, info: {_info:?}.", data_out.len()); + + // TODO(Matej): can we use timestamps in _info instead? + // TODO(Matej): specify less strict ordering (in all places for this atomic var) + last_callback_timestamp_ns_alias.store(current_timestamp_ns(), Ordering::SeqCst); + + data_out.iter_mut().for_each(|d| *d = mixer.next().unwrap_or(0f32)) + }, + |err| eprintln!("Got cpal stream error callback: {err}."), + None, + )?; + + Ok(Self { mixer_controller, last_callback_timestamp_ns, _stream }) + } + + pub(crate) fn play + Send + 'static>(&self, source: S) { + let ns_since_last_callback = current_timestamp_ns() + .saturating_sub(self.last_callback_timestamp_ns.load(Ordering::SeqCst)); + // println!("ns_since_last_callback: {ns_since_last_callback:>9}.",); + + // We assume that cpal's audio callbacks come at a steady rate, so we delay sounds to play + // by the time since last audio callback, so that it gets played at correct offset during + // the *next* callback. This is needed to accurately position sounds (sub buffer level). + // https://github.com/tonarino/acoustic_profiler/issues/19#issuecomment-1522348735 + self.mixer_controller.add(source.delay(Duration::from_nanos(ns_since_last_callback))); + } +} + +fn current_timestamp_ns() -> u64 { + let duration_since_epoch = + SystemTime::now().duration_since(UNIX_EPOCH).expect("Unable to get current UNIX time"); + duration_since_epoch + .as_nanos() + .try_into() + .expect("Cannot convert current UNIX timestamp in nanoseconds into u64") +}