Skip to content

Commit

Permalink
BROKEN - gotta go rn
Browse files Browse the repository at this point in the history
I'll do a writeup later
  • Loading branch information
asonix committed Jul 29, 2017
1 parent 5af4360 commit a435bac
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 31 deletions.
1 change: 1 addition & 0 deletions authentication_background/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ license = "GPL-3.0"
[dependencies]
futures = "0.1"
futures-cpupool = "0.1"
tokio-core = "0.1"
60 changes: 29 additions & 31 deletions authentication_background/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@

extern crate futures;
extern crate futures_cpupool;
extern crate tokio_core;

use futures::Future;
use futures::Stream;
use futures::stream;
use futures::future::{FutureResult, IntoFuture};
use futures_cpupool::{CpuPool, CpuFuture};
use tokio_core::reactor::Core;
use std::thread;
use std::sync::mpsc;

Expand Down Expand Up @@ -82,7 +84,6 @@ fn manager_thread<T>(
config: Config<'static, T>,
msg_sender: MsgSender<T>,
msg_receiver: MsgReceiver<T>,
fut_sender: FutSender,
) -> thread::JoinHandle<()>
where
T: Send + Sync + Clone,
Expand All @@ -91,34 +92,33 @@ where
let handlers = config.handlers();
let pool = CpuPool::new_num_cpus();

for msg in msg_receiver {
if msg.name() == EXIT_STR {
break;
}

let handler = match handlers.get(msg.name()) {
Some(handler) => handler,
None => {
println!("No handler for message '{}'", msg.name());
continue;
}
};

let cpu_future = future_thread(&pool, handler.clone(), msg, msg_sender.clone());
let messages = msg_receiver.iter().map(|msg| Ok(msg));
let server = stream::futures_unordered(messages)
.and_then(|msg| {
let handler = match handlers.get(msg.name()) {
Some(handler) => handler,
None => return Err(format!("No handler for message '{}'", msg.name())),
};

handler(msg.message()).map_err(|_| msg)
})
.or_else(|msg| if msg.retries() > 0 {
println!(
"Task for '{}' failed",
msg.name(),
);
msg_sender.send(msg.retry())?;
} else {
println!(
"Task for '{}' failed permanently",
msg.name(),
);
})
.for_each(|_| ());

fut_sender.send(cpu_future).expect("Failed to send future");
}
})
}
let mut core = Core::new().unwrap();

fn cleanup_thread(fut_receiver: FutReceiver) -> thread::JoinHandle<()> {
thread::spawn(move || {
for _ in stream::futures_unordered(fut_receiver)
.filter(|_| false)
.wait()
{
// do nothing
}
core.run(server).unwrap();
})
}

Expand All @@ -127,12 +127,10 @@ where
T: Send + Sync + Clone,
{
let (msg_sender, msg_receiver) = mpsc::channel::<Message<T>>();
let (fut_sender, fut_receiver) = mpsc::channel::<CpuFuture<(), Error>>();

let thread = manager_thread(config, msg_sender.clone(), msg_receiver, fut_sender);
let other_thread = cleanup_thread(fut_receiver);
let thread = manager_thread(config, msg_sender.clone(), msg_receiver);

Hooks::new(msg_sender, thread, other_thread)
Hooks::new(msg_sender, thread)
}

#[cfg(test)]
Expand Down
4 changes: 4 additions & 0 deletions authentication_zmq/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
.env
/target/
**/*.rs.bk
Cargo.lock
12 changes: 12 additions & 0 deletions authentication_zmq/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[package]
name = "authentication_zmq"
version = "0.1.0"
authors = ["Riley Trautman <[email protected]>"]

[dependencies]
authentication_backend = { path = "../authentication_backend" }
authentication_background = { path = "../authentication_background" }
zmq = "0.8"
futures = "0.1"
futures-cpupool = "0.1"
tokio-core = "0.1"
50 changes: 50 additions & 0 deletions authentication_zmq/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
extern crate zmq;
extern crate futures;
extern crate tokio_core;

use futures::{Stream, stream};
use tokio_core::reactor::Core;

pub struct ZmqResponder {
responder: zmq::Socket,
}

impl ZmqResponder {
fn connect(&self, bind_addr: &str) -> zmq::Result<()> {
self.responder.connect(bind_addr)
}
}

impl Iterator for ZmqResponder {
type Item = zmq::Result<Result<String, Vec<u8>>>;

fn next(&mut self) -> Option<Self::Item> {
Some(self.responder.recv_string(0))
}
}

pub fn run() {
let context = zmq::Context::new();
let responder = ZmqResponder { responder: context.socket(zmq::REP).unwrap() };
responder.connect("tcp://localhost:5560").expect(
"Failed connecting to responder",
);

let server = stream::futures_unordered(responder)
.map_err(|_| "some err")
.and_then(|msg| msg.map_err(|_| "some inner err"))
.for_each(|msg| {
println!("Got Message: {}", msg);

Ok(())
});

let mut core = Core::new().unwrap();
core.run(server).unwrap();
}

#[cfg(test)]
mod tests {
#[test]
fn it_works() {}
}
File renamed without changes.

0 comments on commit a435bac

Please sign in to comment.