Skip to content

Commit

Permalink
fix: add traces to observe (#323)
Browse files Browse the repository at this point in the history
* fix: observe logging

* fix: add traces to observe

* test: fix
  • Loading branch information
cakekindel authored Apr 22, 2023
1 parent 3b99fa6 commit 8c7f023
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 25 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion toad/examples/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ fn start_server(addr: &'static str) {
.unwrap();

std::thread::spawn(|| loop {
SERVER.borrow().unwrap().steps().notify("time").unwrap();
SERVER.borrow().unwrap().notify("time").unwrap();
std::thread::sleep(Duration::from_millis(500));
});

Expand Down
13 changes: 13 additions & 0 deletions toad/src/platform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,19 @@ pub trait Platform<Steps>
res
}

/// Notify Observe subscribers that a new representation of the resource
/// at `path` is available
fn notify<P>(&self, path: P) -> Result<(), Self::Error>
where P: AsRef<str> + Clone
{
let mut effects = <Self::Types as PlatformTypes>::Effects::default();
self.steps()
.notify(path, &mut effects)
.map_err(Self::Error::step)?;

self.exec_many(effects).map_err(|(_, e)| e)
}

/// Poll for a response to a sent request, and pass it through `Steps`
/// for processing.
fn poll_resp(&self,
Expand Down
21 changes: 17 additions & 4 deletions toad/src/step/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,17 @@ macro_rules! exec_inner_step {
};
}

/// Issue an `Effect::Log`
#[macro_export]
macro_rules! log {
($at:path, $effs:expr, $lvl:expr, $($arg:tt)*) => {{
type S = $crate::todo::String::<1000>;
let msg = S::fmt(format_args!($($arg)*));
let msg = S::fmt(format_args!("[{}] {}", stringify!($at), msg.as_str()));
$effs.push(Effect::Log($lvl, msg));
}};
}

/// Specialized `?` operator for use in step bodies, allowing early-exit
/// for `Result`, `Option<Result>` and `Option<nb::Result>`.
#[macro_export]
Expand All @@ -292,7 +303,7 @@ macro_rules! _try {
}};
}

pub use {_try, exec_inner_step};
pub use {_try, exec_inner_step, log};

/// An error that can be returned by a [`Step`].
pub trait Error: core::fmt::Debug {}
Expand Down Expand Up @@ -343,10 +354,12 @@ pub trait Step<P: PlatformTypes>: Default {
/// there's a new version of the resource available.
///
/// See [`observe`] for more info.
fn notify<Path>(&self, path: Path) -> Result<(), Self::Error>
fn notify<Path>(&self, path: Path, effects: &mut P::Effects) -> Result<(), Self::Error>
where Path: AsRef<str> + Clone
{
self.inner().notify(path).map_err(Self::Error::from)
self.inner()
.notify(path, effects)
.map_err(Self::Error::from)
}

/// Invoked before messages are sent, allowing for internal state change & modification.
Expand Down Expand Up @@ -409,7 +422,7 @@ impl<P: PlatformTypes> Step<P> for () {
None
}

fn notify<Path>(&self, _: Path) -> Result<(), Self::Error>
fn notify<Path>(&self, _: Path, _: &mut P::Effects) -> Result<(), Self::Error>
where Path: AsRef<str>
{
Ok(())
Expand Down
105 changes: 91 additions & 14 deletions toad/src/step/observe.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use core::fmt::Debug;
use core::fmt::{Debug, Write};
use core::hash::{Hash, Hasher};
use core::marker::PhantomData;

Expand All @@ -11,11 +11,12 @@ use toad_msg::repeat::PATH;
use toad_msg::{CodeKind, Id, MessageOptions, Token};
use toad_stem::Stem;

use super::Step;
use super::{log, Step};
use crate::net::Addrd;
use crate::platform::{self, Effect, PlatformTypes};
use crate::req::Req;
use crate::resp::Resp;
use crate::todo::String;

/// Custom metadata options used to track messages created by this step.
///
Expand Down Expand Up @@ -258,6 +259,26 @@ impl<S, Subs, RequestQueue, Hasher> Observe<S, Subs, RequestQueue, Hasher> {
.map(|(ix, _)| ix)
}

fn fmt_subs<'a, P>(&self) -> String<1000>
where Subs: Array<Item = Sub<P>>,
P: PlatformTypes
{
self.subs.map_ref(|subs| {
let mut msg = String::<1000>::from("[");
subs.iter().enumerate().for_each(|(n, s)| {
write!(&mut msg,
"\"{:?} {:?}\"",
s.req.addr(),
s.req.data().msg().token).ok();
if n < subs.len() - 1 {
write!(&mut msg, ",").ok();
}
});
write!(&mut msg, "]").ok();
msg
})
}

fn similar_to<'a, P>(subs: &'a Subs,
addr: SocketAddr,
t: Token)
Expand Down Expand Up @@ -341,24 +362,33 @@ impl<S, Subs, RequestQueue, Hasher> Observe<S, Subs, RequestQueue, Hasher> {
})
}

