Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drive audio directly using cpal, position sounds precisely within callbacks #36

Merged
merged 4 commits into from
Apr 28, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/composer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ bincode = "1"
clap = { version = "4.2", features = ["derive"] }
color-eyre = "0.6"
composer_api = { path = "../composer_api" }
cpal = "0.15"
eyre = "0.6"
rodio = { version = "0.17", features = ["symphonia-wav"] }
10 changes: 4 additions & 6 deletions crates/composer/src/jukebox.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::sound::AudioOutput;
use eyre::{Context, Result};
use rodio::{
source::{Buffered, SamplesConverter},
Decoder, OutputStreamHandle, Source,
Decoder, Source,
};
use std::{collections::HashMap, fs::File, io::BufReader, path::Path};

Expand Down Expand Up @@ -48,15 +49,12 @@ impl Jukebox {
Ok(Self { samples })
}

pub(crate) fn play(&self, output_stream: &OutputStreamHandle, sample: Sample) -> Result<()> {
pub(crate) fn play(&self, audio_output: &AudioOutput, sample: Sample) {
let buffer = self
.samples
.get(&sample)
.expect("programmer error, all possible samples should be loaded");

output_stream
.play_raw(buffer.clone())
.with_context(|| format!("playing sample {sample:?}"))?;
Ok(())
audio_output.play(buffer.clone());
}
}
30 changes: 20 additions & 10 deletions crates/composer/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,30 @@
#![warn(clippy::all, clippy::clone_on_ref_ptr)]

use crate::jukebox::{Jukebox, Sample};
use crate::{
jukebox::{Jukebox, Sample},
sound::AudioOutput,
};
use clap::Parser;
use composer_api::{Event, DEFAULT_SERVER_ADDRESS};
use eyre::{Context, Result};
use rodio::{OutputStream, OutputStreamHandle};
use std::{
net::UdpSocket,
time::{Duration, Instant},
};

mod jukebox;
mod sound;

// Command-line arguments, parsed via clap's derive API. Note the `///` doc comments
// below double as the --help text, so they are user-visible output.
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
    /// the address to listen on for incoming events
    // When absent, main falls back to DEFAULT_SERVER_ADDRESS.
    address: Option<String>,

    /// Delay event timestamps by this amount during playback. Should be larger than audio buffer
    /// period time plus the sound card latency.
    // Unit is milliseconds: main converts it with Duration::from_millis.
    #[arg(short, long, default_value_t = 200)]
    delay_ms: u64,
}

