Skip to content

Commit

Permalink
Create the PicaEvent broadcast sender within the Pica object
Browse files Browse the repository at this point in the history
  • Loading branch information
hchataing committed Jan 16, 2024
1 parent c4af4c6 commit 3d9dc3f
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 8 deletions.
8 changes: 4 additions & 4 deletions src/bin/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use pica::{Pica, PicaCommand};
use std::net::{Ipv4Addr, SocketAddrV4};
use std::path::PathBuf;
use tokio::net::TcpListener;
use tokio::sync::{broadcast, mpsc};
use tokio::sync::mpsc;
use tokio::try_join;

const DEFAULT_UCI_PORT: u16 = 7000;
Expand Down Expand Up @@ -67,16 +67,16 @@ async fn main() -> Result<()> {
args.uci_port, args.web_port,
"UCI port and Web port shall be different."
);
let (event_tx, _) = broadcast::channel(16);

let mut pica = Pica::new(event_tx.clone(), args.pcapng_dir);
let mut pica = Pica::new(args.pcapng_dir);
let pica_tx = pica.tx();
let pica_events = pica.events();

#[cfg(feature = "web")]
try_join!(
accept_incoming(pica_tx.clone(), args.uci_port),
pica.run(),
web::serve(pica_tx, event_tx, args.web_port)
web::serve(pica_tx, pica_events, args.web_port)
)?;

#[cfg(not(feature = "web"))]
Expand Down
6 changes: 3 additions & 3 deletions src/bin/server/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ fn event_name(event: &PicaEvent) -> &'static str {
async fn handle(
mut req: Request<Body>,
tx: mpsc::Sender<PicaCommand>,
events: broadcast::Sender<PicaEvent>,
events: broadcast::Receiver<PicaEvent>,
) -> Result<Response<Body>, Infallible> {
let static_file = STATIC_FILES
.iter()
Expand Down Expand Up @@ -168,7 +168,7 @@ async fn handle(
.collect::<Vec<_>>()[..]
{
["events"] => {
let stream = BroadcastStream::new(events.subscribe()).map(|result| {
let stream = BroadcastStream::new(events).map(|result| {
result.map(|event| {
format!(
"event: {}\ndata: {}\n\n",
Expand Down Expand Up @@ -256,7 +256,7 @@ pub async fn serve(
let events = events.clone();
async move {
Ok::<_, Infallible>(service_fn(move |req| {
handle(req, tx.clone(), events.clone())
handle(req, tx.clone(), events.subscribe())
}))
}
});
Expand Down
7 changes: 6 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,9 @@ fn make_measurement(
}

impl Pica {
pub fn new(event_tx: broadcast::Sender<PicaEvent>, pcapng_dir: Option<PathBuf>) -> Self {
pub fn new(pcapng_dir: Option<PathBuf>) -> Self {
let (tx, rx) = mpsc::channel(MAX_SESSION * MAX_DEVICE);
let (event_tx, _) = broadcast::channel(16);
Pica {
devices: HashMap::new(),
anchors: HashMap::new(),
Expand All @@ -256,6 +257,10 @@ impl Pica {
}
}

pub fn events(&self) -> broadcast::Sender<PicaEvent> {
self.event_tx.clone()
}

pub fn tx(&self) -> mpsc::Sender<PicaCommand> {
self.tx.clone()
}
Expand Down

0 comments on commit 3d9dc3f

Please sign in to comment.