// [1a]: Observe=1?
// [2a]: add to subs
// [3a]: pass request up to server
fn handle_incoming_request<P, E>(&self,
req: Addrd<Req<P>>,
_: &platform::Snapshot<P>,
_: &mut <P as PlatformTypes>::Effects)
effs: &mut <P as PlatformTypes>::Effects)
-> super::StepOutput<Addrd<Req<P>>, E>
where P: PlatformTypes,
Subs: Array<Item = Sub<P>>
{
match req.data().msg().observe() {
| Some(Register) => {
log!(Observe::handle_incoming_request,
effs,
log::Level::Trace,
"register: {:?} {:?}",
req.addr(),
req.data().msg().token);
let mut sub = Some(Sub::new(req.clone()));
self.subs
.map_mut(move |s| s.push(Option::take(&mut sub).unwrap()));
.map_mut(move |s| s.push(Option::take(&mut sub).expect("closure only invoked once")));
},
| Some(Deregister) => {
log!(Observe::handle_incoming_request,
effs,
log::Level::Trace,
"deregister: {:?} {:?}",
req.addr(),
req.data().msg().token);
self.subs
.map_mut(|s| match Self::get_index(s, req.data().msg().token) {
| Some(ix) => {
Expand All @@ -367,7 +397,14 @@ impl<S, Subs, RequestQueue, Hasher> Observe<S, Subs, RequestQueue, Hasher> {
| None => (),
})
},
| _ => (),
| _ => {
log!(Observe::handle_incoming_request,
effs,
log::Level::Trace,
"ignoring: {:?} {:?}",
req.addr(),
req.data().msg().token);
},
};

Some(Ok(req))
Expand Down Expand Up @@ -437,16 +474,36 @@ impl<P, S, B, RQ, H> Step<P> for Observe<S, B, RQ, H>
self.inner.poll_resp(snap, effects, token, addr)
}

fn notify<Path>(&self, path: Path) -> Result<(), Self::Error>
fn notify<Path>(&self,
path: Path,
effects: &mut <P as PlatformTypes>::Effects)
-> Result<(), Self::Error>
where Path: AsRef<str> + Clone
{
self.inner.notify(path.clone())?;
self.inner.notify(path.clone(), effects)?;

self.request_queue.map_mut(|rq| {
log!(Observe::notify,
effects,
log::Level::Trace,
"{}",
path.as_ref());
log!(Observe::notify,
effects,
log::Level::Trace,
"discarding {} synthetic requests not yet processed",
rq.len());

Self::remove_queued_requests_matching_path(rq, path.as_ref());
self.subs.map_ref(|subs| {
Self::clone_and_enqueue_sub_requests(subs, rq, path.as_ref())
});

log!(Observe::notify,
effects,
log::Level::Trace,
"{} synthetic requests now enqueued",
rq.len());
});

Ok(())
Expand All @@ -471,9 +528,28 @@ impl<P, S, B, RQ, H> Step<P> for Observe<S, B, RQ, H>
msg.as_mut()
.set(opt::WAS_CREATED_BY_OBSERVE, Default::default())
.ok();

log!(Observe::before_message_sent,
effs,
log::Level::Trace,
"=> {:?} {:?}",
sub.addr(),
msg.data().token);
effs.push(Effect::Send(msg.with_addr(sub.addr())));
})
});
} else {
log!(Observe::before_message_sent,
effs,
log::Level::Trace,
"ignoring {:?} {:?}",
msg.addr(),
msg.data().token);
log!(Observe::before_message_sent,
effs,
log::Level::Trace,
"subscriptions: {}",
self.fmt_subs().as_str());
}

