Skip to content

Commit

Permalink
Check signal_handler
Browse files Browse the repository at this point in the history
  • Loading branch information
quackzar committed Oct 1, 2024
1 parent a509dc2 commit ddaff6c
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 25 deletions.
2 changes: 2 additions & 0 deletions pycare/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pycare/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ name = "caring"
crate-type = ["cdylib"]

[dependencies]
futures-concurrency = "7.6.1"
pyo3 = { version = "0.22", features = ["abi3-py37", "generate-import-lib", "extension-module"]}
tokio = { version = "1.40.0", features = ["full"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
wecare = { path = "../wecare" }
82 changes: 63 additions & 19 deletions pycare/src/vm.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
use std::sync::Mutex;
use std::{future::Future, ops::DerefMut, sync::Mutex, time::Duration};

use crate::expr::{Id, Opened};
use pyo3::{exceptions::PyValueError, prelude::*, types::PyList};
use wecare::vm;

#[pyclass(frozen)]
pub struct Engine(Mutex<vm::blocking::Engine>);
pub struct Engine(Mutex<EngineInner>);

struct EngineInner {
engine: vm::Engine,
runtime: tokio::runtime::Runtime,
}

#[pyclass(frozen)]
pub struct Computed(vm::Value<vm::UnknownNumber>);
Expand Down Expand Up @@ -36,6 +41,7 @@ impl Engine {
#[new]
#[pyo3(signature = (scheme, address, peers, multithreaded=false, threshold=None, preprocessed_path=None))]
fn new(
py: Python<'_>,
scheme: &str,
address: &str,
peers: &Bound<'_, PyList>,
Expand Down Expand Up @@ -69,41 +75,79 @@ impl Engine {
None => builder,
};

let builder = if multithreaded {
builder.multi_threaded_runtime()
} else {
builder.single_threaded_runtime()
};
let runtime = tokio::runtime::Runtime::new().unwrap();

let engine = builder
.connect_blocking()
.map_err(pyo3::exceptions::PyBrokenPipeError::new_err)?
.build()
.map_err(|e| pyo3::exceptions::PyValueError::new_err(e.to_string()))?;
let engine = runtime.block_on(async {
check_signals(py, async {
builder
.connect()
.await
.map_err(pyo3::exceptions::PyBrokenPipeError::new_err)?
.build()
.map_err(|e| pyo3::exceptions::PyValueError::new_err(e.to_string()))
})
.await
})??;

let engine = EngineInner { engine, runtime };
Ok(Self(Mutex::new(engine)))
}

/// Execute a script
///
/// * `script`: list of expressions to evaluate
fn execute(&self, script: &Opened) -> PyResult<Computed> {
fn execute(&self, py: Python<'_>, script: &Opened) -> PyResult<Computed> {
let res = {
let mut engine = self.0.lock().expect("Lock poisoned");
let mut this = self.0.lock().expect("Lock poisoned");
let script: vm::Opened = script.0.clone();
engine
.execute(script)
.map_err(|e| pyo3::exceptions::PyValueError::new_err(e.to_string()))?
let EngineInner { engine, runtime } = this.deref_mut();
runtime.block_on(check_signals(py, async {
engine
.execute(script)
.await
.map_err(|e| pyo3::exceptions::PyValueError::new_err(e.to_string()))
}))??
};
Ok(Computed(res))
}

/// Your own Id
fn id(&self) -> Id {
Id(self.0.lock().unwrap().id())
Id(self.0.lock().unwrap().engine.id())
}

/// Your own Id
fn peers(&self) -> Vec<Id> {
self.0.lock().unwrap().peers().into_iter().map(Id).collect()
self.0
.lock()
.unwrap()
.engine
.peers()
.into_iter()
.map(Id)
.collect()
}
}


/// Check signals from python routinely while running other future
async fn check_signals<F: Future>(py: Python<'_>, f: F) -> Result<F::Output, PyErr> {
let signals = async {
loop {
tokio::time::sleep(Duration::from_millis(100)).await;
match py.check_signals() {
Ok(_) => continue,
Err(err) => break err,
}
}
};

tokio::select! {
err = signals => {
Err(err)
},
res = f => {
Ok(res)
},
}
}
8 changes: 2 additions & 6 deletions src/net/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -587,15 +587,11 @@ impl TcpNetwork {
let results = future::join_all(
peers
.iter()
.map(|addr| tokio::task::spawn(tokio::net::TcpStream::connect(*addr))),
.map(|addr| tokio::net::TcpStream::connect(*addr)),
)
.await;

let mut parties: Vec<_> = results
.into_iter()
.map(|x| x.unwrap())
.filter_map(|x| x.ok())
.collect();
let mut parties: Vec<_> = results.into_iter().filter_map(|x| x.ok()).collect();

// If we are not able to connect to some, they will connect to us.
// Accepting connections
Expand Down

0 comments on commit ddaff6c

Please sign in to comment.