Skip to content

Commit

Permalink
update to latest rustc
Browse files Browse the repository at this point in the history
  • Loading branch information
lucidd committed Jan 13, 2015
1 parent 20d167e commit 7dec92d
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 42 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]

name = "promise"
version = "0.0.3"
version = "0.0.4"
authors = ["Kevin Walter <[email protected]>"]
repository = "https://github.com/lucidd/rust-promise"
description = """
Expand Down
92 changes: 51 additions & 41 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
#![feature(unboxed_closures)]
extern crate test;

use std::any::Any;
use std::io::timer;
use std::time::duration::Duration;
use std::task::try;
use std::comm::Select;
use std::sync::mpsc::{
Select,
Sender,
SendError,
Receiver,
channel
};
use std::collections::HashMap;

use std::thread::Thread;

pub enum FutureError{
TaskFailure(Box<Any+Send>),
Expand All @@ -26,9 +32,9 @@ impl<T: Send> Promise<T> {

/// Completes the associated Future with value;
pub fn resolve(self, value: T) -> Result<(), T> {
match self.sender.send_opt(Ok(value)) {
match self.sender.send(Ok(value)) {
Ok(x) => Ok(x),
Err(Ok(val)) => Err(val),
Err(SendError(Ok(val))) => Err(val),
_ => unreachable!(),
}
}
Expand All @@ -38,7 +44,7 @@ impl<T: Send> Promise<T> {
}

fn fail(self, error: FutureError) {
self.sender.send(Err(error))
self.sender.send(Err(error));
}

}
Expand All @@ -57,7 +63,7 @@ impl<T: Send> Future<T>{

pub fn first_of(futures: Vec<Future<T>>) -> Future<T> {
let (p, f) = promise::<T>();
spawn(move || {
Thread::spawn(move || {
let select = Select::new();
let mut handles = HashMap::new();
for future in futures.iter() {
Expand All @@ -72,7 +78,7 @@ impl<T: Send> Future<T>{
{
let first = handles.get_mut(&select.wait()).unwrap();
p.send(
match first.recv_opt() {
match first.recv() {
Ok(res) => res,
Err(_) => Err(FutureError::HungUp),
}
Expand All @@ -92,27 +98,27 @@ impl<T: Send> Future<T>{
// and remove. It needs to be rewritten at some point.
pub fn all(futures: Vec<Future<T>>) -> Future<Vec<T>> {
let (p, f) = promise::<Vec<T>>();
spawn(move || {
Thread::spawn(move || {
let select = Select::new();
let mut handles = HashMap::new();
for (i, future) in futures.iter().enumerate() {
let handle = select.handle(&future.receiver);
let id = handle.id();
handles.insert(handle.id(), (i, handle));
let &(_, ref mut handle) = handles.get_mut(&id).unwrap();
let &mut (_, ref mut handle) = handles.get_mut(&id).unwrap();
unsafe {
handle.add();
}
}

let mut results: Vec<Option<T>> = Vec::from_fn(futures.len(), |_| None);
let mut results: Vec<Option<T>> = futures.iter().map(|_| None).collect();
let mut error: Option<FutureError> = None;

for _ in range(0, futures.len()) {
let id = select.wait();
{
let &(i, ref mut handle) = handles.get_mut(&id).unwrap();
match handle.recv_opt() {
let &mut (i, ref mut handle) = handles.get_mut(&id).unwrap();
match handle.recv() {
Ok(Ok(value)) => {
*results.get_mut(i).unwrap() = Some(value);
},
Expand All @@ -132,7 +138,7 @@ impl<T: Send> Future<T>{
handles.remove(&id);
}

for (_, &(_, ref mut handle)) in handles.iter_mut() {
for (_, &mut (_, ref mut handle)) in handles.iter_mut() {
unsafe {
handle.remove();
}
Expand All @@ -159,8 +165,9 @@ impl<T: Send> Future<T>{
/// If func fails the failure is propagated through TaskFailure.
pub fn from_fn<F: FnOnce<(), T> + Send>(func: F) -> Future<T> {
let (p, f) = promise::<T>();
spawn(move || {
match try(func) {
Thread::spawn(move || {
let result = Thread::scoped(move || func()).join();
match result {
Ok(val) => {
let _ = p.resolve(val);
},
Expand All @@ -185,7 +192,8 @@ impl<T: Send> Future<T>{
self.on_result(move |res| {
match res {
Ok(val) => {
match try(move || func(val)) {
let result = Thread::scoped(move || func(val)).join();
match result {
Ok(mapped) => {
let _ = p.resolve(mapped);
},
Expand All @@ -200,7 +208,7 @@ impl<T: Send> Future<T>{

/// Synchronously waits for the result of the Future and returns it.
pub fn get(self) -> Result<T, FutureError> {
match self.receiver.recv_opt() {
match self.receiver.recv() {
Ok(res) => res,
Err(_) => Err(FutureError::HungUp),
}
Expand All @@ -209,7 +217,7 @@ impl<T: Send> Future<T>{
/// Registers a function f that is called with the result of the Future.
/// This function does not block.
pub fn on_result<F: FnOnce<(Result<T, FutureError>,), ()>+Send>(self, f: F) {
spawn(move || {
Thread::spawn(move || {
let result = self.get();
f(result);
});
Expand All @@ -218,7 +226,7 @@ impl<T: Send> Future<T>{
/// Registers a function f that is called if the Future completes with a value.
/// This function does not block.
pub fn on_success<F: FnOnce<(T,), ()>+Send>(self, f: F) {
spawn(move || {
Thread::spawn(move || {
match self.get() {
Ok(value) => f(value),
_ => (),
Expand All @@ -229,7 +237,7 @@ impl<T: Send> Future<T>{
/// Registers a function f that is called if the Future completes with an error.
/// This function does not block.
pub fn on_failure<F: FnOnce<(FutureError,), ()>+Send>(self, f: F) {
spawn(move || {
Thread::spawn(move || {
match self.get() {
Err(err) => f(err),
_ => () ,
Expand All @@ -240,7 +248,7 @@ impl<T: Send> Future<T>{
/// Registers a function f that is called if the Future completes with a value.
/// This function does not block.
pub fn on_complete<S: FnOnce<(T,),()>+Send, F: FnOnce<(FutureError,),()>+Send>(self, success: S, failure: F) {
spawn(move || {
Thread::spawn(move || {
match self.get() {
Ok(value) => success(value),
Err(err) => failure(err),
Expand All @@ -259,23 +267,25 @@ pub fn promise<T :Send>() -> (Promise<T>, Future<T>) {
#[cfg(test)]
mod tests {
use super::{promise, Future, FutureError};
use std::any::AnyRefExt;
use std::boxed::BoxAny;
use std::time::duration::Duration;
use std::io::timer;

use std::sync::mpsc::{
channel
};
use std::thread::Thread;

#[test]
fn test_future(){
let (p, f) = promise();
assert_eq!(p.resolve(123u), Ok(()));
assert_eq!(f.get().ok(), Some(123u));
assert_eq!(p.resolve(123us), Ok(()));
assert_eq!(f.get().ok(), Some(123us));
}

#[test]
fn test_future_hungup(){
let (p, f) = promise::<uint>();
spawn(move || {
let (p, f) = promise::<usize>();
Thread::spawn(move || {
timer::sleep(Duration::seconds(1));
p;
});
Expand All @@ -287,15 +297,15 @@ mod tests {

#[test]
fn test_future_from_fn(){
let f = Future::from_fn(move || 123u);
assert_eq!(f.get().ok(), Some(123u));
let f = Future::from_fn(move || 123us);
assert_eq!(f.get().ok(), Some(123us));
}

#[test]
fn test_future_from_fn_fail(){
let f = Future::from_fn(move || {
panic!("ooops");
123u
123us
});
let err = match f.get() {
Err(FutureError::TaskFailure(err)) => err,
Expand All @@ -307,9 +317,9 @@ mod tests {

#[test]
fn test_future_delay(){
let f = Future::delay(move || 123u, Duration::seconds(3));
let f = Future::delay(move || 123us, Duration::seconds(3));
//TODO: test delay
assert_eq!(f.get().ok(), Some(123u));
assert_eq!(f.get().ok(), Some(123us));
}

#[test]
Expand Down Expand Up @@ -344,39 +354,39 @@ mod tests {

#[test]
fn test_future_value(){
let f = Future::value(123u);
assert_eq!(f.get().ok(), Some(123u));
let f = Future::value(123us);
assert_eq!(f.get().ok(), Some(123us));
}

#[test]
fn test_future_on_result(){
let (tx, rx) = channel();
let f = Future::delay(move || 123u, Duration::seconds(1));
let f = Future::delay(move || 123us, Duration::seconds(1));
f.on_result(move |x| {
tx.send(x);
});
assert_eq!(rx.recv().ok(), Some(123u))
assert_eq!(rx.recv().ok().unwrap().ok().unwrap(), 123us)
}

#[test]
fn test_future_on_success(){
let (tx, rx) = channel();
let f = Future::delay(move || 123u, Duration::seconds(1));
let f = Future::delay(move || 123us, Duration::seconds(1));
f.on_success(move |x| {
tx.send(x);
});
assert_eq!(rx.recv(), 123u)
assert_eq!(rx.recv().ok().unwrap(), 123us)
}

#[test]
fn test_future_map(){
let (tx, rx) = channel();
let f = Future::value(3u);
let f = Future::value(3us);
f.map(move |x| x*x)
.on_success(move |x| {
tx.send(x);
});
assert_eq!(rx.recv(), 9u);
assert_eq!(rx.recv().ok().unwrap(), 9us);
}

}

0 comments on commit 7dec92d

Please sign in to comment.