Skip to content

Commit

Permalink
📝 add aio for linux (#52)
Browse files Browse the repository at this point in the history
  • Loading branch information
Xudong-Huang committed Mar 7, 2019
1 parent 14d22cb commit 147e59b
Show file tree
Hide file tree
Showing 5 changed files with 202 additions and 15 deletions.
54 changes: 39 additions & 15 deletions src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl File {
#[cold]
{
return Ok(File {
io: io_impl::IoData::new(0),
io: io_impl::IoData::new(&file),
sys: file,
ctx: io_impl::IoContext::new(),
offset: 0,
Expand All @@ -48,9 +48,10 @@ impl File {
if !is_coroutine() {
#[cold]
{
let file = StdFile::open(path)?;
return Ok(File {
io: io_impl::IoData::new(0),
sys: StdFile::open(path)?,
io: io_impl::IoData::new(&file),
sys: file,
ctx: io_impl::IoContext::new(),
offset: 0,
});
Expand All @@ -68,9 +69,10 @@ impl File {
if !is_coroutine() {
#[cold]
{
let file = options.open(path)?;
return Ok(File {
io: io_impl::IoData::new(0),
sys: options.open(path)?,
io: io_impl::IoData::new(&file),
sys: file,
ctx: io_impl::IoContext::new(),
offset: 0,
});
Expand All @@ -85,15 +87,16 @@ impl File {
if !is_coroutine() {
#[cold]
{
let file = StdFile::create(path)?;
return Ok(File {
io: io_impl::IoData::new(0),
sys: StdFile::create(path)?,
io: io_impl::IoData::new(&file),
sys: file,
ctx: io_impl::IoContext::new(),
offset: 0,
});
}
}

dbg!(path.as_ref());
let file = fs_impl::create(path)?;
File::from(file)
}
Expand Down Expand Up @@ -141,9 +144,13 @@ impl Read for File {

self.io.reset();

let reader = fs_impl::FileRead::new(&self.sys, self.offset, buf);
yield_with(&reader);
match reader.done() {
let ret = {
let reader = fs_impl::FileRead::new(self, self.offset, buf);
yield_with(&reader);
reader.done()
};

match ret {
Ok(len) => {
self.offset += len as u64;
Ok(len)
Expand All @@ -167,9 +174,12 @@ impl Write for File {

self.io.reset();

let writer = fs_impl::FileWrite::new(&self.sys, self.offset, buf);
yield_with(&writer);
let len = writer.done()?;
let len = {
let writer = fs_impl::FileWrite::new(self, self.offset, buf);
yield_with(&writer);
writer.done()?
};

self.offset += len as u64;
Ok(len)
}
Expand Down Expand Up @@ -201,6 +211,20 @@ impl Seek for File {
}
}

#[cfg(unix)]
impl io_impl::AsIoData for File {
fn as_io_data(&self) -> &io_impl::IoData {
&self.io
}
}

#[cfg(windows)]
impl ::std::os::windows::io::AsRawHandle for File {
fn as_raw_handle(&self) -> ::std::os::windows::io::RawHandle {
self.sys.as_raw_handle()
}
}

/// How large a buffer to pre-allocate before reading the entire file.
fn initial_buffer_size(file: &File) -> usize {
// Allocate one extra byte so the buffer doesn't need to grow before the
Expand Down Expand Up @@ -511,7 +535,7 @@ mod tests {
#[cfg(not(windows))]
fn file_try_clone() {
let tmpdir = tmpdir();
let ret = go!(move || {
go!(move || {
let mut f1 = check!(File::open_with_options(
OpenOptions::new().read(true).write(true).create(true),
&tmpdir.as_ref().join("test")
Expand Down
75 changes: 75 additions & 0 deletions src/io/sys/unix/fs/fs_read.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use std::io;
use std::ops::Deref;
use std::sync::atomic::Ordering;

use super::super::{co_io_result, from_nix_error, IoData};
use coroutine_impl::{co_cancel_data, CoroutineImpl, EventSource};
use io::AsIoData;
use nix::unistd::read;
use sync::delay_drop::DelayDrop;
use yield_now::yield_with;

pub struct FileRead<'a> {
io_data: &'a IoData,
buf: &'a mut [u8],
can_drop: DelayDrop,
}

impl<'a> FileRead<'a> {
pub fn new<T: AsIoData>(s: &'a T, offset: u64, buf: &'a mut [u8]) -> Self {
FileRead {
io_data: s.as_io_data(),
buf,
can_drop: DelayDrop::new(),
}
}

#[inline]
pub fn done(self) -> io::Result<usize> {
loop {
co_io_result()?;

// clear the io_flag
self.io_data.io_flag.store(false, Ordering::Relaxed);

// finish the read operation
match read(self.io_data.fd, self.buf).map_err(from_nix_error) {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
ret => return ret,
}

if self.io_data.io_flag.swap(false, Ordering::Relaxed) {
continue;
}

// the result is still WouldBlock, need to try again
self.can_drop.reset();
yield_with(&self);
}
}
}

impl<'a> EventSource for FileRead<'a> {
fn subscribe(&mut self, co: CoroutineImpl) {
// when exit the scope the `can_drop` will be set to true
let _g = self.can_drop.delay_drop();

let cancel = co_cancel_data(&co);
// after register the coroutine, it's possible that other thread run it immediately
// and cause the process after it invalid, this is kind of user and kernel competition
// so we need to delay the drop of the EventSource, that's why _g is here
self.io_data.co.swap(co, Ordering::Release);

// there is event, re-run the coroutine
if self.io_data.io_flag.load(Ordering::Acquire) {
return self.io_data.schedule();
}

// register the cancel io data
cancel.set_io(self.io_data.deref().clone());
// re-check the cancel status
if cancel.is_canceled() {
unsafe { cancel.cancel() };
}
}
}
60 changes: 60 additions & 0 deletions src/io/sys/unix/fs/fs_write.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use std::io;
use std::sync::atomic::Ordering;

use super::super::{co_io_result, from_nix_error, IoData};
use coroutine_impl::{CoroutineImpl, EventSource};
use io::AsIoData;
use nix::unistd::write;
use sync::delay_drop::DelayDrop;
use yield_now::yield_with;

pub struct FileWrite<'a> {
io_data: &'a IoData,
buf: &'a [u8],
can_drop: DelayDrop,
}

impl<'a> FileWrite<'a> {
pub fn new<T: AsIoData>(s: &'a T, offset: u64, buf: &'a [u8]) -> Self {
FileWrite {
io_data: s.as_io_data(),
buf,
can_drop: DelayDrop::new(),
}
}

#[inline]
pub fn done(self) -> io::Result<usize> {
loop {
co_io_result()?;

// clear the io_flag
self.io_data.io_flag.store(false, Ordering::Relaxed);

match write(self.io_data.fd, self.buf).map_err(from_nix_error) {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
ret => return ret,
}

if self.io_data.io_flag.swap(false, Ordering::Relaxed) {
continue;
}

// the result is still WouldBlock, need to try again
self.can_drop.reset();
yield_with(&self);
}
}
}

impl<'a> EventSource for FileWrite<'a> {
fn subscribe(&mut self, co: CoroutineImpl) {
let _g = self.can_drop.delay_drop();
self.io_data.co.swap(co, Ordering::Release);

// there is event, re-run the coroutine
if self.io_data.io_flag.load(Ordering::Acquire) {
self.io_data.schedule();
}
}
}
21 changes: 21 additions & 0 deletions src/io/sys/unix/fs/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
mod fs_read;
mod fs_write;

pub use self::fs_read::FileRead;
pub use self::fs_write::FileWrite;

use std::fs::{File, OpenOptions};
use std::io;
use std::path::Path;

pub fn open<P: AsRef<Path>>(path: P) -> io::Result<File> {
File::open(path)
}

pub fn create<P: AsRef<Path>>(path: P) -> io::Result<File> {
File::create(path)
}

pub fn open_with_options<P: AsRef<Path>>(options: &mut OpenOptions, path: P) -> io::Result<File> {
options.open(path.as_ref())
}
7 changes: 7 additions & 0 deletions src/io/sys/unix/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod select;

pub mod cancel;
pub mod co_io;
pub mod fs;
pub mod net;

use std::cell::RefCell;
Expand All @@ -39,6 +40,12 @@ pub fn add_socket<T: AsRawFd + ?Sized>(t: &T) -> io::Result<IoData> {
get_scheduler().get_selector().add_fd(IoData::new(t))
}

// register the socket to the system selector
#[inline]
pub fn add_file<T: AsRawFd + ?Sized>(t: &T) -> io::Result<IoData> {
add_socket(t)
}

#[inline]
fn del_socket(io: &IoData) {
// transfer the io to the selector
Expand Down

0 comments on commit 147e59b

Please sign in to comment.