Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update examples #283

Merged
merged 3 commits into from
Feb 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 36 additions & 19 deletions example/async-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,25 @@ async fn main() {
"health.check()",
now.elapsed(),
);

let resp = thc
.check(
context::with_duration(core::time::Duration::from_millis(20)),
&req,
)
.await;

assert_eq!(
resp,
Err(ttrpc::Error::Others(
"Receive packet timeout Elapsed(())".into()
))
);

println!(
"Green Thread 1 - {} -> {:?} ended: {:?}",
"health.check()",
thc.check(context::with_duration(core::time::Duration::from_millis(20)), &req)
.await,
resp,
now.elapsed(),
);
});
Expand All @@ -51,18 +65,16 @@ async fn main() {
now.elapsed(),
);

let show = match tac
let resp = tac
.list_interfaces(default_ctx(), &agent::ListInterfacesRequest::new())
.await
{
Err(e) => format!("{:?}", e),
Ok(s) => format!("{:?}", s),
};
.await;
let expected_resp = utils::resp::asynchronous::agent_list_interfaces();
assert_eq!(resp, expected_resp);

println!(
"Green Thread 2 - {} -> {} ended: {:?}",
"Green Thread 2 - {} -> {:?} ended: {:?}",
"agent.list_interfaces()",
show,
resp,
now.elapsed(),
);
});
Expand All @@ -74,17 +86,16 @@ async fn main() {
now.elapsed()
);

let show = match ac
let resp = ac
.online_cpu_mem(default_ctx(), &agent::OnlineCPUMemRequest::new())
.await
{
Err(e) => format!("{:?}", e),
Ok(s) => format!("{:?}", s),
};
.await;
let expected_resp = utils::resp::online_cpu_mem_not_impl();
assert_eq!(resp, Err(expected_resp));

println!(
"Green Thread 3 - {} -> {} ended: {:?}",
"Green Thread 3 - {} -> {:?} ended: {:?}",
"agent.online_cpu_mem()",
show,
resp,
now.elapsed()
);

Expand All @@ -102,7 +113,13 @@ async fn main() {
);
});

let _ = tokio::join!(t1, t2, t3);
let (r1, r2, r3) = tokio::join!(t1, t2, t3);
assert!(
r1.is_ok() && r2.is_ok() && r3.is_ok(),
"async test is failed because some error occurred"
);

println!("***** Asycn test is OK! *****");
}

fn default_ctx() -> Context {
Expand Down
49 changes: 10 additions & 39 deletions example/async-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,14 @@
mod protocols;
mod utils;

#[macro_use]
extern crate log;

use std::sync::Arc;

use log::LevelFilter;

#[cfg(unix)]
use protocols::asynchronous::{agent, agent_ttrpc, health, health_ttrpc, types};
use protocols::asynchronous::{agent, agent_ttrpc, health, health_ttrpc};
#[cfg(unix)]
use ttrpc::asynchronous::Server;
use ttrpc::error::{Error, Result};
use ttrpc::proto::{Code, Status};

#[cfg(unix)]
use async_trait::async_trait;
Expand All @@ -35,30 +30,18 @@ impl health_ttrpc::Health for HealthService {
&self,
_ctx: &::ttrpc::r#async::TtrpcContext,
_req: health::CheckRequest,
) -> Result<health::HealthCheckResponse> {
let mut status = Status::new();

status.set_code(Code::NOT_FOUND);
status.set_message("Just for fun".to_string());

sleep(std::time::Duration::from_secs(10)).await;

Err(Error::RpcStatus(status))
) -> ttrpc::Result<health::HealthCheckResponse> {
// Mock timeout
sleep(std::time::Duration::from_secs(1)).await;
unreachable!();
}

async fn version(
&self,
ctx: &::ttrpc::r#async::TtrpcContext,
req: health::CheckRequest,
) -> Result<health::VersionCheckResponse> {
info!("version {:?}", req);
info!("ctx {:?}", ctx);
let mut rep = health::VersionCheckResponse::new();
rep.agent_version = "mock.0.1".to_string();
rep.grpc_version = "0.0.1".to_string();
let mut status = Status::new();
status.set_code(Code::NOT_FOUND);
Ok(rep)
_ctx: &::ttrpc::r#async::TtrpcContext,
_req: health::CheckRequest,
) -> ttrpc::Result<health::VersionCheckResponse> {
utils::resp::asynchronous::health_version()
}
}

Expand All @@ -71,19 +54,7 @@ impl agent_ttrpc::AgentService for AgentService {
_ctx: &::ttrpc::r#async::TtrpcContext,
_req: agent::ListInterfacesRequest,
) -> ::ttrpc::Result<agent::Interfaces> {
let mut rp = Vec::new();

let mut i = types::Interface::new();
i.name = "first".to_string();
rp.push(i);
let mut i = types::Interface::new();
i.name = "second".to_string();
rp.push(i);

let mut i = agent::Interfaces::new();
i.Interfaces = rp;

Ok(i)
utils::resp::asynchronous::agent_list_interfaces()
}
}

