diff --git a/audioipc/build.rs b/audioipc/build.rs index f5dca97..10ef639 100644 --- a/audioipc/build.rs +++ b/audioipc/build.rs @@ -2,8 +2,6 @@ extern crate cc; fn main() { if std::env::var_os("CARGO_CFG_UNIX").is_some() { - cc::Build::new() - .file("src/cmsghdr.c") - .compile("cmsghdr"); + cc::Build::new().file("src/cmsghdr.c").compile("cmsghdr"); } } diff --git a/audioipc/src/async.rs b/audioipc/src/async.rs index 85e0f19..710af3d 100644 --- a/audioipc/src/async.rs +++ b/audioipc/src/async.rs @@ -12,11 +12,11 @@ use futures::Poll; #[cfg(unix)] use iovec::IoVec; #[cfg(unix)] +use mio::Ready; +#[cfg(unix)] use msg::{RecvMsg, SendMsg}; use std::io; use tokio_io::{AsyncRead, AsyncWrite}; -#[cfg(unix)] -use mio::Ready; pub trait AsyncRecvMsg: AsyncRead { /// Pull some bytes from this source into the specified `Buf`, returning @@ -65,7 +65,9 @@ impl AsyncRecvMsg for super::AsyncMessageStream { where B: BufMut, { - if let Async::NotReady = ::poll_read_ready(self, Ready::readable())? { + if let Async::NotReady = + ::poll_read_ready(self, Ready::readable())? + { return Ok(Async::NotReady); } let r = unsafe { diff --git a/audioipc/src/cmsg.rs b/audioipc/src/cmsg.rs index de98868..97ff81f 100644 --- a/audioipc/src/cmsg.rs +++ b/audioipc/src/cmsg.rs @@ -123,7 +123,7 @@ impl ControlMsgBuilder { slice::from_raw_parts(&cmsghdr as *const _ as *const _, mem::size_of::()) }; cmsg.put_slice(cmsghdr); - let mut cmsg = try!(align_buf(cmsg)); + let mut cmsg = align_buf(cmsg)?; cmsg.put_slice(msg); Ok(cmsg) diff --git a/audioipc/src/codec.rs b/audioipc/src/codec.rs index 6b60b09..a7e1825 100644 --- a/audioipc/src/codec.rs +++ b/audioipc/src/codec.rs @@ -31,7 +31,7 @@ pub trait Codec { /// A default method available to be called when there are no more bytes /// available to be read from the I/O. fn decode_eof(&mut self, buf: &mut BytesMut) -> io::Result { - match try!(self.decode(buf)) { + match self.decode(buf)? { Some(frame) => Ok(frame), None => Err(io::Error::new( io::ErrorKind::Other, @@ -101,10 +101,10 @@ impl LengthDelimitedCodec { let buf = buf.split_to(n).freeze(); trace!("Attempting to decode"); - let msg = try!(deserialize::(buf.as_ref()).map_err(|e| match *e { + let msg = deserialize::(buf.as_ref()).map_err(|e| match *e { bincode::ErrorKind::Io(e) => e, _ => io::Error::new(io::ErrorKind::Other, *e), - })); + })?; trace!("... Decoded {:?}", msg); Ok(Some(msg)) @@ -122,7 +122,7 @@ where fn decode(&mut self, buf: &mut BytesMut) -> io::Result> { let n = match self.state { State::Length => { - match try!(self.decode_length(buf)) { + match self.decode_length(buf)? { Some(n) => { self.state = State::Data(n); @@ -138,7 +138,7 @@ where State::Data(n) => n, }; - match try!(self.decode_data(buf, n)) { + match self.decode_data(buf, n)? { Some(data) => { // Update the decode state self.state = State::Length; diff --git a/audioipc/src/core.rs b/audioipc/src/core.rs index 73f2724..dc9ec86 100644 --- a/audioipc/src/core.rs +++ b/audioipc/src/core.rs @@ -51,8 +51,9 @@ where let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); let (remote_tx, remote_rx) = mpsc::channel::(); - let join = try!(thread::Builder::new().name(name.into()).spawn(move || { - let mut rt = current_thread::Runtime::new().expect("Failed to create current_thread::Runtime"); + let join = thread::Builder::new().name(name.into()).spawn(move || { + let mut rt = + current_thread::Runtime::new().expect("Failed to create current_thread::Runtime"); let handle = rt.handle(); drop(remote_tx.send(handle.clone())); @@ -63,12 +64,14 @@ where let _ = rt.block_on(shutdown_rx); trace!("thread shutdown..."); - })); + })?; - let handle = try!(remote_rx.recv().or_else(|_| Err(io::Error::new( - io::ErrorKind::Other, - "Failed to receive remote handle from spawned thread" - )))); + let handle = remote_rx.recv().or_else(|_| { + Err(io::Error::new( + io::ErrorKind::Other, + "Failed to receive remote handle from spawned thread", + )) + })?; Ok(CoreThread { inner: Some(Inner { diff --git a/audioipc/src/fd_passing.rs b/audioipc/src/fd_passing.rs index 10e8557..cd50cdc 100644 --- a/audioipc/src/fd_passing.rs +++ b/audioipc/src/fd_passing.rs @@ -103,7 +103,8 @@ where let fds = match frame.fds { Some(ref fds) => fds.clone(), None => Bytes::new(), - }.into_buf(); + } + .into_buf(); try_ready!(self.io.send_msg_buf(&mut msgs, &fds)) } _ => { @@ -179,14 +180,14 @@ where // readable again, at which point the stream is terminated. if self.is_readable { if self.eof { - let mut item = try!(self.codec.decode_eof(&mut self.read_buf)); + let mut item = self.codec.decode_eof(&mut self.read_buf)?; item.take_platform_handles(|| self.incoming_fds.take_fds()); return Ok(Some(item).into()); } trace!("attempting to decode a frame"); - if let Some(mut item) = try!(self.codec.decode(&mut self.read_buf)) { + if let Some(mut item) = self.codec.decode(&mut self.read_buf)? { trace!("frame decoded from buffer"); item.take_platform_handles(|| self.incoming_fds.take_fds()); return Ok(Some(item).into()); @@ -200,10 +201,9 @@ where // Otherwise, try to read more data and try again. Make sure we've // got room for at least one byte to read to ensure that we don't // get a spurious 0 that looks like EOF - let (n, _) = try_ready!( - self.io - .recv_msg_buf(&mut self.read_buf, self.incoming_fds.cmsg()) - ); + let (n, _) = try_ready!(self + .io + .recv_msg_buf(&mut self.read_buf, self.incoming_fds.cmsg())); if n == 0 { self.eof = true; @@ -230,14 +230,14 @@ where // then attempt to flush it. If after flush it's *still* // over BACKPRESSURE_THRESHOLD, then reject the send. if self.write_buf.len() > BACKPRESSURE_THRESHOLD { - try!(self.poll_complete()); + self.poll_complete()?; if self.write_buf.len() > BACKPRESSURE_THRESHOLD { return Ok(AsyncSink::NotReady(item)); } } let fds = item.platform_handles(); - try!(self.codec.encode(item, &mut self.write_buf)); + self.codec.encode(item, &mut self.write_buf)?; let fds = fds.and_then(|fds| { cmsg::builder(&mut self.outgoing_fds) .rights(&fds.0[..]) @@ -313,7 +313,7 @@ mod tests { use libc; use std; - extern { + extern "C" { fn cmsghdr_bytes(size: *mut libc::size_t) -> *const u8; } diff --git a/audioipc/src/frame.rs b/audioipc/src/frame.rs index 6a4fb4e..a58528c 100644 --- a/audioipc/src/frame.rs +++ b/audioipc/src/frame.rs @@ -79,13 +79,13 @@ where // readable again, at which point the stream is terminated. if self.is_readable { if self.eof { - let frame = try!(self.codec.decode_eof(&mut self.read_buf)); + let frame = self.codec.decode_eof(&mut self.read_buf)?; return Ok(Some(frame).into()); } trace!("attempting to decode a frame"); - if let Some(frame) = try!(self.codec.decode(&mut self.read_buf)) { + if let Some(frame) = self.codec.decode(&mut self.read_buf)? { trace!("frame decoded from buffer"); return Ok(Some(frame).into()); } @@ -120,13 +120,13 @@ where // then attempt to flush it. If after flush it's *still* // over BACKPRESSURE_THRESHOLD, then reject the send. if self.write_buf.len() > BACKPRESSURE_THRESHOLD { - try!(self.poll_complete()); + self.poll_complete()?; if self.write_buf.len() > BACKPRESSURE_THRESHOLD { return Ok(AsyncSink::NotReady(item)); } } - try!(self.codec.encode(item, &mut self.write_buf)); + self.codec.encode(item, &mut self.write_buf)?; Ok(AsyncSink::Ready) } diff --git a/audioipc/src/handle_passing.rs b/audioipc/src/handle_passing.rs index be39a32..cd98d5b 100644 --- a/audioipc/src/handle_passing.rs +++ b/audioipc/src/handle_passing.rs @@ -3,13 +3,13 @@ // This program is made available under an ISC-style license. See the // accompanying file LICENSE for details -use tokio_io::{AsyncRead, AsyncWrite}; use bytes::{Bytes, BytesMut, IntoBuf}; use codec::Codec; use futures::{AsyncSink, Poll, Sink, StartSend, Stream}; use messages::AssocRawPlatformHandle; use std::collections::VecDeque; use std::{fmt, io}; +use tokio_io::{AsyncRead, AsyncWrite}; const INITIAL_CAPACITY: usize = 1024; const BACKPRESSURE_THRESHOLD: usize = 4 * INITIAL_CAPACITY; @@ -117,13 +117,13 @@ where // readable again, at which point the stream is terminated. if self.is_readable { if self.eof { - let item = try!(self.codec.decode_eof(&mut self.read_buf)); + let item = self.codec.decode_eof(&mut self.read_buf)?; return Ok(Some(item).into()); } trace!("attempting to decode a frame"); - if let Some(item) = try!(self.codec.decode(&mut self.read_buf)) { + if let Some(item) = self.codec.decode(&mut self.read_buf)? { trace!("frame decoded from buffer"); return Ok(Some(item).into()); } @@ -136,10 +136,7 @@ where // Otherwise, try to read more data and try again. Make sure we've // got room for at least one byte to read to ensure that we don't // get a spurious 0 that looks like EOF - let n = try_ready!( - self.io - .read_buf(&mut self.read_buf) - ); + let n = try_ready!(self.io.read_buf(&mut self.read_buf)); if n == 0 { self.eof = true; @@ -159,14 +156,17 @@ where type SinkItem = C::In; type SinkError = io::Error; - fn start_send(&mut self, mut item: Self::SinkItem) -> StartSend { + fn start_send( + &mut self, + mut item: Self::SinkItem, + ) -> StartSend { trace!("start_send: item={:?}", item); // If the buffer is already over BACKPRESSURE_THRESHOLD, // then attempt to flush it. If after flush it's *still* // over BACKPRESSURE_THRESHOLD, then reject the send. if self.write_buf.len() > BACKPRESSURE_THRESHOLD { - try!(self.poll_complete()); + self.poll_complete()?; if self.write_buf.len() > BACKPRESSURE_THRESHOLD { return Ok(AsyncSink::NotReady(item)); } @@ -176,15 +176,21 @@ where if let Some((handles, target_pid)) = item.platform_handles() { got_handles = true; let remote_handles = unsafe { - [duplicate_platformhandle(handles[0], target_pid)?, - duplicate_platformhandle(handles[1], target_pid)?, - duplicate_platformhandle(handles[2], target_pid)?] + [ + duplicate_platformhandle(handles[0], target_pid)?, + duplicate_platformhandle(handles[1], target_pid)?, + duplicate_platformhandle(handles[2], target_pid)?, + ] }; - trace!("item handles: {:?} remote_handles: {:?}", handles, remote_handles); + trace!( + "item handles: {:?} remote_handles: {:?}", + handles, + remote_handles + ); item.take_platform_handles(|| Some(remote_handles)); } - try!(self.codec.encode(item, &mut self.write_buf)); + self.codec.encode(item, &mut self.write_buf)?; if got_handles { // Enforce splitting sends on messages that contain file @@ -224,33 +230,41 @@ pub fn framed_with_platformhandles(io: A, codec: C) -> FramedWithPlatformH } } -use winapi::um::{processthreadsapi, winnt, handleapi}; -use winapi::shared::minwindef::{DWORD, FALSE}; use super::PlatformHandleType; +use winapi::shared::minwindef::{DWORD, FALSE}; +use winapi::um::{handleapi, processthreadsapi, winnt}; // source_handle is effectively taken ownership of (consumed) and // closed when duplicate_platformhandle is called. // TODO: Make this transfer more explicit via the type system. -unsafe fn duplicate_platformhandle(source_handle: PlatformHandleType, - target_pid: DWORD) -> Result { +unsafe fn duplicate_platformhandle( + source_handle: PlatformHandleType, + target_pid: DWORD, +) -> Result { let source = processthreadsapi::GetCurrentProcess(); - let target = processthreadsapi::OpenProcess(winnt::PROCESS_DUP_HANDLE, - FALSE, - target_pid); + let target = processthreadsapi::OpenProcess(winnt::PROCESS_DUP_HANDLE, FALSE, target_pid); if !super::valid_handle(target) { - return Err(std::io::Error::new(std::io::ErrorKind::Other, "invalid target process")); + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "invalid target process", + )); } let mut target_handle = std::ptr::null_mut(); - let ok = handleapi::DuplicateHandle(source, - source_handle, - target, - &mut target_handle, - 0, - FALSE, - winnt::DUPLICATE_CLOSE_SOURCE | winnt::DUPLICATE_SAME_ACCESS); + let ok = handleapi::DuplicateHandle( + source, + source_handle, + target, + &mut target_handle, + 0, + FALSE, + winnt::DUPLICATE_CLOSE_SOURCE | winnt::DUPLICATE_SAME_ACCESS, + ); if ok == FALSE { - return Err(std::io::Error::new(std::io::ErrorKind::Other, "DuplicateHandle failed")); + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "DuplicateHandle failed", + )); } Ok(target_handle) } diff --git a/audioipc/src/lib.rs b/audioipc/src/lib.rs index b929d89..b601452 100644 --- a/audioipc/src/lib.rs +++ b/audioipc/src/lib.rs @@ -3,7 +3,6 @@ // This program is made available under an ISC-style license. See the // accompanying file LICENSE for details #![warn(unused_extern_crates)] - #![recursion_limit = "1024"] #[macro_use] extern crate error_chain; @@ -27,19 +26,19 @@ extern crate memmap; extern crate serde; #[macro_use] extern crate tokio_io; -extern crate tokio; -#[cfg(unix)] -extern crate tokio_reactor; -#[cfg(windows)] -extern crate winapi; #[cfg(unix)] extern crate mio; -#[cfg(unix)] -extern crate mio_uds; #[cfg(windows)] extern crate mio_named_pipes; +#[cfg(unix)] +extern crate mio_uds; +extern crate tokio; #[cfg(windows)] extern crate tokio_named_pipes; +#[cfg(unix)] +extern crate tokio_reactor; +#[cfg(windows)] +extern crate winapi; mod async; #[cfg(unix)] @@ -74,10 +73,10 @@ use std::path::PathBuf; // TODO: Remove hardcoded size and allow allocation based on cubeb backend requirements. pub const SHM_AREA_SIZE: usize = 2 * 1024 * 1024; -#[cfg(windows)] -use std::os::windows::io::{FromRawHandle, IntoRawHandle}; #[cfg(unix)] use std::os::unix::io::{FromRawFd, IntoRawFd}; +#[cfg(windows)] +use std::os::windows::io::{FromRawHandle, IntoRawHandle}; // This must match the definition of // ipc::FileDescriptor::PlatformHandleType in Gecko. diff --git a/audioipc/src/messages.rs b/audioipc/src/messages.rs index a2ff9de..fb8d802 100644 --- a/audioipc/src/messages.rs +++ b/audioipc/src/messages.rs @@ -3,12 +3,12 @@ // This program is made available under an ISC-style license. See the // accompanying file LICENSE for details -use PlatformHandle; -use PlatformHandleType; use cubeb::{self, ffi}; use std::ffi::{CStr, CString}; use std::os::raw::{c_char, c_int, c_uint}; use std::ptr; +use PlatformHandle; +use PlatformHandleType; #[derive(Debug, Serialize, Deserialize)] pub struct Device { @@ -243,9 +243,11 @@ pub enum ClientMessage { #[derive(Debug, Deserialize, Serialize)] pub enum CallbackReq { - Data { nframes: isize, - input_frame_size: usize, - output_frame_size: usize }, + Data { + nframes: isize, + input_frame_size: usize, + output_frame_size: usize, + }, State(ffi::cubeb_state), DeviceChange, } @@ -274,7 +276,8 @@ pub trait AssocRawPlatformHandle { fn take_platform_handles(&mut self, f: F) where - F: FnOnce() -> Option<[PlatformHandleType; 3]> { + F: FnOnce() -> Option<[PlatformHandleType; 3]>, + { assert!(f().is_none()); } } @@ -284,14 +287,22 @@ impl AssocRawPlatformHandle for ServerMessage {} impl AssocRawPlatformHandle for ClientMessage { fn platform_handles(&self) -> Option<([PlatformHandleType; 3], u32)> { match *self { - ClientMessage::StreamCreated(ref data) => Some(([data.platform_handles[0].as_raw(), - data.platform_handles[1].as_raw(), - data.platform_handles[2].as_raw()], - data.target_pid)), - ClientMessage::ContextSetupDeviceCollectionCallback(ref data) => - Some(([data.platform_handles[0].as_raw(), - data.platform_handles[1].as_raw(), - data.platform_handles[2].as_raw()], data.target_pid)), + ClientMessage::StreamCreated(ref data) => Some(( + [ + data.platform_handles[0].as_raw(), + data.platform_handles[1].as_raw(), + data.platform_handles[2].as_raw(), + ], + data.target_pid, + )), + ClientMessage::ContextSetupDeviceCollectionCallback(ref data) => Some(( + [ + data.platform_handles[0].as_raw(), + data.platform_handles[1].as_raw(), + data.platform_handles[2].as_raw(), + ], + data.target_pid, + )), _ => None, } } @@ -302,16 +313,21 @@ impl AssocRawPlatformHandle for ClientMessage { { match *self { ClientMessage::StreamCreated(ref mut data) => { - let handles = f().expect("platform_handles must be available when processing StreamCreated"); - data.platform_handles = [PlatformHandle::new(handles[0]), - PlatformHandle::new(handles[1]), - PlatformHandle::new(handles[2])] + let handles = + f().expect("platform_handles must be available when processing StreamCreated"); + data.platform_handles = [ + PlatformHandle::new(handles[0]), + PlatformHandle::new(handles[1]), + PlatformHandle::new(handles[2]), + ] } ClientMessage::ContextSetupDeviceCollectionCallback(ref mut data) => { let handles = f().expect("platform_handles must be available when processing ContextSetupDeviceCollectionCallback"); - data.platform_handles = [PlatformHandle::new(handles[0]), - PlatformHandle::new(handles[1]), - PlatformHandle::new(handles[2])] + data.platform_handles = [ + PlatformHandle::new(handles[0]), + PlatformHandle::new(handles[1]), + PlatformHandle::new(handles[2]), + ] } _ => {} } diff --git a/audioipc/src/messagestream_unix.rs b/audioipc/src/messagestream_unix.rs index 8c77bad..624e66f 100644 --- a/audioipc/src/messagestream_unix.rs +++ b/audioipc/src/messagestream_unix.rs @@ -3,12 +3,12 @@ // This program is made available under an ISC-style license. See the // accompanying file LICENSE for details -use std::os::unix::io::{IntoRawFd, FromRawFd, AsRawFd, RawFd}; +use super::tokio_uds_stream as tokio_uds; +use futures::Poll; +use mio::Ready; +use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use std::os::unix::net; use tokio_io::{AsyncRead, AsyncWrite}; -use mio::Ready; -use futures::Poll; -use super::tokio_uds_stream as tokio_uds; #[derive(Debug)] pub struct MessageStream(net::UnixStream); @@ -19,7 +19,8 @@ impl MessageStream { MessageStream(stream) } - pub fn anonymous_ipc_pair() -> std::result::Result<(MessageStream, MessageStream), std::io::Error> { + pub fn anonymous_ipc_pair( + ) -> std::result::Result<(MessageStream, MessageStream), std::io::Error> { let pair = net::UnixStream::pair()?; Ok((MessageStream::new(pair.0), MessageStream::new(pair.1))) } @@ -28,8 +29,13 @@ impl MessageStream { MessageStream::new(net::UnixStream::from_raw_fd(raw)) } - pub fn into_tokio_ipc(self, handle: &tokio::reactor::Handle) -> std::result::Result { - Ok(AsyncMessageStream::new(tokio_uds::UnixStream::from_std(self.0, handle)?)) + pub fn into_tokio_ipc( + self, + handle: &tokio::reactor::Handle, + ) -> std::result::Result { + Ok(AsyncMessageStream::new(tokio_uds::UnixStream::from_std( + self.0, handle, + )?)) } } diff --git a/audioipc/src/messagestream_win.rs b/audioipc/src/messagestream_win.rs index e593755..e0159cd 100644 --- a/audioipc/src/messagestream_win.rs +++ b/audioipc/src/messagestream_win.rs @@ -3,11 +3,11 @@ // This program is made available under an ISC-style license. See the // accompanying file LICENSE for details -use std::os::windows::io::{IntoRawHandle, FromRawHandle, AsRawHandle, RawHandle}; +use mio_named_pipes; +use std::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle}; use std::sync::atomic::{AtomicUsize, Ordering}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_named_pipes; -use mio_named_pipes; #[derive(Debug)] pub struct MessageStream(mio_named_pipes::NamedPipe); @@ -18,7 +18,8 @@ impl MessageStream { MessageStream(stream) } - pub fn anonymous_ipc_pair() -> std::result::Result<(MessageStream, MessageStream), std::io::Error> { + pub fn anonymous_ipc_pair( + ) -> std::result::Result<(MessageStream, MessageStream), std::io::Error> { let pipe1 = mio_named_pipes::NamedPipe::new(get_pipe_name())?; let pipe2 = unsafe { mio_named_pipes::NamedPipe::from_raw_handle(pipe1.as_raw_handle()) }; Ok((MessageStream::new(pipe1), MessageStream::new(pipe2))) @@ -28,8 +29,13 @@ impl MessageStream { MessageStream::new(mio_named_pipes::NamedPipe::from_raw_handle(raw)) } - pub fn into_tokio_ipc(self, handle: &tokio::reactor::Handle) -> std::result::Result { - Ok(AsyncMessageStream::new(tokio_named_pipes::NamedPipe::from_pipe(self.0, handle)?)) + pub fn into_tokio_ipc( + self, + handle: &tokio::reactor::Handle, + ) -> std::result::Result { + Ok(AsyncMessageStream::new( + tokio_named_pipes::NamedPipe::from_pipe(self.0, handle)?, + )) } } diff --git a/audioipc/src/msg.rs b/audioipc/src/msg.rs index de7c074..18494ce 100644 --- a/audioipc/src/msg.rs +++ b/audioipc/src/msg.rs @@ -84,9 +84,7 @@ pub fn recv_msg_with_flags( msghdr.msg_control = control; msghdr.msg_controllen = controllen as _; - let n = try!(cvt_r(|| unsafe { - libc::recvmsg(socket, &mut msghdr as *mut _, flags) - })); + let n = cvt_r(|| unsafe { libc::recvmsg(socket, &mut msghdr as *mut _, flags) })?; let controllen = msghdr.msg_controllen as usize; Ok((n, controllen, msghdr.msg_flags)) diff --git a/audioipc/src/rpc/client/mod.rs b/audioipc/src/rpc/client/mod.rs index 1ad5765..2d802c4 100644 --- a/audioipc/src/rpc/client/mod.rs +++ b/audioipc/src/rpc/client/mod.rs @@ -51,9 +51,7 @@ mod proxy; pub use self::proxy::{ClientProxy, Response}; -pub fn bind_client( - transport: C::Transport, -) -> proxy::ClientProxy +pub fn bind_client(transport: C::Transport) -> proxy::ClientProxy where C: Client, { diff --git a/audioipc/src/rpc/driver.rs b/audioipc/src/rpc/driver.rs index 56e9b08..02f4b5d 100644 --- a/audioipc/src/rpc/driver.rs +++ b/audioipc/src/rpc/driver.rs @@ -43,8 +43,8 @@ where /// Process incoming messages off the transport. fn receive_incoming(&mut self) -> io::Result<()> { while self.run { - if let Async::Ready(req) = try!(self.handler.transport().poll()) { - try!(self.process_incoming(req)); + if let Async::Ready(req) = self.handler.transport().poll()? { + self.process_incoming(req)?; } else { break; } @@ -82,10 +82,10 @@ where fn send_outgoing(&mut self) -> io::Result<()> { trace!("send_responses"); loop { - match try!(self.handler.produce()) { + match self.handler.produce()? { Async::Ready(Some(message)) => { trace!(" --> got message"); - try!(self.process_outgoing(message)); + self.process_outgoing(message)?; } Async::Ready(None) => { trace!(" --> got None"); @@ -103,13 +103,13 @@ where fn process_outgoing(&mut self, message: T::Out) -> io::Result<()> { trace!("process_outgoing"); - try!(assert_send(&mut self.handler.transport(), message)); + assert_send(&mut self.handler.transport(), message)?; Ok(()) } fn flush(&mut self) -> io::Result<()> { - self.is_flushed = try!(self.handler.transport().poll_complete()).is_ready(); + self.is_flushed = self.handler.transport().poll_complete()?.is_ready(); // TODO: Ok(()) @@ -131,13 +131,13 @@ where trace!("rpc::Driver::tick"); // First read off data from the socket - try!(self.receive_incoming()); + self.receive_incoming()?; // Handle completed responses - try!(self.send_outgoing()); + self.send_outgoing()?; // Try flushing buffered writes - try!(self.flush()); + self.flush()?; if self.is_done() { trace!(" --> is done."); @@ -150,7 +150,7 @@ where } fn assert_send(s: &mut S, item: S::SinkItem) -> Result<(), S::SinkError> { - match try!(s.start_send(item)) { + match s.start_send(item)? { AsyncSink::Ready => Ok(()), AsyncSink::NotReady(_) => panic!( "sink reported itself as ready after `poll_ready` but was \ diff --git a/audioipc/src/shm.rs b/audioipc/src/shm.rs index 7cf4849..7291554 100644 --- a/audioipc/src/shm.rs +++ b/audioipc/src/shm.rs @@ -5,10 +5,10 @@ use errors::*; use memmap::{Mmap, MmapMut, MmapOptions}; +use std::cell::UnsafeCell; use std::fs::{remove_file, File, OpenOptions}; use std::path::Path; use std::sync::{atomic, Arc}; -use std::cell::UnsafeCell; pub struct SharedMemReader { mmap: Mmap, @@ -79,7 +79,6 @@ impl SharedMemSlice { mmap: self.mmap.clone(), } } - } unsafe impl Send for SharedMemSlice {} diff --git a/audioipc/src/tokio_uds_stream.rs b/audioipc/src/tokio_uds_stream.rs index 8e080ec..ef7543b 100644 --- a/audioipc/src/tokio_uds_stream.rs +++ b/audioipc/src/tokio_uds_stream.rs @@ -53,8 +53,7 @@ impl UnixStream { where P: AsRef, { - let res = mio_uds::UnixStream::connect(path) - .map(UnixStream::new); + let res = mio_uds::UnixStream::connect(path).map(UnixStream::new); let inner = match res { Ok(stream) => State::Waiting(stream), @@ -82,7 +81,7 @@ impl UnixStream { /// communicating back and forth between one another. Each socket will /// be associated with the default event loop's handle. pub fn pair() -> io::Result<(UnixStream, UnixStream)> { - let (a, b) = try!(mio_uds::UnixStream::pair()); + let (a, b) = mio_uds::UnixStream::pair()?; let a = UnixStream::new(a); let b = UnixStream::new(b); @@ -268,11 +267,11 @@ impl Future for ConnectFuture { match self.inner { State::Waiting(ref mut stream) => { if let Async::NotReady = stream.io.poll_write_ready()? { - return Ok(Async::NotReady) + return Ok(Async::NotReady); } - if let Some(e) = try!(stream.io.get_ref().take_error()) { - return Err(e) + if let Some(e) = stream.io.get_ref().take_error()? { + return Err(e); } } State::Error(_) => { @@ -281,8 +280,8 @@ impl Future for ConnectFuture { _ => unreachable!(), }; - return Err(e) - }, + return Err(e); + } State::Empty => panic!("can't poll stream twice"), } diff --git a/client/src/context.rs b/client/src/context.rs index 49213b9..068de55 100644 --- a/client/src/context.rs +++ b/client/src/context.rs @@ -9,7 +9,10 @@ use audioipc::codec::LengthDelimitedCodec; use audioipc::frame::{framed, Framed}; use audioipc::platformhandle_passing::{framed_with_platformhandles, FramedWithPlatformHandles}; use audioipc::{core, rpc}; -use audioipc::{messages, ClientMessage, ServerMessage, messages::DeviceCollectionReq, messages::DeviceCollectionResp}; +use audioipc::{ + messages, messages::DeviceCollectionReq, messages::DeviceCollectionResp, ClientMessage, + ServerMessage, +}; use cubeb_backend::{ ffi, Context, ContextOps, DeviceCollectionRef, DeviceId, DeviceType, Error, Ops, Result, Stream, StreamParams, StreamParamsRef, @@ -23,8 +26,8 @@ use std::sync::{Arc, Mutex}; use std::thread; use std::{fmt, io, mem, ptr}; use stream; -use tokio::runtime::current_thread; use tokio::reactor; +use tokio::runtime::current_thread; use {ClientStream, CpuPoolInitParams, CPUPOOL_INIT_PARAMS, G_SERVER_FD}; cfg_if! { if #[cfg(target_os = "linux")] { @@ -37,7 +40,10 @@ struct CubebClient; impl rpc::Client for CubebClient { type Request = ServerMessage; type Response = ClientMessage; - type Transport = FramedWithPlatformHandles>; + type Transport = FramedWithPlatformHandles< + audioipc::AsyncMessageStream, + LengthDelimitedCodec, + >; } pub const CLIENT_OPS: Ops = capi_new!(ClientContext, ClientStream); @@ -80,8 +86,10 @@ fn open_server_stream() -> io::Result { return Ok(audioipc::MessageStream::from_raw_fd(fd.as_raw())); } - Err(io::Error::new(io::ErrorKind::Other, - "Failed to get server connection.")) + Err(io::Error::new( + io::ErrorKind::Other, + "Failed to get server connection.", + )) } } @@ -145,13 +153,16 @@ impl rpc::Server for DeviceCollectionServer { type Request = DeviceCollectionReq; type Response = DeviceCollectionResp; type Future = CpuFuture; - type Transport = Framed>; + type Transport = + Framed>; fn process(&mut self, req: Self::Request) -> Self::Future { match req { DeviceCollectionReq::DeviceChange(device_type) => { - trace!("ctx_thread: DeviceChange Callback: device_type={}", - device_type); + trace!( + "ctx_thread: DeviceChange Callback: device_type={}", + device_type + ); let devtype = cubeb_backend::DeviceType::from_bits_truncate(device_type); @@ -166,9 +177,7 @@ impl rpc::Server for DeviceCollectionServer { self.cpu_pool.spawn_fn(move || { if devtype.contains(cubeb_backend::DeviceType::INPUT) { - unsafe { - input_cb.unwrap()(ptr::null_mut(), input_user_ptr as *mut c_void) - } + unsafe { input_cb.unwrap()(ptr::null_mut(), input_user_ptr as *mut c_void) } } if devtype.contains(cubeb_backend::DeviceType::OUTPUT) { unsafe { @@ -211,7 +220,8 @@ impl ContextOps for ClientContext { open_server_stream() .and_then(|stream| stream.into_tokio_ipc(&handle)) .and_then(|stream| bind_and_send_client(stream, &tx_rpc)) - }).map_err(|_| Error::default())?; + }) + .map_err(|_| Error::default())?; let rpc = rx_rpc.recv().map_err(|_| Error::default())?; @@ -365,9 +375,8 @@ impl ContextOps for ClientContext { ContextSetupDeviceCollectionCallback => ContextSetupDeviceCollectionCallback())?; - let stream = unsafe { - audioipc::MessageStream::from_raw_fd(fds.platform_handles[0].as_raw()) - }; + let stream = + unsafe { audioipc::MessageStream::from_raw_fd(fds.platform_handles[0].as_raw()) }; // TODO: The lowest comms layer expects exactly 3 PlatformHandles, but we only // need one here. Drop the dummy handles the other side sent us to discard. @@ -383,14 +392,16 @@ impl ContextOps for ClientContext { }; let (wait_tx, wait_rx) = mpsc::channel(); - self.handle().spawn(futures::future::lazy(move || { - let handle = reactor::Handle::default(); - let stream = stream.into_tokio_ipc(&handle).unwrap(); - let transport = framed(stream, Default::default()); - rpc::bind_server(transport, server); - wait_tx.send(()).unwrap(); - Ok(()) - })).expect("Failed to spawn DeviceCollectionServer"); + self.handle() + .spawn(futures::future::lazy(move || { + let handle = reactor::Handle::default(); + let stream = stream.into_tokio_ipc(&handle).unwrap(); + let transport = framed(stream, Default::default()); + rpc::bind_server(transport, server); + wait_tx.send(()).unwrap(); + Ok(()) + })) + .expect("Failed to spawn DeviceCollectionServer"); wait_rx.recv().unwrap(); self.device_collection_rpc = true; } diff --git a/client/src/lib.rs b/client/src/lib.rs index 704647f..78ae9fe 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -11,8 +11,8 @@ extern crate futures; extern crate futures_cpupool; #[macro_use] extern crate log; -extern crate tokio; extern crate audio_thread_priority; +extern crate tokio; #[macro_use] extern crate lazy_static; #[macro_use] @@ -23,14 +23,14 @@ mod send_recv; mod context; mod stream; -use audioipc::{PlatformHandleType, PlatformHandle}; +use audio_thread_priority::RtPriorityHandle; +use audioipc::{PlatformHandle, PlatformHandleType}; use context::ClientContext; use cubeb_backend::{capi, ffi}; +use futures_cpupool::CpuPool; use std::os::raw::{c_char, c_int}; +use std::sync::Mutex; use stream::ClientStream; -use std::sync::{Mutex}; -use futures_cpupool::CpuPool; -use audio_thread_priority::RtPriorityHandle; cfg_if! { if #[cfg(target_os = "linux")] { use std::sync::{Arc, Condvar}; diff --git a/client/src/stream.rs b/client/src/stream.rs index 1fb6611..c6efd5f 100644 --- a/client/src/stream.rs +++ b/client/src/stream.rs @@ -15,10 +15,10 @@ use std::ffi::CString; use std::os::raw::c_void; use std::ptr; use std::sync::mpsc; -use ClientContext; -use {assert_not_in_callback, set_in_callback}; use std::sync::{Arc, Mutex}; use tokio::reactor; +use ClientContext; +use {assert_not_in_callback, set_in_callback}; pub struct Device(ffi::cubeb_device); @@ -62,11 +62,16 @@ impl rpc::Server for CallbackServer { type Request = CallbackReq; type Response = CallbackResp; type Future = CpuFuture; - type Transport = Framed>; + type Transport = + Framed>; fn process(&mut self, req: Self::Request) -> Self::Future { match req { - CallbackReq::Data { nframes, input_frame_size, output_frame_size } => { + CallbackReq::Data { + nframes, + input_frame_size, + output_frame_size, + } => { trace!( "stream_thread: Data Callback: nframes={} input_fs={} output_fs={}", nframes, @@ -149,7 +154,6 @@ impl rpc::Server for CallbackServer { Ok(CallbackResp::DeviceChange) }) - } } } @@ -169,9 +173,12 @@ impl<'ctx> ClientStream<'ctx> { let has_output = init_params.output_stream_params.is_some(); let rpc = ctx.rpc(); - let data = try!(send_recv!(rpc, StreamInit(init_params) => StreamCreated())); + let data = send_recv!(rpc, StreamInit(init_params) => StreamCreated())?; - debug!("token = {}, handles = {:?}", data.token, data.platform_handles); + debug!( + "token = {}, handles = {:?}", + data.token, data.platform_handles + ); let stm = data.platform_handles[0]; let stream = unsafe { audioipc::MessageStream::from_raw_fd(stm.as_raw()) }; @@ -210,14 +217,16 @@ impl<'ctx> ClientStream<'ctx> { }; let (wait_tx, wait_rx) = mpsc::channel(); - ctx.handle().spawn(futures::future::lazy(move || { - let handle = reactor::Handle::default(); - let stream = stream.into_tokio_ipc(&handle).unwrap(); - let transport = framed(stream, Default::default()); - rpc::bind_server(transport, server); - wait_tx.send(()).unwrap(); - Ok(()) - })).expect("Failed to spawn CallbackServer"); + ctx.handle() + .spawn(futures::future::lazy(move || { + let handle = reactor::Handle::default(); + let stream = stream.into_tokio_ipc(&handle).unwrap(); + let transport = framed(stream, Default::default()); + rpc::bind_server(transport, server); + wait_tx.send(()).unwrap(); + Ok(()) + })) + .expect("Failed to spawn CallbackServer"); wait_rx.recv().unwrap(); let stream = Box::into_raw(Box::new(ClientStream { @@ -322,13 +331,7 @@ pub fn init( state_callback: ffi::cubeb_state_callback, user_ptr: *mut c_void, ) -> Result { - let stm = try!(ClientStream::init( - ctx, - init_params, - data_callback, - state_callback, - user_ptr - )); + let stm = ClientStream::init(ctx, init_params, data_callback, state_callback, user_ptr)?; debug_assert_eq!(stm.user_ptr(), user_ptr); Ok(stm) } diff --git a/ipctest/src/client.rs b/ipctest/src/client.rs index f71abf5..cc6bd0c 100644 --- a/ipctest/src/client.rs +++ b/ipctest/src/client.rs @@ -184,7 +184,7 @@ pub fn client_test(fd: audioipc::PlatformHandleType) -> Result<()> { println!("Min Latency: {}", latency); println!("Preferred Rate: {}", rate); - try!(enumerate_devices(&ctx)); + enumerate_devices(&ctx)?; let params = cubeb::StreamParamsBuilder::new() .format(STREAM_FORMAT) @@ -213,7 +213,8 @@ pub fn client_test(fd: audioipc::PlatformHandleType) -> Result<()> { } output.len() as isize - }).state_callback(|state| println!("stream {:?}", state)); + }) + .state_callback(|state| println!("stream {:?}", state)); let stream = query!(builder.init(&ctx)); diff --git a/ipctest/src/main.rs b/ipctest/src/main.rs index 04bb2bf..04b1de5 100644 --- a/ipctest/src/main.rs +++ b/ipctest/src/main.rs @@ -3,7 +3,6 @@ // This program is made available under an ISC-style license. See the // accompanying file LICENSE for details. #![warn(unused_extern_crates)] - #![recursion_limit = "1024"] #[macro_use] extern crate error_chain; @@ -97,23 +96,23 @@ fn run() -> Result<()> { #[cfg(windows)] fn run_client(pid: u32, handle: usize) -> Result<()> { - use winapi::um::{processthreadsapi, winnt, handleapi, winnt::HANDLE}; use winapi::shared::minwindef::FALSE; + use winapi::um::{handleapi, processthreadsapi, winnt, winnt::HANDLE}; let mut target_handle = std::ptr::null_mut(); unsafe { - let source = processthreadsapi::OpenProcess(winnt::PROCESS_DUP_HANDLE, - FALSE, - pid); + let source = processthreadsapi::OpenProcess(winnt::PROCESS_DUP_HANDLE, FALSE, pid); let target = processthreadsapi::GetCurrentProcess(); - let ok = handleapi::DuplicateHandle(source, - handle as HANDLE, - target, - &mut target_handle, - 0, - FALSE, - winnt::DUPLICATE_SAME_ACCESS); + let ok = handleapi::DuplicateHandle( + source, + handle as HANDLE, + target, + &mut target_handle, + 0, + FALSE, + winnt::DUPLICATE_SAME_ACCESS, + ); if ok == FALSE { bail!("DuplicateHandle failed"); } diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 0000000..cdf527d --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1,2 @@ +use_try_shorthand = true + diff --git a/server/src/lib.rs b/server/src/lib.rs index 80b5de2..209330a 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -10,15 +10,16 @@ extern crate error_chain; #[macro_use] extern crate log; +extern crate audio_thread_priority; extern crate audioipc; extern crate cubeb_core as cubeb; extern crate futures; extern crate slab; extern crate tokio; -extern crate audio_thread_priority; #[macro_use] extern crate lazy_static; +use audio_thread_priority::promote_current_thread_to_real_time; use audioipc::core; use audioipc::platformhandle_passing::framed_with_platformhandles; use audioipc::rpc; @@ -26,10 +27,9 @@ use audioipc::{MessageStream, PlatformHandle, PlatformHandleType}; use futures::sync::oneshot; use futures::Future; use std::error::Error; +use std::ffi::{CStr, CString}; use std::os::raw::c_void; use std::ptr; -use audio_thread_priority::promote_current_thread_to_real_time; -use std::ffi::{CStr, CString}; use std::sync::Mutex; use tokio::reactor; @@ -71,34 +71,31 @@ struct ServerWrapper { fn run() -> Result { trace!("Starting up cubeb audio server event loop thread..."); - let callback_thread = try!( - core::spawn_thread("AudioIPC Callback RPC", || { - match promote_current_thread_to_real_time(0, 48000) { - Ok(_) => { } - Err(_) => { - debug!("Failed to promote audio callback thread to real-time."); - } + let callback_thread = core::spawn_thread("AudioIPC Callback RPC", || { + match promote_current_thread_to_real_time(0, 48000) { + Ok(_) => {} + Err(_) => { + debug!("Failed to promote audio callback thread to real-time."); } - trace!("Starting up cubeb audio callback event loop thread..."); - Ok(()) - }).or_else(|e| { - debug!( - "Failed to start cubeb audio callback event loop thread: {:?}", - e.description() - ); - Err(e) - }) - ); - - let core_thread = try!( - core::spawn_thread("AudioIPC Server RPC", move || Ok(())).or_else(|e| { - debug!( - "Failed to cubeb audio core event loop thread: {:?}", - e.description() - ); - Err(e) - }) - ); + } + trace!("Starting up cubeb audio callback event loop thread..."); + Ok(()) + }) + .or_else(|e| { + debug!( + "Failed to start cubeb audio callback event loop thread: {:?}", + e.description() + ); + Err(e) + })?; + + let core_thread = core::spawn_thread("AudioIPC Server RPC", move || Ok(())).or_else(|e| { + debug!( + "Failed to cubeb audio core event loop thread: {:?}", + e.description() + ); + Err(e) + })?; Ok(ServerWrapper { core_thread, @@ -107,8 +104,10 @@ fn run() -> Result { } #[no_mangle] -pub unsafe extern "C" fn audioipc_server_start(context_name: *const std::os::raw::c_char, - backend_name: *const std::os::raw::c_char) -> *mut c_void { +pub unsafe extern "C" fn audioipc_server_start( + context_name: *const std::os::raw::c_char, + backend_name: *const std::os::raw::c_char, +) -> *mut c_void { let mut params = G_CUBEB_CONTEXT_PARAMS.lock().unwrap(); if !context_name.is_null() { params.context_name = CStr::from_ptr(context_name).to_owned(); @@ -137,10 +136,13 @@ pub extern "C" fn audioipc_server_new_client(p: *mut c_void) -> PlatformHandleTy .and_then(|(sock1, sock2)| { // Spawn closure to run on same thread as reactor::Core // via remote handle. - wrapper.core_thread.handle().spawn(futures::future::lazy(|| { - trace!("Incoming connection"); - let handle = reactor::Handle::default(); - sock2.into_tokio_ipc(&handle) + wrapper + .core_thread + .handle() + .spawn(futures::future::lazy(|| { + trace!("Incoming connection"); + let handle = reactor::Handle::default(); + sock2.into_tokio_ipc(&handle) .and_then(|sock| { let transport = framed_with_platformhandles(sock, Default::default()); rpc::bind_server(transport, server::CubebServer::new(cb_remote)); @@ -148,12 +150,14 @@ pub extern "C" fn audioipc_server_new_client(p: *mut c_void) -> PlatformHandleTy }).map_err(|_| ()) // Notify waiting thread that sock2 has been registered. .and_then(|_| wait_tx.send(())) - })).expect("Failed to spawn CubebServer"); + })) + .expect("Failed to spawn CubebServer"); // Wait for notification that sock2 has been registered // with reactor::Core. let _ = wait_rx.wait(); Ok(PlatformHandle::from(sock1).as_raw()) - }).unwrap_or(-1isize as PlatformHandleType) + }) + .unwrap_or(-1isize as PlatformHandleType) } #[no_mangle] diff --git a/server/src/server.rs b/server/src/server.rs index 85765d2..12db67e 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -4,16 +4,17 @@ // accompanying file LICENSE for details use audioipc; -use audioipc::{MessageStream, PlatformHandle}; use audioipc::codec::LengthDelimitedCodec; -use audioipc::platformhandle_passing::FramedWithPlatformHandles; use audioipc::frame::{framed, Framed}; use audioipc::messages::{ CallbackReq, CallbackResp, ClientMessage, Device, DeviceCollectionReq, DeviceCollectionResp, - DeviceInfo, RegisterDeviceCollectionChanged, ServerMessage, StreamCreate, StreamInitParams, StreamParams, + DeviceInfo, RegisterDeviceCollectionChanged, ServerMessage, StreamCreate, StreamInitParams, + StreamParams, }; +use audioipc::platformhandle_passing::FramedWithPlatformHandles; use audioipc::rpc; use audioipc::shm::{SharedMemReader, SharedMemWriter}; +use audioipc::{MessageStream, PlatformHandle}; use cubeb; use cubeb::ffi; use futures::future::{self, FutureResult}; @@ -26,8 +27,8 @@ use std::ffi::CStr; use std::mem::{size_of, ManuallyDrop}; use std::os::raw::{c_long, c_void}; use std::{panic, slice}; -use tokio::runtime::current_thread; use tokio::reactor; +use tokio::runtime::current_thread; use errors::*; @@ -66,7 +67,8 @@ struct DeviceCollectionClient; impl rpc::Client for DeviceCollectionClient { type Request = DeviceCollectionReq; type Response = DeviceCollectionResp; - type Transport = Framed>; + type Transport = + Framed>; } struct CallbackClient; @@ -74,7 +76,8 @@ struct CallbackClient; impl rpc::Client for CallbackClient { type Request = CallbackReq; type Response = CallbackResp; - type Transport = Framed>; + type Transport = + Framed>; } struct ServerStreamCallbacks { @@ -92,7 +95,12 @@ struct ServerStreamCallbacks { impl ServerStreamCallbacks { fn data_callback(&mut self, input: &[u8], output: &mut [u8], nframes: isize) -> isize { - trace!("Stream data callback: {} {} {}", nframes, input.len(), output.len()); + trace!( + "Stream data callback: {} {} {}", + nframes, + input.len(), + output.len() + ); self.input_shm.write(input).unwrap(); @@ -102,7 +110,8 @@ impl ServerStreamCallbacks { nframes, input_frame_size: self.input_frame_size as usize, output_frame_size: self.output_frame_size as usize, - }).wait(); + }) + .wait(); match r { Ok(CallbackResp::Data(frames)) => { @@ -165,8 +174,14 @@ struct CubebServerCallbacks { impl CubebServerCallbacks { fn device_collection_changed_callback(&mut self, device_type: ffi::cubeb_device_type) { - debug!("Sending device collection ({:?}) changed event", device_type); - let _ = self.rpc.call(DeviceCollectionReq::DeviceChange(device_type)).wait(); + debug!( + "Sending device collection ({:?}) changed event", + device_type + ); + let _ = self + .rpc + .call(DeviceCollectionReq::DeviceChange(device_type)) + .wait(); } } @@ -181,7 +196,10 @@ impl rpc::Server for CubebServer { type Request = ServerMessage; type Response = ClientMessage; type Future = FutureResult; - type Transport = FramedWithPlatformHandles>; + type Transport = FramedWithPlatformHandles< + audioipc::AsyncMessageStream, + LengthDelimitedCodec, + >; fn process(&mut self, req: Self::Request) -> Self::Future { let resp = with_local_context(|context| match *context { @@ -216,8 +234,9 @@ impl CubebServer { ClientMessage::ClientDisconnected } - ServerMessage::ContextGetBackendId => - ClientMessage::ContextBackendId(context.backend_id().to_string()), + ServerMessage::ContextGetBackendId => { + ClientMessage::ContextBackendId(context.backend_id().to_string()) + } ServerMessage::ContextGetMaxChannelCount => context .max_channel_count() @@ -251,7 +270,8 @@ impl CubebServer { .map(|devices| { let v: Vec = devices.iter().map(|i| i.as_ref().into()).collect(); ClientMessage::ContextEnumeratedDevices(v) - }).unwrap_or_else(error), + }) + .unwrap_or_else(error), ServerMessage::StreamInit(ref params) => self .process_stream_init(context, params) @@ -310,7 +330,8 @@ impl CubebServer { .map(|device| ClientMessage::StreamCurrentDevice(Device::from(device))) .unwrap_or_else(error), - ServerMessage::StreamRegisterDeviceChangeCallback(stm_tok, enable) => self.streams[stm_tok] + ServerMessage::StreamRegisterDeviceChangeCallback(stm_tok, enable) => self.streams + [stm_tok] .stream .register_device_changed_callback(if enable { Some(device_change_cb_c) @@ -330,29 +351,30 @@ impl CubebServer { // done by spawning a future on cb_remote. let (tx, rx) = oneshot::channel(); - self.cb_remote.spawn(futures::future::lazy(move || { - let handle = reactor::Handle::default(); - let stream = stm2.into_tokio_ipc(&handle).unwrap(); - let transport = framed(stream, Default::default()); - let rpc = rpc::bind_client::(transport); - drop(tx.send(rpc)); - Ok(()) - })).expect("Failed to spawn DeviceCollectionClient"); + self.cb_remote + .spawn(futures::future::lazy(move || { + let handle = reactor::Handle::default(); + let stream = stm2.into_tokio_ipc(&handle).unwrap(); + let transport = framed(stream, Default::default()); + let rpc = rpc::bind_client::(transport); + drop(tx.send(rpc)); + Ok(()) + })) + .expect("Failed to spawn DeviceCollectionClient"); // TODO: The lowest comms layer expects exactly 3 PlatformHandles, but we only // need one here. Send some dummy handles over for the other side to discard. - let (dummy1, dummy2) = MessageStream::anonymous_ipc_pair().expect("need dummy IPC pair"); + let (dummy1, dummy2) = + MessageStream::anonymous_ipc_pair().expect("need dummy IPC pair"); if let Ok(rpc) = rx.wait() { - self.cbs = Some(CubebServerCallbacks { - rpc, - }); + self.cbs = Some(CubebServerCallbacks { rpc }); let fds = RegisterDeviceCollectionChanged { platform_handles: [ PlatformHandle::from(stm1), PlatformHandle::from(dummy1), PlatformHandle::from(dummy2), ], - target_pid: self.remote_pid.unwrap() + target_pid: self.remote_pid.unwrap(), }; ClientMessage::ContextSetupDeviceCollectionCallback(fds) @@ -364,13 +386,15 @@ impl CubebServer { warn!("Failed to create RPC pair"); error(cubeb::Error::error()) } - }, + } - ServerMessage::ContextRegisterDeviceCollectionChanged(device_type, enable) => - self.process_register_device_collection_changed(context, - cubeb::DeviceType::from_bits_truncate(device_type), - enable) - .unwrap_or_else(error), + ServerMessage::ContextRegisterDeviceCollectionChanged(device_type, enable) => self + .process_register_device_collection_changed( + context, + cubeb::DeviceType::from_bits_truncate(device_type), + enable, + ) + .unwrap_or_else(error), }; trace!("process_msg: req={:?}, resp={:?}", msg, resp); @@ -378,10 +402,12 @@ impl CubebServer { resp } - fn process_register_device_collection_changed(&mut self, - context: &cubeb::Context, - devtype: cubeb::DeviceType, - enable: bool) -> cubeb::Result { + fn process_register_device_collection_changed( + &mut self, + context: &cubeb::Context, + devtype: cubeb::DeviceType, + enable: bool, + ) -> cubeb::Result { if devtype == cubeb::DeviceType::UNKNOWN { return Err(cubeb::Error::invalid_parameter()); } @@ -396,7 +422,11 @@ impl CubebServer { None }; unsafe { - context.register_device_collection_changed(cubeb::DeviceType::INPUT, cb, user_ptr)?; + context.register_device_collection_changed( + cubeb::DeviceType::INPUT, + cb, + user_ptr, + )?; } } if devtype.contains(cubeb::DeviceType::OUTPUT) { @@ -406,7 +436,11 @@ impl CubebServer { None }; unsafe { - context.register_device_collection_changed(cubeb::DeviceType::OUTPUT, cb, user_ptr)?; + context.register_device_collection_changed( + cubeb::DeviceType::OUTPUT, + cb, + user_ptr, + )?; } } Ok(ClientMessage::ContextRegisteredDeviceCollectionChanged) @@ -432,7 +466,8 @@ impl CubebServer { }; let channel_count = p.channels as u16; sample_size * channel_count - }).unwrap_or(0u16) + }) + .unwrap_or(0u16) } // Create the callback handling struct which is attached the cubeb stream. @@ -452,14 +487,16 @@ impl CubebServer { // done by spawning a future on cb_remote. let (tx, rx) = oneshot::channel(); - self.cb_remote.spawn(futures::future::lazy(move || { - let handle = reactor::Handle::default(); - let stream = stm2.into_tokio_ipc(&handle).unwrap(); - let transport = framed(stream, Default::default()); - let rpc = rpc::bind_client::(transport); - drop(tx.send(rpc)); - Ok(()) - })).expect("Failed to spawn CallbackClient"); + self.cb_remote + .spawn(futures::future::lazy(move || { + let handle = reactor::Handle::default(); + let stream = stm2.into_tokio_ipc(&handle).unwrap(); + let transport = framed(stream, Default::default()); + let rpc = rpc::bind_client::(transport); + drop(tx.send(rpc)); + Ok(()) + })) + .expect("Failed to spawn CallbackClient"); let rpc: rpc::ClientProxy = match rx.wait() { Ok(rpc) => rpc, @@ -506,7 +543,8 @@ impl CubebServer { Some(data_cb_c), Some(state_cb_c), user_ptr, - ).and_then(|stream| { + ) + .and_then(|stream| { if !self.streams.has_available() { trace!( "server connection ran out of stream slots. reserving {} more.", @@ -523,7 +561,8 @@ impl CubebServer { .insert(ServerStream { stream: ManuallyDrop::new(stream), cbs: ManuallyDrop::new(cbs), - }).index() + }) + .index() } None => { // TODO: Turn into error @@ -538,9 +577,10 @@ impl CubebServer { PlatformHandle::from(input_file), PlatformHandle::from(output_file), ], - target_pid: self.remote_pid.unwrap() + target_pid: self.remote_pid.unwrap(), })) - }).map_err(|e| e.into()) + }) + .map_err(|e| e.into()) } } } @@ -585,9 +625,7 @@ unsafe extern "C" fn state_cb_c( ok.expect("State callback panicked"); } -unsafe extern "C" fn device_change_cb_c( - user_ptr: *mut c_void, -) { +unsafe extern "C" fn device_change_cb_c(user_ptr: *mut c_void) { let ok = panic::catch_unwind(|| { let cbs = &mut *(user_ptr as *mut ServerStreamCallbacks); cbs.device_change_callback();