Skip to content

Commit

Permalink
cargo fmt pass.
Browse files Browse the repository at this point in the history
  • Loading branch information
kinetiknz committed Jul 24, 2019
1 parent 0785b90 commit 7e128ed
Show file tree
Hide file tree
Showing 25 changed files with 386 additions and 290 deletions.
4 changes: 1 addition & 3 deletions audioipc/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ extern crate cc;

fn main() {
if std::env::var_os("CARGO_CFG_UNIX").is_some() {
cc::Build::new()
.file("src/cmsghdr.c")
.compile("cmsghdr");
cc::Build::new().file("src/cmsghdr.c").compile("cmsghdr");
}
}
8 changes: 5 additions & 3 deletions audioipc/src/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ use futures::Poll;
#[cfg(unix)]
use iovec::IoVec;
#[cfg(unix)]
use mio::Ready;
#[cfg(unix)]
use msg::{RecvMsg, SendMsg};
use std::io;
use tokio_io::{AsyncRead, AsyncWrite};
#[cfg(unix)]
use mio::Ready;

pub trait AsyncRecvMsg: AsyncRead {
/// Pull some bytes from this source into the specified `Buf`, returning
Expand Down Expand Up @@ -65,7 +65,9 @@ impl AsyncRecvMsg for super::AsyncMessageStream {
where
B: BufMut,
{
if let Async::NotReady = <super::AsyncMessageStream>::poll_read_ready(self, Ready::readable())? {
if let Async::NotReady =
<super::AsyncMessageStream>::poll_read_ready(self, Ready::readable())?
{
return Ok(Async::NotReady);
}
let r = unsafe {
Expand Down
2 changes: 1 addition & 1 deletion audioipc/src/cmsg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ impl ControlMsgBuilder {
slice::from_raw_parts(&cmsghdr as *const _ as *const _, mem::size_of::<cmsghdr>())
};
cmsg.put_slice(cmsghdr);
let mut cmsg = try!(align_buf(cmsg));
let mut cmsg = align_buf(cmsg)?;
cmsg.put_slice(msg);

Ok(cmsg)
Expand Down
10 changes: 5 additions & 5 deletions audioipc/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub trait Codec {
/// A default method available to be called when there are no more bytes
/// available to be read from the I/O.
fn decode_eof(&mut self, buf: &mut BytesMut) -> io::Result<Self::Out> {
match try!(self.decode(buf)) {
match self.decode(buf)? {
Some(frame) => Ok(frame),
None => Err(io::Error::new(
io::ErrorKind::Other,
Expand Down Expand Up @@ -101,10 +101,10 @@ impl<In, Out> LengthDelimitedCodec<In, Out> {
let buf = buf.split_to(n).freeze();

trace!("Attempting to decode");
let msg = try!(deserialize::<Out>(buf.as_ref()).map_err(|e| match *e {
let msg = deserialize::<Out>(buf.as_ref()).map_err(|e| match *e {
bincode::ErrorKind::Io(e) => e,
_ => io::Error::new(io::ErrorKind::Other, *e),
}));
})?;

trace!("... Decoded {:?}", msg);
Ok(Some(msg))
Expand All @@ -122,7 +122,7 @@ where
fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Self::Out>> {
let n = match self.state {
State::Length => {
match try!(self.decode_length(buf)) {
match self.decode_length(buf)? {
Some(n) => {
self.state = State::Data(n);

Expand All @@ -138,7 +138,7 @@ where
State::Data(n) => n,
};

match try!(self.decode_data(buf, n)) {
match self.decode_data(buf, n)? {
Some(data) => {
// Update the decode state
self.state = State::Length;
Expand Down
17 changes: 10 additions & 7 deletions audioipc/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ where
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
let (remote_tx, remote_rx) = mpsc::channel::<current_thread::Handle>();

let join = try!(thread::Builder::new().name(name.into()).spawn(move || {
let mut rt = current_thread::Runtime::new().expect("Failed to create current_thread::Runtime");
let join = thread::Builder::new().name(name.into()).spawn(move || {
let mut rt =
current_thread::Runtime::new().expect("Failed to create current_thread::Runtime");
let handle = rt.handle();
drop(remote_tx.send(handle.clone()));

Expand All @@ -63,12 +64,14 @@ where

let _ = rt.block_on(shutdown_rx);
trace!("thread shutdown...");
}));
})?;

let handle = try!(remote_rx.recv().or_else(|_| Err(io::Error::new(
io::ErrorKind::Other,
"Failed to receive remote handle from spawned thread"
))));
let handle = remote_rx.recv().or_else(|_| {
Err(io::Error::new(
io::ErrorKind::Other,
"Failed to receive remote handle from spawned thread",
))
})?;

Ok(CoreThread {
inner: Some(Inner {
Expand Down
20 changes: 10 additions & 10 deletions audioipc/src/fd_passing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ where
let fds = match frame.fds {
Some(ref fds) => fds.clone(),
None => Bytes::new(),
}.into_buf();
}
.into_buf();
try_ready!(self.io.send_msg_buf(&mut msgs, &fds))
}
_ => {
Expand Down Expand Up @@ -179,14 +180,14 @@ where
// readable again, at which point the stream is terminated.
if self.is_readable {
if self.eof {
let mut item = try!(self.codec.decode_eof(&mut self.read_buf));
let mut item = self.codec.decode_eof(&mut self.read_buf)?;
item.take_platform_handles(|| self.incoming_fds.take_fds());
return Ok(Some(item).into());
}

trace!("attempting to decode a frame");

if let Some(mut item) = try!(self.codec.decode(&mut self.read_buf)) {
if let Some(mut item) = self.codec.decode(&mut self.read_buf)? {
trace!("frame decoded from buffer");
item.take_platform_handles(|| self.incoming_fds.take_fds());
return Ok(Some(item).into());
Expand All @@ -200,10 +201,9 @@ where
// Otherwise, try to read more data and try again. Make sure we've
// got room for at least one byte to read to ensure that we don't
// get a spurious 0 that looks like EOF
let (n, _) = try_ready!(
self.io
.recv_msg_buf(&mut self.read_buf, self.incoming_fds.cmsg())
);
let (n, _) = try_ready!(self
.io
.recv_msg_buf(&mut self.read_buf, self.incoming_fds.cmsg()));

if n == 0 {
self.eof = true;
Expand All @@ -230,14 +230,14 @@ where
// then attempt to flush it. If after flush it's *still*
// over BACKPRESSURE_THRESHOLD, then reject the send.
if self.write_buf.len() > BACKPRESSURE_THRESHOLD {
try!(self.poll_complete());
self.poll_complete()?;
if self.write_buf.len() > BACKPRESSURE_THRESHOLD {
return Ok(AsyncSink::NotReady(item));
}
}

let fds = item.platform_handles();
try!(self.codec.encode(item, &mut self.write_buf));
self.codec.encode(item, &mut self.write_buf)?;
let fds = fds.and_then(|fds| {
cmsg::builder(&mut self.outgoing_fds)
.rights(&fds.0[..])
Expand Down Expand Up @@ -313,7 +313,7 @@ mod tests {
use libc;
use std;

extern {
extern "C" {
fn cmsghdr_bytes(size: *mut libc::size_t) -> *const u8;
}

Expand Down
8 changes: 4 additions & 4 deletions audioipc/src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,13 @@ where
// readable again, at which point the stream is terminated.
if self.is_readable {
if self.eof {
let frame = try!(self.codec.decode_eof(&mut self.read_buf));
let frame = self.codec.decode_eof(&mut self.read_buf)?;
return Ok(Some(frame).into());
}

trace!("attempting to decode a frame");

if let Some(frame) = try!(self.codec.decode(&mut self.read_buf)) {
if let Some(frame) = self.codec.decode(&mut self.read_buf)? {
trace!("frame decoded from buffer");
return Ok(Some(frame).into());
}
Expand Down Expand Up @@ -120,13 +120,13 @@ where
// then attempt to flush it. If after flush it's *still*
// over BACKPRESSURE_THRESHOLD, then reject the send.
if self.write_buf.len() > BACKPRESSURE_THRESHOLD {
try!(self.poll_complete());
self.poll_complete()?;
if self.write_buf.len() > BACKPRESSURE_THRESHOLD {
return Ok(AsyncSink::NotReady(item));
}
}

try!(self.codec.encode(item, &mut self.write_buf));
self.codec.encode(item, &mut self.write_buf)?;
Ok(AsyncSink::Ready)
}

Expand Down
74 changes: 44 additions & 30 deletions audioipc/src/handle_passing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
// This program is made available under an ISC-style license. See the
// accompanying file LICENSE for details

use tokio_io::{AsyncRead, AsyncWrite};
use bytes::{Bytes, BytesMut, IntoBuf};
use codec::Codec;
use futures::{AsyncSink, Poll, Sink, StartSend, Stream};
use messages::AssocRawPlatformHandle;
use std::collections::VecDeque;
use std::{fmt, io};
use tokio_io::{AsyncRead, AsyncWrite};

const INITIAL_CAPACITY: usize = 1024;
const BACKPRESSURE_THRESHOLD: usize = 4 * INITIAL_CAPACITY;
Expand Down Expand Up @@ -117,13 +117,13 @@ where
// readable again, at which point the stream is terminated.
if self.is_readable {
if self.eof {
let item = try!(self.codec.decode_eof(&mut self.read_buf));
let item = self.codec.decode_eof(&mut self.read_buf)?;
return Ok(Some(item).into());
}

trace!("attempting to decode a frame");

if let Some(item) = try!(self.codec.decode(&mut self.read_buf)) {
if let Some(item) = self.codec.decode(&mut self.read_buf)? {
trace!("frame decoded from buffer");
return Ok(Some(item).into());
}
Expand All @@ -136,10 +136,7 @@ where
// Otherwise, try to read more data and try again. Make sure we've
// got room for at least one byte to read to ensure that we don't
// get a spurious 0 that looks like EOF
let n = try_ready!(
self.io
.read_buf(&mut self.read_buf)
);
let n = try_ready!(self.io.read_buf(&mut self.read_buf));

if n == 0 {
self.eof = true;
Expand All @@ -159,14 +156,17 @@ where
type SinkItem = C::In;
type SinkError = io::Error;

fn start_send(&mut self, mut item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
fn start_send(
&mut self,
mut item: Self::SinkItem,
) -> StartSend<Self::SinkItem, Self::SinkError> {
trace!("start_send: item={:?}", item);

// If the buffer is already over BACKPRESSURE_THRESHOLD,
// then attempt to flush it. If after flush it's *still*
// over BACKPRESSURE_THRESHOLD, then reject the send.
if self.write_buf.len() > BACKPRESSURE_THRESHOLD {
try!(self.poll_complete());
self.poll_complete()?;
if self.write_buf.len() > BACKPRESSURE_THRESHOLD {
return Ok(AsyncSink::NotReady(item));
}
Expand All @@ -176,15 +176,21 @@ where
if let Some((handles, target_pid)) = item.platform_handles() {
got_handles = true;
let remote_handles = unsafe {
[duplicate_platformhandle(handles[0], target_pid)?,
duplicate_platformhandle(handles[1], target_pid)?,
duplicate_platformhandle(handles[2], target_pid)?]
[
duplicate_platformhandle(handles[0], target_pid)?,
duplicate_platformhandle(handles[1], target_pid)?,
duplicate_platformhandle(handles[2], target_pid)?,
]
};
trace!("item handles: {:?} remote_handles: {:?}", handles, remote_handles);
trace!(
"item handles: {:?} remote_handles: {:?}",
handles,
remote_handles
);
item.take_platform_handles(|| Some(remote_handles));
}

try!(self.codec.encode(item, &mut self.write_buf));
self.codec.encode(item, &mut self.write_buf)?;

if got_handles {
// Enforce splitting sends on messages that contain file
Expand Down Expand Up @@ -224,33 +230,41 @@ pub fn framed_with_platformhandles<A, C>(io: A, codec: C) -> FramedWithPlatformH
}
}

use winapi::um::{processthreadsapi, winnt, handleapi};
use winapi::shared::minwindef::{DWORD, FALSE};
use super::PlatformHandleType;
use winapi::shared::minwindef::{DWORD, FALSE};
use winapi::um::{handleapi, processthreadsapi, winnt};

// source_handle is effectively taken ownership of (consumed) and
// closed when duplicate_platformhandle is called.
// TODO: Make this transfer more explicit via the type system.
unsafe fn duplicate_platformhandle(source_handle: PlatformHandleType,
target_pid: DWORD) -> Result<PlatformHandleType, std::io::Error> {
unsafe fn duplicate_platformhandle(
source_handle: PlatformHandleType,
target_pid: DWORD,
) -> Result<PlatformHandleType, std::io::Error> {
let source = processthreadsapi::GetCurrentProcess();
let target = processthreadsapi::OpenProcess(winnt::PROCESS_DUP_HANDLE,
FALSE,
target_pid);
let target = processthreadsapi::OpenProcess(winnt::PROCESS_DUP_HANDLE, FALSE, target_pid);
if !super::valid_handle(target) {
return Err(std::io::Error::new(std::io::ErrorKind::Other, "invalid target process"));
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"invalid target process",
));
}

let mut target_handle = std::ptr::null_mut();
let ok = handleapi::DuplicateHandle(source,
source_handle,
target,
&mut target_handle,
0,
FALSE,
winnt::DUPLICATE_CLOSE_SOURCE | winnt::DUPLICATE_SAME_ACCESS);
let ok = handleapi::DuplicateHandle(
source,
source_handle,
target,
&mut target_handle,
0,
FALSE,
winnt::DUPLICATE_CLOSE_SOURCE | winnt::DUPLICATE_SAME_ACCESS,
);
if ok == FALSE {
return Err(std::io::Error::new(std::io::ErrorKind::Other, "DuplicateHandle failed"));
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"DuplicateHandle failed",
));
}
Ok(target_handle)
}
Loading

0 comments on commit 7e128ed

Please sign in to comment.