Skip to content

Commit

Permalink
read me
Browse files Browse the repository at this point in the history
  • Loading branch information
youngday committed Jul 13, 2024
1 parent 03d564d commit 79b2e85
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 59 deletions.
26 changes: 23 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,28 @@ just cpu0 (red line ) get data from zeromq.
![realtime_plot.png](realtime_plot.png)
## compile and run

⚠️ please check your communication type: COM_TYPE:u32=0;//0=zeromq 1=ice_oryx2,2=?

### zeromq
in 2 terminals
* pub data
* pub data
```sh
cargo run --example zmq_pub
```
* sub data and plot in realtime
```sh
cargo run --example plot
```

### ice_oryx2
in 2 terminals
* pub data
```sh
cargo run --example ice_pub
```
* sub data and plot in realtime
```sh
cargo run --example plot
```
## reference

### plotters-piston - The Piston Window backend for Plotters
Expand All @@ -33,10 +45,18 @@ i
### tmq
zeromq lib for rust .
## todo

### config

put blew in config.toml

COM_TYPE:u32=0;//0=zeromq 1=ice_oryx2,2=?

### speed
15 fps
### queue
zeromq
☑️ zeromq
☑️ ice_oryx2
mqtt
socket
tcp
Expand Down
107 changes: 51 additions & 56 deletions examples/plot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ use plotters::prelude::*;
use systemstat::platform::common::Platform;
use systemstat::System;

use crate::mpsc::Sender;
use chrono::prelude::*;
use log::{debug, error, info, trace, warn};
use log4rs;
use std::collections::vec_deque::VecDeque;
use std::time::{Duration, Instant};
use std::sync::mpsc::channel;
use std::thread;
use std::time::{Duration, Instant};
use tokio::sync::mpsc;
use crate::mpsc::Sender;
use tokio::time::sleep;

use realtime_plot::draw_piston_window;
Expand All @@ -21,16 +21,15 @@ use realtime_plot::Settings;
use futures::StreamExt;
use tmq::{subscribe, Context, Result};


use realtime_plot::transmission_data::TransmissionData;
use iceoryx2::{port::subscriber, prelude::*};
use realtime_plot::transmission_data::TransmissionData;

const CYCLE_TIME: Duration = Duration::from_millis(10);

const FPS: u32 = 15;
const LENGTH: u32 = 20;
const N_DATA_POINTS: usize = (FPS * LENGTH) as usize;

const COM_TYPE:u32=1;//0=zeromq 1=ice_oryx2,2=?
#[tokio::main]
async fn main() {
let mut window: PistonWindow = WindowSettings::new("Real Time CPU Usage", [450, 300])
Expand All @@ -48,65 +47,62 @@ async fn main() {

let (sender, receiver) = channel();


let mut loop_cnt: i64 = 0;

let mut socket: subscribe::Subscribe = subscribe(&Context::new())
.connect("tcp://127.0.0.1:7899")
.unwrap()
.subscribe(b"topic")
.unwrap();

//zeromq received
// tokio::spawn(async move {
// loop {
// info!("loop_cnt {:?}", loop_cnt);
// loop_cnt += loop_cnt;
// let now = Instant::now(); // 程序起始时间
// info!("zmq_sub start: {:?}", now);
// let val = zmq_sub(&mut socket).await.unwrap();
// let end = now.elapsed().as_millis();
// info!("zmq_sub end,dur: {:?} ms.", end);
// let ret_send = tx.send(val).await;
// info!("ret_send: {:?}", ret_send);
// info!("🟢 send val: {:?}", val);
// }
// });
//iceoryx2 received

tokio::spawn(async move {
// let computation = std::thread::spawn(move || {
// Some expensive computation.
// let _ = test(tx);

let service_name = ServiceName::new("My/Funk/ServiceName").unwrap();

let service = zero_copy::Service::new(&service_name)
.publish_subscribe()
.open_or_create::<TransmissionData>().unwrap();

let subscriber = service.subscriber().create().unwrap();
while let Iox2Event::Tick = Iox2::wait(CYCLE_TIME) {
while let Some(sample) = subscriber.receive().unwrap() {
info!("received: {:?}", *sample);

let val =sample.x.as_f64()*0.01;

let _ret_send = sender.send(val);

if COM_TYPE==0 {
tokio::spawn(async move {
loop {
info!("loop_cnt {:?}", loop_cnt);
loop_cnt += loop_cnt;
let now = Instant::now(); // 程序起始时间
info!("zmq_sub start: {:?}", now);
let val = zmq_sub(&mut socket).await.unwrap();
let end = now.elapsed().as_millis();
info!("zmq_sub end,dur: {:?} ms.", end);
let ret_send = sender.send(val);
info!("ret_send: {:?}", ret_send);
info!("🟢 send val: {:?}", val);

}
}

info!("exit ...");

// Ok(())

});

// let result = computation.join().unwrap();//TODO: block and nonblock



});
} else if COM_TYPE==1 {
//iceoryx2 received
tokio::spawn(async move {
// let computation = std::thread::spawn(move || {
// Some expensive computation.
// let _ = test(tx);

let service_name = ServiceName::new("My/Funk/ServiceName").unwrap();

let service = zero_copy::Service::new(&service_name)
.publish_subscribe()
.open_or_create::<TransmissionData>()
.unwrap();

let subscriber = service.subscriber().create().unwrap();
while let Iox2Event::Tick = Iox2::wait(CYCLE_TIME) {
while let Some(sample) = subscriber.receive().unwrap() {
info!("received: {:?}", *sample);

let val = sample.x.as_f64() * 0.01;

let _ret_send = sender.send(val);
info!("🟢 send val: {:?}", val);
}
}
info!("exit ...");
});
// let result = computation.join().unwrap();//TODO: block and nonblock
}else {

}

let sys = System::new();
window.set_max_fps(FPS as u64);
Expand All @@ -120,7 +116,7 @@ async fn main() {
load_measurement[epoch % FPS as usize] = sys.cpu_load()?; //

// let rx_data = rx.try_recv();//zeromq
let rx_data=receiver.recv();//ice_oryx
let rx_data = receiver.recv(); //ice_oryx
info!("🟡 receive {:?}", rx_data);

if rx_data.is_ok() {
Expand Down Expand Up @@ -223,4 +219,3 @@ async fn zmq_sub(socket: &mut subscribe::Subscribe) -> Result<f64> {
}
Ok(value)
}

0 comments on commit 79b2e85

Please sign in to comment.