diff --git a/src/bin/server/mod.rs b/src/bin/server/mod.rs index bfa0bdb..616ef5b 100644 --- a/src/bin/server/mod.rs +++ b/src/bin/server/mod.rs @@ -26,7 +26,7 @@ use pica::{Pica, PicaCommand}; use std::net::{Ipv4Addr, SocketAddrV4}; use std::path::PathBuf; use tokio::net::TcpListener; -use tokio::sync::{broadcast, mpsc}; +use tokio::sync::mpsc; use tokio::try_join; const DEFAULT_UCI_PORT: u16 = 7000; @@ -67,16 +67,16 @@ async fn main() -> Result<()> { args.uci_port, args.web_port, "UCI port and Web port shall be different." ); - let (event_tx, _) = broadcast::channel(16); - let mut pica = Pica::new(event_tx.clone(), args.pcapng_dir); + let mut pica = Pica::new(args.pcapng_dir); let pica_tx = pica.tx(); + let pica_events = pica.events(); #[cfg(feature = "web")] try_join!( accept_incoming(pica_tx.clone(), args.uci_port), pica.run(), - web::serve(pica_tx, event_tx, args.web_port) + web::serve(pica_tx, pica_events, args.web_port) )?; #[cfg(not(feature = "web"))] diff --git a/src/bin/server/web.rs b/src/bin/server/web.rs index 0af876f..1481e54 100644 --- a/src/bin/server/web.rs +++ b/src/bin/server/web.rs @@ -120,7 +120,7 @@ fn event_name(event: &PicaEvent) -> &'static str { async fn handle( mut req: Request, tx: mpsc::Sender, - events: broadcast::Sender, + events: broadcast::Receiver, ) -> Result, Infallible> { let static_file = STATIC_FILES .iter() @@ -168,7 +168,7 @@ async fn handle( .collect::>()[..] { ["events"] => { - let stream = BroadcastStream::new(events.subscribe()).map(|result| { + let stream = BroadcastStream::new(events).map(|result| { result.map(|event| { format!( "event: {}\ndata: {}\n\n", @@ -256,7 +256,7 @@ pub async fn serve( let events = events.clone(); async move { Ok::<_, Infallible>(service_fn(move |req| { - handle(req, tx.clone(), events.clone()) + handle(req, tx.clone(), events.subscribe()) })) } }); diff --git a/src/lib.rs b/src/lib.rs index efe9e17..6b5600d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -243,8 +243,9 @@ fn make_measurement( } impl Pica { - pub fn new(event_tx: broadcast::Sender, pcapng_dir: Option) -> Self { + pub fn new(pcapng_dir: Option) -> Self { let (tx, rx) = mpsc::channel(MAX_SESSION * MAX_DEVICE); + let (event_tx, _) = broadcast::channel(16); Pica { devices: HashMap::new(), anchors: HashMap::new(), @@ -256,6 +257,10 @@ impl Pica { } } + pub fn events(&self) -> broadcast::Sender { + self.event_tx.clone() + } + pub fn tx(&self) -> mpsc::Sender { self.tx.clone() }