Ok(())
Expand Down Expand Up @@ -575,7 +651,7 @@ mod tests {
config: Default::default() }, &mut Default::default()).unwrap().unwrap()
}}),
// We have a new version available
({|step: &Observe<Dummy>| step.notify("foo/bar").unwrap()})
({|step: &Observe<Dummy>| step.notify("foo/bar", &mut vec![]).unwrap()})
]
THEN request_is_duplicated [
// A copy of the original request should be emitted
Expand Down Expand Up @@ -603,6 +679,7 @@ mod tests {
THEN response_is_copied_and_sent_to_subscriber [
(before_message_sent(_, _, test::msg!(CON { 2 . 05 } x.x.x.x:21 with |m: &mut Message<_, _>| {m.token = Token(array_vec!(21)); m.id = Id(1);})) should be ok with {|_| ()}),
(effects should satisfy {|effs| {
let effs = effs.into_iter().filter(|e| matches!(e, Effect::Send(_))).collect::<Vec<_>>();
assert_eq!(effs.len(), 1);
match effs.get(0).unwrap().clone() {
platform::Effect::Send(m) => {
Expand All @@ -624,7 +701,7 @@ mod tests {
recvd_dgram: None,
config: crate::config::Config::default() }, &mut Default::default()).unwrap().unwrap()
}}),
({|step: &Observe<Dummy>| step.notify("foot/bart").unwrap()})
({|step: &Observe<Dummy>| step.notify("foot/bart", &mut vec![]).unwrap()})
]
THEN nothing_happens [
(poll_req(_, _) should satisfy { |req| assert!(req.is_none()) })
Expand All @@ -640,13 +717,13 @@ mod tests {
recvd_dgram: None,
config: crate::config::Config::default() }, &mut Default::default()).unwrap().unwrap()
}}),
({|step: &Observe<Dummy>| step.notify("foo/bar").unwrap()}),
({|step: &Observe<Dummy>| step.notify("foo/bar", &mut vec![]).unwrap()}),
({|step: &Observe<Dummy>| {
step.poll_req(&Snapshot { time: test::ClockMock::new().try_now().unwrap(),
recvd_dgram: None,
config: crate::config::Config::default() }, &mut Default::default()).unwrap().unwrap()
}}),
({|step: &Observe<Dummy>| step.notify("foo/bar").unwrap()})
({|step: &Observe<Dummy>| step.notify("foo/bar", &mut vec![]).unwrap()})
]
THEN request_is_duplicated_multiple_times [
(poll_req(_, _) should satisfy { |req| {
Expand Down
10 changes: 5 additions & 5 deletions toad/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,11 @@ pub mod stepfn {
}

pub trait notify<Self_, E>
where Self: 'static + for<'a> FnMut(&'a Self_, &'a str) -> Result<(), E>
where Self: 'static + for<'a> FnMut(&'a Self_, &'a str, &'a mut Vec<Effect>) -> Result<(), E>
{
}
impl<T, Self_, E> notify<Self_, E> for T
where T: 'static + for<'a> FnMut(&'a Self_, &'a str) -> Result<(), E>
where T: 'static + for<'a> FnMut(&'a Self_, &'a str, &'a mut Vec<Effect>) -> Result<(), E>
{
}

Expand Down Expand Up @@ -245,7 +245,7 @@ impl<State, Rq, Rp, E> Default for MockStep<State, Rq, Rp, E> {
fn default() -> Self {
Self { poll_req: RwLock::new(Box::new(|_, _, _| None)),
poll_resp: RwLock::new(Box::new(|_, _, _, _, _| None)),
notify: RwLock::new(Box::new(|_, _| Ok(()))),
notify: RwLock::new(Box::new(|_, _, _| Ok(()))),
before_message_sent: RwLock::new(Box::new(|_, _, _, _| Ok(()))),
on_message_sent: RwLock::new(Box::new(|_, _, _| Ok(()))),
state: Stem::new(None) }
Expand Down Expand Up @@ -282,11 +282,11 @@ impl<State, Rq, Rp, E> crate::step::Step<Platform> for MockStep<State, Rq, Rp, E
g.as_mut()(self, snap, effects, token, addr)
}

fn notify<Path>(&self, path: Path) -> Result<(), Self::Error>
fn notify<Path>(&self, path: Path, effects: &mut Vec<Effect>) -> Result<(), Self::Error>
where Path: AsRef<str> + Clone
{
let mut g = self.notify.try_write().unwrap();
g.as_mut()(self, path.as_ref())
g.as_mut()(self, path.as_ref(), effects)
}

fn before_message_sent(&self,
Expand Down

0 comments on commit 8c7f023

Please sign in to comment.