From 7dec92dc39e8a07be5f617bca9bba729bbc8d8bf Mon Sep 17 00:00:00 2001 From: Kevin Walter Date: Tue, 13 Jan 2015 02:03:18 +0100 Subject: [PATCH] update to latest rustc --- Cargo.toml | 2 +- src/lib.rs | 92 ++++++++++++++++++++++++++++++------------------------ 2 files changed, 52 insertions(+), 42 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2416a15..e310e25 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "promise" -version = "0.0.3" +version = "0.0.4" authors = ["Kevin Walter "] repository = "https://github.com/lucidd/rust-promise" description = """ diff --git a/src/lib.rs b/src/lib.rs index 08935b0..99a04fe 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,12 +1,18 @@ +#![feature(unboxed_closures)] extern crate test; use std::any::Any; use std::io::timer; use std::time::duration::Duration; -use std::task::try; -use std::comm::Select; +use std::sync::mpsc::{ + Select, + Sender, + SendError, + Receiver, + channel +}; use std::collections::HashMap; - +use std::thread::Thread; pub enum FutureError{ TaskFailure(Box), @@ -26,9 +32,9 @@ impl Promise { /// Completes the associated Future with value; pub fn resolve(self, value: T) -> Result<(), T> { - match self.sender.send_opt(Ok(value)) { + match self.sender.send(Ok(value)) { Ok(x) => Ok(x), - Err(Ok(val)) => Err(val), + Err(SendError(Ok(val))) => Err(val), _ => unreachable!(), } } @@ -38,7 +44,7 @@ impl Promise { } fn fail(self, error: FutureError) { - self.sender.send(Err(error)) + self.sender.send(Err(error)); } } @@ -57,7 +63,7 @@ impl Future{ pub fn first_of(futures: Vec>) -> Future { let (p, f) = promise::(); - spawn(move || { + Thread::spawn(move || { let select = Select::new(); let mut handles = HashMap::new(); for future in futures.iter() { @@ -72,7 +78,7 @@ impl Future{ { let first = handles.get_mut(&select.wait()).unwrap(); p.send( - match first.recv_opt() { + match first.recv() { Ok(res) => res, Err(_) => Err(FutureError::HungUp), } @@ -92,27 +98,27 @@ impl Future{ // and remove. It needs to be rewritten at some point. pub fn all(futures: Vec>) -> Future> { let (p, f) = promise::>(); - spawn(move || { + Thread::spawn(move || { let select = Select::new(); let mut handles = HashMap::new(); for (i, future) in futures.iter().enumerate() { let handle = select.handle(&future.receiver); let id = handle.id(); handles.insert(handle.id(), (i, handle)); - let &(_, ref mut handle) = handles.get_mut(&id).unwrap(); + let &mut (_, ref mut handle) = handles.get_mut(&id).unwrap(); unsafe { handle.add(); } } - let mut results: Vec> = Vec::from_fn(futures.len(), |_| None); + let mut results: Vec> = futures.iter().map(|_| None).collect(); let mut error: Option = None; for _ in range(0, futures.len()) { let id = select.wait(); { - let &(i, ref mut handle) = handles.get_mut(&id).unwrap(); - match handle.recv_opt() { + let &mut (i, ref mut handle) = handles.get_mut(&id).unwrap(); + match handle.recv() { Ok(Ok(value)) => { *results.get_mut(i).unwrap() = Some(value); }, @@ -132,7 +138,7 @@ impl Future{ handles.remove(&id); } - for (_, &(_, ref mut handle)) in handles.iter_mut() { + for (_, &mut (_, ref mut handle)) in handles.iter_mut() { unsafe { handle.remove(); } @@ -159,8 +165,9 @@ impl Future{ /// If func fails the failure is propagated through TaskFailure. pub fn from_fn + Send>(func: F) -> Future { let (p, f) = promise::(); - spawn(move || { - match try(func) { + Thread::spawn(move || { + let result = Thread::scoped(move || func()).join(); + match result { Ok(val) => { let _ = p.resolve(val); }, @@ -185,7 +192,8 @@ impl Future{ self.on_result(move |res| { match res { Ok(val) => { - match try(move || func(val)) { + let result = Thread::scoped(move || func(val)).join(); + match result { Ok(mapped) => { let _ = p.resolve(mapped); }, @@ -200,7 +208,7 @@ impl Future{ /// Synchronously waits for the result of the Future and returns it. pub fn get(self) -> Result { - match self.receiver.recv_opt() { + match self.receiver.recv() { Ok(res) => res, Err(_) => Err(FutureError::HungUp), } @@ -209,7 +217,7 @@ impl Future{ /// Registers a function f that is called with the result of the Future. /// This function does not block. pub fn on_result,), ()>+Send>(self, f: F) { - spawn(move || { + Thread::spawn(move || { let result = self.get(); f(result); }); @@ -218,7 +226,7 @@ impl Future{ /// Registers a function f that is called if the Future completes with a value. /// This function does not block. pub fn on_success+Send>(self, f: F) { - spawn(move || { + Thread::spawn(move || { match self.get() { Ok(value) => f(value), _ => (), @@ -229,7 +237,7 @@ impl Future{ /// Registers a function f that is called if the Future completes with an error. /// This function does not block. pub fn on_failure+Send>(self, f: F) { - spawn(move || { + Thread::spawn(move || { match self.get() { Err(err) => f(err), _ => () , @@ -240,7 +248,7 @@ impl Future{ /// Registers a function f that is called if the Future completes with a value. /// This function does not block. pub fn on_complete+Send, F: FnOnce<(FutureError,),()>+Send>(self, success: S, failure: F) { - spawn(move || { + Thread::spawn(move || { match self.get() { Ok(value) => success(value), Err(err) => failure(err), @@ -259,23 +267,25 @@ pub fn promise() -> (Promise, Future) { #[cfg(test)] mod tests { use super::{promise, Future, FutureError}; - use std::any::AnyRefExt; use std::boxed::BoxAny; use std::time::duration::Duration; use std::io::timer; - + use std::sync::mpsc::{ + channel + }; + use std::thread::Thread; #[test] fn test_future(){ let (p, f) = promise(); - assert_eq!(p.resolve(123u), Ok(())); - assert_eq!(f.get().ok(), Some(123u)); + assert_eq!(p.resolve(123us), Ok(())); + assert_eq!(f.get().ok(), Some(123us)); } #[test] fn test_future_hungup(){ - let (p, f) = promise::(); - spawn(move || { + let (p, f) = promise::(); + Thread::spawn(move || { timer::sleep(Duration::seconds(1)); p; }); @@ -287,15 +297,15 @@ mod tests { #[test] fn test_future_from_fn(){ - let f = Future::from_fn(move || 123u); - assert_eq!(f.get().ok(), Some(123u)); + let f = Future::from_fn(move || 123us); + assert_eq!(f.get().ok(), Some(123us)); } #[test] fn test_future_from_fn_fail(){ let f = Future::from_fn(move || { panic!("ooops"); - 123u + 123us }); let err = match f.get() { Err(FutureError::TaskFailure(err)) => err, @@ -307,9 +317,9 @@ mod tests { #[test] fn test_future_delay(){ - let f = Future::delay(move || 123u, Duration::seconds(3)); + let f = Future::delay(move || 123us, Duration::seconds(3)); //TODO: test delay - assert_eq!(f.get().ok(), Some(123u)); + assert_eq!(f.get().ok(), Some(123us)); } #[test] @@ -344,39 +354,39 @@ mod tests { #[test] fn test_future_value(){ - let f = Future::value(123u); - assert_eq!(f.get().ok(), Some(123u)); + let f = Future::value(123us); + assert_eq!(f.get().ok(), Some(123us)); } #[test] fn test_future_on_result(){ let (tx, rx) = channel(); - let f = Future::delay(move || 123u, Duration::seconds(1)); + let f = Future::delay(move || 123us, Duration::seconds(1)); f.on_result(move |x| { tx.send(x); }); - assert_eq!(rx.recv().ok(), Some(123u)) + assert_eq!(rx.recv().ok().unwrap().ok().unwrap(), 123us) } #[test] fn test_future_on_success(){ let (tx, rx) = channel(); - let f = Future::delay(move || 123u, Duration::seconds(1)); + let f = Future::delay(move || 123us, Duration::seconds(1)); f.on_success(move |x| { tx.send(x); }); - assert_eq!(rx.recv(), 123u) + assert_eq!(rx.recv().ok().unwrap(), 123us) } #[test] fn test_future_map(){ let (tx, rx) = channel(); - let f = Future::value(3u); + let f = Future::value(3us); f.map(move |x| x*x) .on_success(move |x| { tx.send(x); }); - assert_eq!(rx.recv(), 9u); + assert_eq!(rx.recv().ok().unwrap(), 9us); } }