diff --git a/src/bin.rs b/src/bin.rs index 7011d1b..0a9e331 100644 --- a/src/bin.rs +++ b/src/bin.rs @@ -31,6 +31,7 @@ use nix::unistd::getpid; use librain::{server, worker, VERSION}; use librain::errors::Result; +use librain::client::client::Client; const DEFAULT_SERVER_PORT: u16 = 7210; const DEFAULT_WORKER_PORT: u16 = 0; @@ -162,6 +163,22 @@ impl WorkerConfig { } } +fn resolve_server_address(address: &str) -> SocketAddr { + match address.to_socket_addrs() { + Err(_) => { + error!("Cannot resolve server address"); + exit(1); + } + Ok(mut addrs) => match addrs.next() { + None => { + error!("Cannot resolve server address"); + exit(1); + } + Some(ref addr) => *addr, + }, + } +} + fn run_worker(_global_args: &ArgMatches, cmd_args: &ArgMatches) { info!("Starting Rain {} worker", VERSION); let ready_file = cmd_args.value_of("READY_FILE"); @@ -173,19 +190,7 @@ fn run_worker(_global_args: &ArgMatches, cmd_args: &ArgMatches) { server_address = format!("{}:{}", server_address, DEFAULT_SERVER_PORT); } - let server_addr = match server_address.to_socket_addrs() { - Err(_) => { - error!("Cannot resolve server address: "); - exit(1); - } - Ok(mut addrs) => match addrs.next() { - None => { - error!("Cannot resolve server address"); - exit(1); - } - Some(ref addr) => *addr, - }, - }; + let server_addr = resolve_server_address(&server_address); let state = { let config = cmd_args.value_of("WORKER_CONFIG").map(|path| { @@ -385,6 +390,30 @@ fn run_starter(_global_args: &ArgMatches, cmd_args: &ArgMatches) { } } +fn stop_server(_global_args: &ArgMatches, cmd_args: &ArgMatches) { + let default_address = format!("localhost:{}", DEFAULT_SERVER_PORT); + let mut address = cmd_args + .value_of("SERVER_ADDRESS") + .unwrap_or(&default_address) + .to_string(); + + if !address.contains(':') { + address = format!("{}:{}", address, DEFAULT_SERVER_PORT); + } + + let scheduler: SocketAddr = resolve_server_address(&address); + let mut client = Client::new(&scheduler).unwrap_or_else(|err| { + error!("Couldn't connect to server at {}: {}", address, err); + exit(1); + }); + client.terminate_server().unwrap_or_else(|err| { + error!("Couldn't stop server: {}", err); + exit(1); + }); + + println!("Server at {} was successfully stopped", address); +} + fn init_log() { // T emporary simple logger for better module log control, default level is INFO // TODO: replace with Fern or log4rs later @@ -543,15 +572,22 @@ fn main() { .long("--logdir") .help("Logging directory for workers & server (default /tmp/rain-logs/run-$HOSTANE-$PID)") .takes_value(true))) + .subcommand( // ---- STOP ---- + SubCommand::with_name("stop") + .about("Stop server and all workers connected to it") + .arg(Arg::with_name("SERVER_ADDRESS") + .help("Address of the server (default = localhost:7210)") + .takes_value(true))) .get_matches(); match args.subcommand() { ("server", Some(cmd_args)) => run_server(&args, cmd_args), ("worker", Some(cmd_args)) => run_worker(&args, cmd_args), ("start", Some(cmd_args)) => run_starter(&args, cmd_args), + ("stop", Some(cmd_args)) => stop_server(&args, cmd_args), _ => { error!("No subcommand provided."); - ::std::process::exit(1); + exit(1); } } } diff --git a/src/server/rpc/client.rs b/src/server/rpc/client.rs index 780b6c5..47179a8 100644 --- a/src/server/rpc/client.rs +++ b/src/server/rpc/client.rs @@ -13,6 +13,7 @@ use errors::{Error, ErrorKind, Result}; use common::{Attributes, DataType}; use common::RcSet; use common::events::{ObjectDescriptor, TaskDescriptor}; +use std::process::exit; pub struct ClientServiceImpl { state: StateRef, @@ -519,4 +520,13 @@ impl client_service::Server for ClientServiceImpl { results.get_state().unwrap().set_ok(()); Promise::ok(()) } + + fn terminate_server( + &mut self, + params: client_service::TerminateServerParams, + results: client_service::TerminateServerResults, + ) -> Promise<(), ::capnp::Error> { + exit(0); + Promise::ok(()) + } }