Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Actor::started(), stopped() fallible #89

Merged
merged 1 commit into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 7 additions & 13 deletions benches/pub_sub.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::Error;
use anyhow::{Error, Result};
use criterion::{criterion_group, criterion_main, Criterion};
use std::{
hint::black_box,
Expand Down Expand Up @@ -36,11 +36,7 @@ impl Actor for PublisherActor {
"PublisherActor"
}

fn handle(
&mut self,
context: &mut Self::Context,
message: Self::Message,
) -> Result<(), Self::Error> {
fn handle(&mut self, context: &mut Self::Context, message: Self::Message) -> Result<()> {
match message {
PublisherMessage::SubscriberStarted => {
self.subscriber_count = self.subscriber_count.checked_sub(1).unwrap();
Expand Down Expand Up @@ -83,18 +79,16 @@ impl Actor for SubscriberActor {
"SubscriberActor"
}

fn started(&mut self, context: &mut Self::Context) {
fn started(&mut self, context: &mut Self::Context) -> Result<()> {
context.subscribe::<StringEvent>();
for publisher_addr in self.publisher_addrs.iter() {
publisher_addr.send(PublisherMessage::SubscriberStarted).unwrap();
publisher_addr.send(PublisherMessage::SubscriberStarted)?;
}

Ok(())
}

fn handle(
&mut self,
_context: &mut Self::Context,
message: Self::Message,
) -> Result<(), Self::Error> {
fn handle(&mut self, _context: &mut Self::Context, message: Self::Message) -> Result<()> {
// This black_box has a nice side effect that it silences the 'field is never read' warning.
black_box(message.0);
Ok(())
Expand Down
21 changes: 9 additions & 12 deletions examples/actor_wrapping.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::Error;
use anyhow::{Error, Result};
use env_logger::Env;
use log::debug;
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -34,12 +34,12 @@ impl<A: Actor> Actor for LoggingAdapter<A> {
A::priority(message)
}

fn started(&mut self, context: &mut Self::Context) {
fn started(&mut self, context: &mut Self::Context) -> Result<(), Self::Error> {
debug!("LoggingAdapter: started()");
self.inner.started(context)
}

fn stopped(&mut self, context: &mut Self::Context) {
fn stopped(&mut self, context: &mut Self::Context) -> Result<(), Self::Error> {
debug!("LoggingAdapter: stopped()");
self.inner.stopped(context)
}
Expand All @@ -61,7 +61,7 @@ impl Actor for TestActor {
type Error = Error;
type Message = String;

fn handle(&mut self, context: &mut Self::Context, message: String) -> Result<(), Error> {
fn handle(&mut self, context: &mut Self::Context, message: String) -> Result<()> {
println!("Got a message: {}. Shuting down.", message);
context.system_handle.shutdown().map_err(Error::from)
}
Expand All @@ -70,20 +70,17 @@ impl Actor for TestActor {
"TestActor"
}

fn started(&mut self, context: &mut Self::Context) {
context.set_timeout(Some(Duration::from_millis(100)))
fn started(&mut self, context: &mut Self::Context) -> Result<()> {
context.set_timeout(Some(Duration::from_millis(100)));
Ok(())
}

fn deadline_passed(
&mut self,
context: &mut Self::Context,
deadline: Instant,
) -> Result<(), Error> {
fn deadline_passed(&mut self, context: &mut Self::Context, deadline: Instant) -> Result<()> {
context.myself.send(format!("deadline was {:?}", deadline)).map_err(Error::from)
}
}

fn main() -> Result<(), Error> {
fn main() -> Result<()> {
env_logger::Builder::from_env(Env::default().default_filter_or("debug")).init();

let mut system = System::new("Actor Wrapping Example");
Expand Down
29 changes: 10 additions & 19 deletions examples/pub_sub_example.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::Error;
use anyhow::{Error, Result};
use env_logger::Env;
use std::time::{Duration, Instant};
use tonari_actor::{Actor, Context, Event, System};
Expand Down Expand Up @@ -40,9 +40,10 @@ impl Actor for PublisherActor {
"PublisherActor"
}

fn started(&mut self, context: &mut Self::Context) {
fn started(&mut self, context: &mut Self::Context) -> Result<()> {
context.set_deadline(Some(self.started_at + Duration::from_millis(1500)));
context.subscribe::<StringEvent>();
Ok(())
}

fn handle(
Expand Down Expand Up @@ -71,11 +72,7 @@ impl Actor for PublisherActor {
Ok(())
}

fn deadline_passed(
&mut self,
context: &mut Self::Context,
deadline: Instant,
) -> Result<(), Error> {
fn deadline_passed(&mut self, context: &mut Self::Context, deadline: Instant) -> Result<()> {
context.myself.send(PublisherMessage::Periodic)?;
context.set_deadline(Some(deadline + Duration::from_secs(1)));
Ok(())
Expand Down Expand Up @@ -104,15 +101,12 @@ impl Actor for SubscriberActor1 {
"SubscriberActor1"
}

fn started(&mut self, context: &mut Self::Context) {
fn started(&mut self, context: &mut Self::Context) -> Result<()> {
context.subscribe::<StringEvent>();
Ok(())
}

fn handle(
&mut self,
_context: &mut Self::Context,
message: Self::Message,
) -> Result<(), Self::Error> {
fn handle(&mut self, _context: &mut Self::Context, message: Self::Message) -> Result<()> {
match message {
SubscriberMessage::Text(text) => {
println!("SubscriberActor1 got a text message: {:?}", text);
Expand All @@ -132,15 +126,12 @@ impl Actor for SubscriberActor2 {
"SubscriberActor1"
}

fn started(&mut self, context: &mut Self::Context) {
fn started(&mut self, context: &mut Self::Context) -> Result<()> {
context.subscribe::<StringEvent>();
Ok(())
}

fn handle(
&mut self,
_context: &mut Self::Context,
message: Self::Message,
) -> Result<(), Self::Error> {
fn handle(&mut self, _context: &mut Self::Context, message: Self::Message) -> Result<()> {
match message {
SubscriberMessage::Text(text) => {
println!("SubscriberActor2 got a text message: {:?}", text);
Expand Down
17 changes: 5 additions & 12 deletions examples/simple_timer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::Error;
use anyhow::{Error, Result};
use env_logger::Env;
use std::time::{Duration, Instant};
use tonari_actor::{Actor, Context, System};
Expand Down Expand Up @@ -27,24 +27,17 @@ impl Actor for TimerExampleActor {
"TimerExampleActor"
}

fn started(&mut self, context: &mut Self::Context) {
fn started(&mut self, context: &mut Self::Context) -> Result<()> {
context.set_deadline(Some(self.started_at + Duration::from_millis(1500)));
Ok(())
}

fn handle(
&mut self,
_context: &mut Self::Context,
message: Self::Message,
) -> Result<(), Self::Error> {
fn handle(&mut self, _context: &mut Self::Context, message: Self::Message) -> Result<()> {
println!("Got a message: {:?} at {:?}", message, self.started_at.elapsed());
Ok(())
}

fn deadline_passed(
&mut self,
context: &mut Self::Context,
deadline: Instant,
) -> Result<(), Error> {
fn deadline_passed(&mut self, context: &mut Self::Context, deadline: Instant) -> Result<()> {
context.myself.send(TimerMessage::Periodic)?;
context.set_deadline(Some(deadline + Duration::from_secs(1)));
Ok(())
Expand Down
45 changes: 29 additions & 16 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,10 @@ impl System {
.spawn(move || {
let mut actor = factory();

actor.started(&mut context);
if let Err(error) = actor.started(&mut context) {
Self::report_error_shutdown(&system_handle, A::name(), "started", error);
return;
}
debug!("[{}] started actor: {}", system_handle.name, A::name());

Self::run_actor_select_loop(actor, addr, &mut context, &system_handle);
Expand Down Expand Up @@ -535,9 +538,13 @@ impl System {

self.handle.registry.lock().push(RegistryEntry::CurrentThread(addr.control_tx.clone()));

actor.started(&mut context);
debug!("[{}] started actor: {}", system_handle.name, A::name());
Self::run_actor_select_loop(actor, addr, &mut context, system_handle);
match actor.started(&mut context) {
Ok(()) => {
debug!("[{}] started actor: {}", system_handle.name, A::name());
Self::run_actor_select_loop(actor, addr, &mut context, system_handle);
},
Err(error) => Self::report_error_shutdown(system_handle, A::name(), "started", error),
}

// Wait for the system to shutdown before we exit, otherwise the process
// would exit before the system is completely shutdown
Expand Down Expand Up @@ -602,7 +609,10 @@ impl System {
// Process the event. Returning ends actor loop, the normal operation is to fall through.
match received {
Received::Control(Control::Stop) => {
actor.stopped(context);
if let Err(error) = actor.stopped(context) {
// FWIW this should always hit the "while shutting down" variant.
Self::report_error_shutdown(system_handle, A::name(), "stopped", error);
}
debug!("[{}] stopped actor: {}", system_handle.name, A::name());
return;
},
Expand Down Expand Up @@ -888,10 +898,14 @@ pub trait Actor {
}

/// An optional callback when the Actor has been started.
fn started(&mut self, _context: &mut Self::Context) {}
fn started(&mut self, _context: &mut Self::Context) -> Result<(), Self::Error> {
Ok(())
}

/// An optional callback when the Actor has been stopped.
fn stopped(&mut self, _context: &mut Self::Context) {}
fn stopped(&mut self, _context: &mut Self::Context) -> Result<(), Self::Error> {
Ok(())
}

/// An optional callback when a deadline has passed.
///
Expand Down Expand Up @@ -1132,16 +1146,17 @@ mod tests {

fn handle(&mut self, _: &mut Self::Context, message: usize) -> Result<(), String> {
println!("message: {}", message);

Ok(())
}

fn started(&mut self, _: &mut Self::Context) {
fn started(&mut self, _: &mut Self::Context) -> Result<(), String> {
println!("started");
Ok(())
}

fn stopped(&mut self, _: &mut Self::Context) {
fn stopped(&mut self, _: &mut Self::Context) -> Result<(), String> {
println!("stopped");
Ok(())
}
}

Expand Down Expand Up @@ -1196,8 +1211,8 @@ mod tests {
}

/// We just need this test to compile, not run.
fn started(&mut self, ctx: &mut Self::Context) {
ctx.system_handle.shutdown().unwrap();
fn started(&mut self, ctx: &mut Self::Context) -> Result<(), String> {
ctx.system_handle.shutdown().map_err(|e| e.to_string())
}
}

Expand Down Expand Up @@ -1380,10 +1395,8 @@ mod tests {
type Error = String;
type Message = ();

fn started(&mut self, context: &mut Self::Context) {
context
.subscribe_and_receive_latest::<Self::Message>()
.expect("can receive last cached value");
fn started(&mut self, context: &mut Self::Context) -> Result<(), String> {
context.subscribe_and_receive_latest::<Self::Message>().map_err(|e| e.to_string())
}

fn handle(
Expand Down
8 changes: 4 additions & 4 deletions src/timed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,11 @@ impl<M: Send + 'static, A: Actor<Context = TimedContext<M>, Message = M>> Actor
}
}

fn started(&mut self, context: &mut Self::Context) {
fn started(&mut self, context: &mut Self::Context) -> Result<(), Self::Error> {
self.inner.started(&mut TimedContext::from_context(context))
}

fn stopped(&mut self, context: &mut Self::Context) {
fn stopped(&mut self, context: &mut Self::Context) -> Result<(), Self::Error> {
self.inner.stopped(&mut TimedContext::from_context(context))
}

Expand Down Expand Up @@ -314,15 +314,15 @@ mod tests {
Ok(())
}

fn started(&mut self, context: &mut Self::Context) {
fn started(&mut self, context: &mut Self::Context) -> Result<(), String> {
context
.myself
.send_recurring(
|| 2,
Instant::now() + Duration::from_millis(50),
Duration::from_millis(100),
)
.unwrap()
.map_err(|e| e.to_string())
}
}

Expand Down
Loading