Expand Down
16 changes: 15 additions & 1 deletion example/async-stream-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,21 @@ async fn main() {

let t8 = tokio::spawn(server_send_stream(sc));

let _ = tokio::join!(t1, t2, t3, t4, t5, t6, t7, t8);
let (r1, r2, r3, r4, r5, r6, r7, r8) = tokio::join!(t1, t2, t3, t4, t5, t6, t7, t8);

assert!(
r1.is_ok()
&& r2.is_ok()
&& r3.is_ok()
&& r4.is_ok()
&& r5.is_ok()
&& r6.is_ok()
&& r7.is_ok()
&& r8.is_ok(),
"async-stream test is failed because some error occurred"
);

println!("***** Async Stream test is OK! *****");
}

fn default_ctx() -> Context {
Expand Down
2 changes: 1 addition & 1 deletion example/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ fn replace_text_in_file(file_name: &str, from: &str, to: &str) -> Result<(), std
let new_contents = contents.replace(from, to);

let mut dst = File::create(file_name)?;
dst.write(new_contents.as_bytes())?;
dst.write_all(new_contents.as_bytes())?;

Ok(())
}
83 changes: 30 additions & 53 deletions example/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ use protocols::sync::{agent, agent_ttrpc, health, health_ttrpc};
use std::thread;
use std::time::Duration;
use ttrpc::context::{self, Context};
use ttrpc::error::Error;
use ttrpc::proto::Code;
use ttrpc::Client;

#[cfg(not(target_os = "linux"))]
Expand All @@ -45,6 +43,8 @@ fn main() {
thread::sleep(Duration::from_secs(1));
let current_fd_count = get_fd_count();
assert_eq!(current_fd_count, expected_fd_count, "check fd count");

println!("***** Sync test is OK! *****");
}

fn connect_once() {
Expand All @@ -67,26 +67,22 @@ fn connect_once() {
now.elapsed(),
);

let rsp = thc.check(default_ctx(), &req);
match rsp.as_ref() {
Err(Error::RpcStatus(s)) => {
assert_eq!(Code::NOT_FOUND, s.code());
assert_eq!("Just for fun".to_string(), s.message())
}
Err(e) => {
panic!("not expecting an error from the example server: {:?}", e)
}
Ok(x) => {
panic!(
"not expecting a OK response from the example server: {:?}",
x
)
}
}
let resp = thc.check(
context::with_duration(core::time::Duration::from_millis(20)),
&req,
);

assert_eq!(
resp,
Err(ttrpc::Error::Others(
"Receive packet from Receiver timeout: timed out waiting on channel".into()
))
);

println!(
"OS Thread {:?} - health.check() -> {:?} ended: {:?}",
std::thread::current().id(),
rsp,
resp,
now.elapsed(),
);
});
Expand All @@ -98,21 +94,14 @@ fn connect_once() {
now.elapsed(),
);

let show = match tac.list_interfaces(default_ctx(), &agent::ListInterfacesRequest::new()) {
Err(e) => {
panic!("not expecting an error from the example server: {:?}", e)
}
Ok(s) => {
assert_eq!("first".to_string(), s.Interfaces[0].name);
assert_eq!("second".to_string(), s.Interfaces[1].name);
format!("{s:?}")
}
};
let resp = tac.list_interfaces(default_ctx(), &agent::ListInterfacesRequest::new());
let expected_resp = utils::resp::sync::agent_list_interfaces();
assert_eq!(resp, expected_resp);

println!(
"OS Thread {:?} - agent.list_interfaces() -> {} ended: {:?}",
std::thread::current().id(),
show,
"{resp:?}",
now.elapsed(),
);
});
Expand All @@ -121,34 +110,22 @@ fn connect_once() {
"Main OS Thread - agent.online_cpu_mem() started: {:?}",
now.elapsed()
);
let show = match ac.online_cpu_mem(default_ctx(), &agent::OnlineCPUMemRequest::new()) {
Err(Error::RpcStatus(s)) => {
assert_eq!(Code::NOT_FOUND, s.code());
assert_eq!(
"/grpc.AgentService/OnlineCPUMem is not supported".to_string(),
s.message()
);
format!("{s:?}")
}
Err(e) => {
panic!("not expecting an error from the example server: {:?}", e)
}
Ok(s) => {
panic!(
"not expecting a OK response from the example server: {:?}",
s
)
}
};
let resp = ac
.online_cpu_mem(default_ctx(), &agent::OnlineCPUMemRequest::new())
.expect_err("not the expecting error from the example server");
let expected_resp = utils::resp::online_cpu_mem_not_impl();
assert_eq!(resp, expected_resp);

println!(
"Main OS Thread - agent.online_cpu_mem() -> {} ended: {:?}",
show,
"Main OS Thread - agent.online_cpu_mem() -> {:?} ended: {:?}",
resp,
now.elapsed()
);

let version = hc.version(default_ctx(), &health::CheckRequest::new());
assert_eq!("mock.0.1", version.as_ref().unwrap().agent_version.as_str());
assert_eq!("0.0.1", version.as_ref().unwrap().grpc_version.as_str());
let expected_version_resp = utils::resp::sync::health_version();
assert_eq!(version, expected_version_resp);

println!(
"Main OS Thread - health.version() started: {:?}",
now.elapsed()
Expand Down
Loading