Skip to content

Commit

Permalink
Merge pull request #283 from Tim-Zhang/update-examples
Browse files Browse the repository at this point in the history
Update examples
  • Loading branch information
lifupan authored Feb 6, 2025
2 parents e5264e0 + 725f0f1 commit d713dd4
Show file tree
Hide file tree
Showing 8 changed files with 193 additions and 160 deletions.
55 changes: 36 additions & 19 deletions example/async-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,25 @@ async fn main() {
"health.check()",
now.elapsed(),
);

let resp = thc
.check(
context::with_duration(core::time::Duration::from_millis(20)),
&req,
)
.await;

assert_eq!(
resp,
Err(ttrpc::Error::Others(
"Receive packet timeout Elapsed(())".into()
))
);

println!(
"Green Thread 1 - {} -> {:?} ended: {:?}",
"health.check()",
thc.check(context::with_duration(core::time::Duration::from_millis(20)), &req)
.await,
resp,
now.elapsed(),
);
});
Expand All @@ -51,18 +65,16 @@ async fn main() {
now.elapsed(),
);

let show = match tac
let resp = tac
.list_interfaces(default_ctx(), &agent::ListInterfacesRequest::new())
.await
{
Err(e) => format!("{:?}", e),
Ok(s) => format!("{:?}", s),
};
.await;
let expected_resp = utils::resp::asynchronous::agent_list_interfaces();
assert_eq!(resp, expected_resp);

println!(
"Green Thread 2 - {} -> {} ended: {:?}",
"Green Thread 2 - {} -> {:?} ended: {:?}",
"agent.list_interfaces()",
show,
resp,
now.elapsed(),
);
});
Expand All @@ -74,17 +86,16 @@ async fn main() {
now.elapsed()
);

let show = match ac
let resp = ac
.online_cpu_mem(default_ctx(), &agent::OnlineCPUMemRequest::new())
.await
{
Err(e) => format!("{:?}", e),
Ok(s) => format!("{:?}", s),
};
.await;
let expected_resp = utils::resp::online_cpu_mem_not_impl();
assert_eq!(resp, Err(expected_resp));

println!(
"Green Thread 3 - {} -> {} ended: {:?}",
"Green Thread 3 - {} -> {:?} ended: {:?}",
"agent.online_cpu_mem()",
show,
resp,
now.elapsed()
);

Expand All @@ -102,7 +113,13 @@ async fn main() {
);
});

let _ = tokio::join!(t1, t2, t3);
let (r1, r2, r3) = tokio::join!(t1, t2, t3);
assert!(
r1.is_ok() && r2.is_ok() && r3.is_ok(),
"async test is failed because some error occurred"
);

println!("***** Asycn test is OK! *****");
}

fn default_ctx() -> Context {
Expand Down
49 changes: 10 additions & 39 deletions example/async-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,14 @@
mod protocols;
mod utils;

#[macro_use]
extern crate log;

use std::sync::Arc;

use log::LevelFilter;

#[cfg(unix)]
use protocols::asynchronous::{agent, agent_ttrpc, health, health_ttrpc, types};
use protocols::asynchronous::{agent, agent_ttrpc, health, health_ttrpc};
#[cfg(unix)]
use ttrpc::asynchronous::Server;
use ttrpc::error::{Error, Result};
use ttrpc::proto::{Code, Status};

#[cfg(unix)]
use async_trait::async_trait;
Expand All @@ -35,30 +30,18 @@ impl health_ttrpc::Health for HealthService {
&self,
_ctx: &::ttrpc::r#async::TtrpcContext,
_req: health::CheckRequest,
) -> Result<health::HealthCheckResponse> {
let mut status = Status::new();

status.set_code(Code::NOT_FOUND);
status.set_message("Just for fun".to_string());

sleep(std::time::Duration::from_secs(10)).await;

Err(Error::RpcStatus(status))
) -> ttrpc::Result<health::HealthCheckResponse> {
// Mock timeout
sleep(std::time::Duration::from_secs(1)).await;
unreachable!();
}

async fn version(
&self,
ctx: &::ttrpc::r#async::TtrpcContext,
req: health::CheckRequest,
) -> Result<health::VersionCheckResponse> {
info!("version {:?}", req);
info!("ctx {:?}", ctx);
let mut rep = health::VersionCheckResponse::new();
rep.agent_version = "mock.0.1".to_string();
rep.grpc_version = "0.0.1".to_string();
let mut status = Status::new();
status.set_code(Code::NOT_FOUND);
Ok(rep)
_ctx: &::ttrpc::r#async::TtrpcContext,
_req: health::CheckRequest,
) -> ttrpc::Result<health::VersionCheckResponse> {
utils::resp::asynchronous::health_version()
}
}

Expand All @@ -71,19 +54,7 @@ impl agent_ttrpc::AgentService for AgentService {
_ctx: &::ttrpc::r#async::TtrpcContext,
_req: agent::ListInterfacesRequest,
) -> ::ttrpc::Result<agent::Interfaces> {
let mut rp = Vec::new();

let mut i = types::Interface::new();
i.name = "first".to_string();
rp.push(i);
let mut i = types::Interface::new();
i.name = "second".to_string();
rp.push(i);

let mut i = agent::Interfaces::new();
i.Interfaces = rp;

Ok(i)
utils::resp::asynchronous::agent_list_interfaces()
}
}

