From 7cc1861d0a0c6897d231b1bfc9896b2d97f92cf4 Mon Sep 17 00:00:00 2001 From: Jonas Bostoen Date: Fri, 28 Feb 2025 17:34:27 +0100 Subject: [PATCH] test(socket): add reqrep tests --- msg-socket/tests/it/main.rs | 1 + msg-socket/tests/it/reqrep.rs | 26 ++++++++++++++++++++++++++ 2 files changed, 27 insertions(+) create mode 100644 msg-socket/tests/it/reqrep.rs diff --git a/msg-socket/tests/it/main.rs b/msg-socket/tests/it/main.rs index 7a3d682..1a0fb94 100644 --- a/msg-socket/tests/it/main.rs +++ b/msg-socket/tests/it/main.rs @@ -1,3 +1,4 @@ mod pubsub; +mod reqrep; fn main() {} diff --git a/msg-socket/tests/it/reqrep.rs b/msg-socket/tests/it/reqrep.rs new file mode 100644 index 0000000..761a989 --- /dev/null +++ b/msg-socket/tests/it/reqrep.rs @@ -0,0 +1,26 @@ +use bytes::Bytes; +use msg_socket::{RepSocket, ReqSocket}; +use msg_transport::tcp::Tcp; +use tokio_stream::StreamExt; + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_reqrep() { + let _ = tracing_subscriber::fmt::try_init(); + + let mut rep = RepSocket::new(Tcp::default()); + let mut req = ReqSocket::new(Tcp::default()); + + rep.bind("0.0.0.0:0").await.unwrap(); + + req.connect(rep.local_addr().unwrap()).await.unwrap(); + + tokio::spawn(async move { + while let Some(request) = rep.next().await { + let msg = request.msg().clone(); + request.respond(msg).unwrap(); + } + }); + + let response = req.request(Bytes::from_static(b"hello")).await.unwrap(); + tracing::info!("Response: {:?}", response); +}