Skip to content

Commit

Permalink
icepryx2 sync queue test ok
Browse files Browse the repository at this point in the history
  • Loading branch information
youngday committed Jul 13, 2024
1 parent 5a3881c commit 03d564d
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 19 deletions.
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,7 @@ name = "zmq_pub"
path = "examples/zeromq_tmq/publish.rs"
[[example]]
name = "ice_pub"
path = "examples/dds_iceoryx2/publisher.rs"
path = "examples/dds_iceoryx2/publisher.rs"
[[example]]
name = "ice_sub"
path = "examples/dds_iceoryx2/subscriber.rs"
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,4 @@ http

plot your data lightly with rust.

🟢 🔴 🟡
🟢 🔴 🟡 ✏️
5 changes: 4 additions & 1 deletion examples/dds_iceoryx2/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use core::time::Duration;
use iceoryx2::prelude::*;
use realtime_plot::transmission_data::TransmissionData;

const CYCLE_TIME: Duration = Duration::from_secs(1);
const CYCLE_TIME: Duration = Duration::from_millis(100);

fn main() -> Result<(), Box<dyn std::error::Error>> {
let service_name = ServiceName::new("My/Funk/ServiceName")?;
Expand All @@ -29,6 +29,9 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

while let Iox2Event::Tick = Iox2::wait(CYCLE_TIME) {
counter += 1;
if counter > 100 {
counter = 0;
}
let sample = publisher.loan_uninit()?;

let sample = sample.write_payload(TransmissionData {
Expand Down
33 changes: 30 additions & 3 deletions examples/dds_iceoryx2/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,47 @@
// SPDX-License-Identifier: Apache-2.0 OR MIT
// mod transmission_data;
use core::time::Duration;
use iceoryx2::prelude::*;
use easy_example::transmission_data::TransmissionData;
use iceoryx2::{port::subscriber, prelude::*};
use realtime_plot::transmission_data::TransmissionData;

const CYCLE_TIME: Duration = Duration::from_secs(1);

fn main() -> Result<(), Box<dyn std::error::Error>> {


// let computation = std::thread::spawn(|| {
// // Some expensive computation.
// let _ = test();
// });

// let result = computation.join().unwrap();
// println!("{:?}",result);




let computation = std::thread::spawn(|| {
// Some expensive computation.
let _ = test();
});

let result = computation.join().unwrap();


println!("exit ...");

Ok(())
}


fn test() -> Result<(), Box<dyn std::error::Error>> {
let service_name = ServiceName::new("My/Funk/ServiceName")?;

let service = zero_copy::Service::new(&service_name)
.publish_subscribe()
.open_or_create::<TransmissionData>()?;

let subscriber = service.subscriber().create()?;

while let Iox2Event::Tick = Iox2::wait(CYCLE_TIME) {
while let Some(sample) = subscriber.receive()? {
println!("received: {:?}", *sample);
Expand Down
78 changes: 65 additions & 13 deletions examples/plot.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use futures::future::ok;
use piston_window::{EventLoop, PistonWindow, WindowSettings};
use plotters::prelude::*;
use systemstat::platform::common::Platform;
Expand All @@ -8,7 +9,10 @@ use log::{debug, error, info, trace, warn};
use log4rs;
use std::collections::vec_deque::VecDeque;
use std::time::{Duration, Instant};
use std::sync::mpsc::channel;
use std::thread;
use tokio::sync::mpsc;
use crate::mpsc::Sender;
use tokio::time::sleep;

use realtime_plot::draw_piston_window;
Expand All @@ -17,6 +21,12 @@ use realtime_plot::Settings;
use futures::StreamExt;
use tmq::{subscribe, Context, Result};


use realtime_plot::transmission_data::TransmissionData;
use iceoryx2::{port::subscriber, prelude::*};

const CYCLE_TIME: Duration = Duration::from_millis(10);

const FPS: u32 = 15;
const LENGTH: u32 = 20;
const N_DATA_POINTS: usize = (FPS * LENGTH) as usize;
Expand All @@ -34,29 +44,69 @@ async fn main() {
let version: String = "0.1.0703".to_string();
info!("Start your app,version:{0}", version);

let (tx, mut rx) = mpsc::channel(100);
// let (tx, mut rx) = mpsc::channel(100);

let (sender, receiver) = channel();


let mut loop_cnt: i64 = 0;

let mut socket: subscribe::Subscribe = subscribe(&Context::new())
.connect("tcp://127.0.0.1:7899")
.unwrap()
.subscribe(b"topic")
.unwrap();
//zeromq received
// tokio::spawn(async move {
// loop {
// info!("loop_cnt {:?}", loop_cnt);
// loop_cnt += loop_cnt;
// let now = Instant::now(); // 程序起始时间
// info!("zmq_sub start: {:?}", now);
// let val = zmq_sub(&mut socket).await.unwrap();
// let end = now.elapsed().as_millis();
// info!("zmq_sub end,dur: {:?} ms.", end);
// let ret_send = tx.send(val).await;
// info!("ret_send: {:?}", ret_send);
// info!("🟢 send val: {:?}", val);
// }
// });
//iceoryx2 received

tokio::spawn(async move {
loop {
// info!("loop_cnt {:?}", loop_cnt);
loop_cnt += loop_cnt;
let now = Instant::now(); // 程序起始时间
info!("zmq_sub start: {:?}", now);
let val = zmq_sub(&mut socket).await.unwrap();
let end = now.elapsed().as_millis();
info!("zmq_sub end,dur: {:?} ms.", end);
let ret_send = tx.send(val).await;
info!("ret_send: {:?}", ret_send);
info!("🟢 send val: {:?}", val);
// let computation = std::thread::spawn(move || {
// Some expensive computation.
// let _ = test(tx);

let service_name = ServiceName::new("My/Funk/ServiceName").unwrap();

let service = zero_copy::Service::new(&service_name)
.publish_subscribe()
.open_or_create::<TransmissionData>().unwrap();

let subscriber = service.subscriber().create().unwrap();
while let Iox2Event::Tick = Iox2::wait(CYCLE_TIME) {
while let Some(sample) = subscriber.receive().unwrap() {
info!("received: {:?}", *sample);

let val =sample.x.as_f64()*0.01;

let _ret_send = sender.send(val);
info!("🟢 send val: {:?}", val);

}
}

info!("exit ...");

// Ok(())

});

// let result = computation.join().unwrap();//TODO: block and nonblock




let sys = System::new();
window.set_max_fps(FPS as u64);
Expand All @@ -69,7 +119,8 @@ async fn main() {

load_measurement[epoch % FPS as usize] = sys.cpu_load()?; //

let rx_data = rx.try_recv();
// let rx_data = rx.try_recv();//zeromq
let rx_data=receiver.recv();//ice_oryx
info!("🟡 receive {:?}", rx_data);

if rx_data.is_ok() {
Expand Down Expand Up @@ -172,3 +223,4 @@ async fn zmq_sub(socket: &mut subscribe::Subscribe) -> Result<f64> {
}
Ok(value)
}

0 comments on commit 03d564d

Please sign in to comment.