Expand Down
16 changes: 15 additions & 1 deletion example/async-stream-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,21 @@ async fn main() {

let t8 = tokio::spawn(server_send_stream(sc));

let _ = tokio::join!(t1, t2, t3, t4, t5, t6, t7, t8);
let (r1, r2, r3, r4, r5, r6, r7, r8) = tokio::join!(t1, t2, t3, t4, t5, t6, t7, t8);

assert!(
r1.is_ok()
&& r2.is_ok()
&& r3.is_ok()
&& r4.is_ok()
&& r5.is_ok()
&& r6.is_ok()
&& r7.is_ok()
&& r8.is_ok(),
"async-stream test is failed because some error occurred"
);

println!("***** Async Stream test is OK! *****");
}

fn default_ctx() -> Context {
Expand Down
2 changes: 1 addition & 1 deletion example/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ fn replace_text_in_file(file_name: &str, from: &str, to: &str) -> Result<(), std
let new_contents = contents.replace(from, to);

let mut dst = File::create(file_name)?;
dst.write(new_contents.as_bytes())?;
dst.write_all(new_contents.as_bytes())?;

Ok(())
}
83 changes: 30 additions & 53 deletions example/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ use protocols::sync::{agent, agent_ttrpc, health, health_ttrpc};
use std::thread;
use std::time::Duration;
use ttrpc::context::{self, Context};
use ttrpc::error::Error;
use ttrpc::proto::Code;
use ttrpc::Client;

#[cfg(not(target_os = "linux"))]
Expand All @@ -45,6 +43,8 @@ fn main() {
thread::sleep(Duration::from_secs(1));
let current_fd_count = get_fd_count();
assert_eq!(current_fd_count, expected_fd_count, "check fd count");

println!("***** Sync test is OK! *****");
}

fn connect_once() {
Expand All @@ -67,26 +67,22 @@ fn connect_once() {
now.elapsed(),
);

let rsp = thc.check(default_ctx(), &req);
match rsp.as_ref() {
Err(Error::RpcStatus(s)) => {
assert_eq!(Code::NOT_FOUND, s.code());
assert_eq!("Just for fun".to_string(), s.message())
}
Err(e) => {
panic!("not expecting an error from the example server: {:?}", e)
}
Ok(x) => {
panic!(
"not expecting a OK response from the example server: {:?}",
x
)
}
}
let resp = thc.check(
context::with_duration(core::time::Duration::from_millis(20)),
&req,
);

assert_eq!(
resp,
Err(ttrpc::Error::Others(
"Receive packet from Receiver timeout: timed out waiting on channel".into()
))
);

println!(
"OS Thread {:?} - health.check() -> {:?} ended: {:?}",
std::thread::current().id(),
rsp,
resp,
now.elapsed(),
);
});
Expand All @@ -98,21 +94,14 @@ fn connect_once() {
now.elapsed(),
);

let show = match tac.list_interfaces(default_ctx(), &agent::ListInterfacesRequest::new()) {
Err(e) => {
panic!("not expecting an error from the example server: {:?}", e)
}
Ok(s) => {
assert_eq!("first".to_string(), s.Interfaces[0].name);
assert_eq!("second".to_string(), s.Interfaces[1].name);
format!("{s:?}")
}
};
let resp = tac.list_interfaces(default_ctx(), &agent::ListInterfacesRequest::new());
let expected_resp = utils::resp::sync::agent_list_interfaces();
assert_eq!(resp, expected_resp);

println!(
"OS Thread {:?} - agent.list_interfaces() -> {} ended: {:?}",
std::thread::current().id(),
show,
"{resp:?}",
now.elapsed(),
);
});
Expand All @@ -121,34 +110,22 @@ fn connect_once() {
"Main OS Thread - agent.online_cpu_mem() started: {:?}",
now.elapsed()
);
let show = match ac.online_cpu_mem(default_ctx(), &agent::OnlineCPUMemRequest::new()) {
Err(Error::RpcStatus(s)) => {
assert_eq!(Code::NOT_FOUND, s.code());
assert_eq!(
"/grpc.AgentService/OnlineCPUMem is not supported".to_string(),
s.message()
);
format!("{s:?}")
}
Err(e) => {
panic!("not expecting an error from the example server: {:?}", e)
}
Ok(s) => {
panic!(
"not expecting a OK response from the example server: {:?}",
s
)
}
};
let resp = ac
.online_cpu_mem(default_ctx(), &agent::OnlineCPUMemRequest::new())
.expect_err("not the expecting error from the example server");
let expected_resp = utils::resp::online_cpu_mem_not_impl();
assert_eq!(resp, expected_resp);

println!(
"Main OS Thread - agent.online_cpu_mem() -> {} ended: {:?}",
show,
"Main OS Thread - agent.online_cpu_mem() -> {:?} ended: {:?}",
resp,
now.elapsed()
);

let version = hc.version(default_ctx(), &health::CheckRequest::new());
assert_eq!("mock.0.1", version.as_ref().unwrap().agent_version.as_str());
assert_eq!("0.0.1", version.as_ref().unwrap().grpc_version.as_str());
let expected_version_resp = utils::resp::sync::health_version();
assert_eq!(version, expected_version_resp);

println!(
"Main OS Thread - health.version() started: {:?}",
now.elapsed()
Expand Down
Loading

0 comments on commit d713dd4

Please sign in to comment.