fn main() -> Result<()> {
Expand All @@ -27,13 +35,13 @@ fn main() -> Result<()> {
let socket = UdpSocket::bind(args.address.as_deref().unwrap_or(DEFAULT_SERVER_ADDRESS))?;
println!("Listening on {}", socket.local_addr()?);

let (_stream, stream_handle) = OutputStream::try_default()?;
let audio_output = AudioOutput::new(Duration::from_millis(args.delay_ms))?;

let jukebox = Jukebox::new().context("creating jukebox")?;
let mut stats = Stats { since: Instant::now(), events: 0, total_bytes: 0 };
loop {
match handle_datagram(&socket, &stream_handle, &jukebox) {
Ok(bytes_received) => stats.record_event(bytes_received),
match handle_datagram(&socket, &audio_output, &jukebox) {
Ok(bytes_received) => stats.record_event(bytes_received, &audio_output),
Err(err) => eprintln!("Could not process datagram. Ignoring and continuing. {:?}", err),
}
}
Expand All @@ -42,7 +50,7 @@ fn main() -> Result<()> {
/// Block until next datagram is received and handle it. Returns its size in bytes.
fn handle_datagram(
socket: &UdpSocket,
output_stream: &OutputStreamHandle,
audio_output: &AudioOutput,
jukebox: &Jukebox,
) -> Result<usize> {
// Size up to max normal network packet size
Expand All @@ -57,7 +65,7 @@ fn handle_datagram(
// TODO(Matej): add different sounds for these, and vary some of their qualities based on length.
Event::StderrWrite { length: _ } | Event::StdoutWrite { length: _ } => Sample::Click,
};
jukebox.play(output_stream, sample)?;
jukebox.play(audio_output, sample);

Ok(number_of_bytes)
}
Expand All @@ -71,15 +79,17 @@ struct Stats {
impl Stats {
const REPORT_EVERY: Duration = Duration::from_secs(1);

fn record_event(&mut self, bytes_received: usize) {
fn record_event(&mut self, bytes_received: usize, audio_output: &AudioOutput) {
self.events += 1;
self.total_bytes += bytes_received;

let elapsed = self.since.elapsed();
if elapsed >= Self::REPORT_EVERY {
println!(
"Received {} events ({} bytes) in last {elapsed:.2?}.",
self.events, self.total_bytes
"Received {} events ({} bytes) in last {elapsed:.2?}, {} too early plays.",
self.events,
self.total_bytes,
audio_output.fetch_too_early_plays(),
);

self.since = Instant::now();
Expand Down
176 changes: 176 additions & 0 deletions crates/composer/src/sound.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
use cpal::{
    traits::{DeviceTrait, HostTrait, StreamTrait},
    OutputCallbackInfo, OutputStreamTimestamp, SampleFormat, StreamInstant,
};
use eyre::{bail, eyre, Result};
use rodio::{
    dynamic_mixer::{DynamicMixer, DynamicMixerController},
    Source,
};
use std::{
    sync::{
        atomic::{AtomicU64, Ordering},
        mpsc::{channel, Receiver, Sender, TryRecvError},
        Arc,
    },
    time::{Duration, SystemTime, UNIX_EPOCH},
};

pub(crate) struct AudioOutput {
goodhoko marked this conversation as resolved.
Show resolved Hide resolved
source_tx: Sender<TimedSource>,
play_delay: Duration,
too_early_plays: Arc<AtomicU64>,
_stream: cpal::Stream,
}

impl AudioOutput {
goodhoko marked this conversation as resolved.
Show resolved Hide resolved
pub(crate) fn new(play_delay: Duration) -> Result<Self> {
let cpal_device = cpal::default_host()
.default_output_device()
.ok_or_else(|| eyre!("no cpal audio output device found"))?;
let supported_config = cpal_device.default_output_config()?;
let stream_config = supported_config.config();
println!(
"Using audio device '{}', supported config {:?}, stream config {:?}.",
cpal_device.name()?,
supported_config,
stream_config,
);
if supported_config.sample_format() != SampleFormat::F32 {
bail!("Only F32 sample format supported for now.");
}

let (mixer_controller, mixer) =
rodio::dynamic_mixer::mixer::<f32>(stream_config.channels, stream_config.sample_rate.0);

let (source_tx, source_rx) = channel();

let too_early_plays = Arc::default();

// The mixer_controller can be shared between threads, but we want to precisely control
// when we add new sources w.r.t. the audio callback, so we move it to the audio thread and
// use a mpsc channel to send new sources to the audio thread.
let mut audio_callback =
AudioCallback::new(mixer_controller, mixer, source_rx, &too_early_plays);
let _stream = cpal_device.build_output_stream::<f32, _, _>(
&stream_config,
move |data_out, info| audio_callback.fill_data(data_out, info),
|err| eprintln!("Got cpal stream error callback: {err}."),
None,
)?;

Ok(Self { source_tx, play_delay, too_early_plays, _stream })
}

pub(crate) fn play<S: Source<Item = f32> + Send + 'static>(&self, source: S) {
// TODO(Matej): use timestamp from the event itself once we have it.
let play_at_timestamp = current_timestamp() + self.play_delay;

// TODO(Matej): we are in fact double-boxing because DynamicMixerController internally adds
// another box. But we need a sized type to send it through threads. We could make this
// method non-generic, but that would be less flexible, so just accept it for now.
let source = Box::new(source);

self.source_tx
.send(TimedSource { source, play_at_timestamp })
.expect("source receiver should be still alive");
}

/// Get "too early plays" counter since the last call of this method.
pub(crate) fn fetch_too_early_plays(&self) -> u64 {
self.too_early_plays.swap(0, Ordering::SeqCst)
}
}

/// An f32 [rodio::source::Source] with UNIX timestamp of desired play time attached.
struct TimedSource {
    /// The source to play; boxed so it is a sized type that can be sent through the channel.
    source: Box<dyn Source<Item = f32> + Send + 'static>,
    /// Desired play time, expressed as a `Duration` since the UNIX epoch.
    play_at_timestamp: Duration,
}

/// A sort of manual implementation of the closure used as cpal audio data callback, for tidiness.
struct AudioCallback {
    /// Used to add newly received sources into the running mix.
    mixer_controller: Arc<DynamicMixerController<f32>>,
    /// The mixed sample stream drained into each output buffer.
    mixer: DynamicMixer<f32>,
    /// Receiving half of the channel through which other threads submit sources.
    source_rx: Receiver<TimedSource>,
    /// Shared counter of sources that arrived after their desired play time.
    too_early_plays: Arc<AtomicU64>,
    /// Anchor between the stream's internal time and real time; `None` until the first callback.
    stream_start: Option<StreamStart>,
}

impl AudioCallback {
fn new(
mixer_controller: Arc<DynamicMixerController<f32>>,
mixer: DynamicMixer<f32>,
source_rx: Receiver<TimedSource>,
too_early_plays: &Arc<AtomicU64>,
) -> Self {
let too_early_plays = Arc::clone(too_early_plays);
Self { mixer_controller, mixer, source_rx, too_early_plays, stream_start: None }
}

fn fill_data(&mut self, data_out: &mut [f32], info: &OutputCallbackInfo) {
let stream_timestamp = info.timestamp();
let stream_start =
self.stream_start.get_or_insert_with(|| StreamStart::new(stream_timestamp));

// At least on Linux ALSA, cpal gives very strange stream timestamp on very first call.
if stream_start.callback > stream_timestamp.callback {
eprintln!("cpal's stream timestamp jumped backwards, resetting stream start.");
*stream_start = StreamStart::new(stream_timestamp);
}

// UNIX timestamp of when the buffer filled during this call will be actually played.
let buffer_playback_timestamp = stream_start.realtime
+ (stream_timestamp.playback.duration_since(&stream_start.callback).expect(
"current playback timestamp should be larger than start callback timestamp",
));
Copy link
Member

@goodhoko goodhoko Apr 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had hardtime understanding this timstamp-jutsu but after some sketches in a notebook it clicked and I was then able to simplify it a bit in #39

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nice, merged in. @goodhoko I'd be curious to see your sketches!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hehe, I don't think it will be comprehensible though

IMG_7712

the "ball" is the sound we're trying to position, the line on right is the destination buffer with the question mark being the delay we need to compute. On the left there are the two independent epochs. The arrows are... well I guess just my thoughts as I was figuring it out .D


// Add possible new sources to the list
loop {
match self.source_rx.try_recv() {
Ok(timed_source) => {
let delay = timed_source
.play_at_timestamp
.checked_sub(buffer_playback_timestamp)
.unwrap_or_else(|| {
self.too_early_plays.fetch_add(1, Ordering::SeqCst);
Duration::ZERO
});
self.mixer_controller.add(timed_source.source.delay(delay));
},
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => panic!("source sender should be still alive"),
}
}

data_out.iter_mut().for_each(|d| *d = self.mixer.next().unwrap_or(0f32))
}
}

/// A utility struct that anchors stream's internal timestamp to real world UNIX timestamp.
struct StreamStart {
    /// Timestamp of the start of stream _playback_ as the `Duration` since the UNIX epoch.
    realtime: Duration,
    /// Internal cpal's callback timestamp of stream start.
    callback: StreamInstant,
}

impl StreamStart {
    /// Anchor the given cpal stream timestamp to the current real-world UNIX time.
    fn new(stream_timestamp: OutputStreamTimestamp) -> Self {
        let realtime = current_timestamp();
        let playback_lag = stream_timestamp.playback.duration_since(&stream_timestamp.callback);
        println!(
            "Audio stream starting at {realtime:?} UNIX timestamp, callback timestamp {:?}, \
             playback timestamp delayed by {:?} after callback.",
            stream_timestamp.callback, playback_lag,
        );

        // We record _callback_ timestamp for the purposes of fixing playback in real time.
        Self { realtime, callback: stream_timestamp.callback }
    }
}

/// Get current timestamp as the `Duration` since the UNIX epoch.
fn current_timestamp() -> Duration {
    let now = SystemTime::now();
    now.duration_since(UNIX_EPOCH).expect("Unable to get current UNIX time")
}