diff --git a/src/server/controlchan/command.rs b/src/server/controlchan/command.rs index afd9458e..62f0ee28 100644 --- a/src/server/controlchan/command.rs +++ b/src/server/controlchan/command.rs @@ -39,6 +39,7 @@ pub enum Command { Help, Noop, Pasv, + Epsv, Port { /// The address to use to make an active connection to the client addr: String, diff --git a/src/server/controlchan/commands/epsv.rs b/src/server/controlchan/commands/epsv.rs new file mode 100644 index 00000000..bf78ff8b --- /dev/null +++ b/src/server/controlchan/commands/epsv.rs @@ -0,0 +1,159 @@ +//! The RFC 2428 Passive (`EPSV`) command +// +// The EPSV command requests that a server listen on a data port and +// wait for a connection. The EPSV command takes an optional argument. +// The response to this command includes only the TCP port number of the +// listening connection. + +use crate::{ + auth::UserDetail, + server::{ + chancomms::{DataChanCmd, ProxyLoopMsg, ProxyLoopSender}, + controlchan::{ + error::ControlChanError, + handler::{CommandContext, CommandHandler}, + Reply, ReplyCode, + }, + datachan, + session::SharedSession, + ControlChanMsg, + }, + storage::{Metadata, StorageBackend}, +}; +use async_trait::async_trait; +use std::{io, net::SocketAddr, ops::Range}; +use tokio::net::TcpListener; +use tokio::sync::mpsc::{channel, Receiver, Sender}; + +const BIND_RETRIES: u8 = 10; + +#[derive(Debug)] +pub struct Epsv {} + +impl Epsv { + pub fn new() -> Self { + Epsv {} + } + + #[tracing_attributes::instrument] + async fn try_port_range(local_addr: SocketAddr, passive_ports: Range) -> io::Result { + let rng_length = passive_ports.end - passive_ports.start + 1; + + let mut listener: io::Result = Err(io::Error::new(io::ErrorKind::InvalidInput, "Bind retries cannot be 0")); + + for _ in 1..BIND_RETRIES { + let random_u32 = { + let mut data = [0; 4]; + getrandom::getrandom(&mut data).expect("Error generating random port"); + u32::from_ne_bytes(data) + }; + + let port = random_u32 % rng_length as u32 + passive_ports.start as u32; + listener = TcpListener::bind(std::net::SocketAddr::new(local_addr.ip(), port as u16)).await; + if listener.is_ok() { + break; + } + } + + listener + } + + // modifies the session by adding channels that are used to communicate with the data connection + // processing loop. + #[tracing_attributes::instrument] + async fn setup_inter_loop_comms(&self, session: SharedSession, control_loop_tx: Sender) + where + U: UserDetail + 'static, + S: StorageBackend + 'static, + S::Metadata: Metadata, + { + let (cmd_tx, cmd_rx): (Sender, Receiver) = channel(1); + let (data_abort_tx, data_abort_rx): (Sender<()>, Receiver<()>) = channel(1); + + let mut session = session.lock().await; + session.data_cmd_tx = Some(cmd_tx); + session.data_cmd_rx = Some(cmd_rx); + session.data_abort_tx = Some(data_abort_tx); + session.data_abort_rx = Some(data_abort_rx); + session.control_msg_tx = Some(control_loop_tx); + } + + // For non-proxy mode we choose a data port here and start listening on it while letting the control + // channel know (via method return) what the address is that the client should connect to. + #[tracing_attributes::instrument] + async fn handle_nonproxy_mode(&self, args: CommandContext) -> Result + where + U: UserDetail + 'static, + S: StorageBackend + 'static, + S::Metadata: Metadata, + { + let CommandContext { + logger, + tx_control_chan: tx, + session, + .. + } = args; + + let listener = Epsv::try_port_range(args.local_addr, args.passive_ports).await; + + let listener = match listener { + Err(_) => return Ok(Reply::new(ReplyCode::CantOpenDataConnection, "No data connection established")), + Ok(l) => l, + }; + + let port = listener.local_addr()?.port(); + + let reply = make_epsv_reply(port); + if let Reply::CodeAndMsg { + code: ReplyCode::EnteringExtendedPassiveMode, + .. + } = reply + { + self.setup_inter_loop_comms(session.clone(), tx).await; + // Open the data connection in a new task and process it. + // We cannot await this since we first need to let the client know where to connect :-) + tokio::spawn(async move { + if let Ok((socket, _socket_addr)) = listener.accept().await { + datachan::spawn_processing(logger, session, socket).await; + } + }); + } + + Ok(reply) + } + + // For proxy mode we prepare the session and let the proxy loop know (via channel) that it + // should choose a data port and check for connections on it. + #[tracing_attributes::instrument] + async fn handle_proxy_mode(&self, args: CommandContext, tx: ProxyLoopSender) -> Result + where + U: UserDetail + 'static, + S: StorageBackend + 'static, + S::Metadata: Metadata, + { + self.setup_inter_loop_comms(args.session.clone(), args.tx_control_chan).await; + tx.send(ProxyLoopMsg::AssignDataPortCommand(args.session.clone())).await.unwrap(); + Ok(Reply::None) + } +} + +#[async_trait] +impl CommandHandler for Epsv +where + User: UserDetail + 'static, + Storage: StorageBackend + 'static, + Storage::Metadata: Metadata, +{ + #[tracing_attributes::instrument] + async fn handle(&self, args: CommandContext) -> Result { + let sender: Option> = args.tx_proxyloop.clone(); + match sender { + Some(tx) => self.handle_proxy_mode(args, tx).await, + None => self.handle_nonproxy_mode(args).await, + } + } +} + +pub fn make_epsv_reply(port: u16) -> Reply { + Reply::new_with_string(ReplyCode::EnteringExtendedPassiveMode, format!("Entering Extended Passive Mode (|||{}|)", port)) +} diff --git a/src/server/controlchan/commands/feat.rs b/src/server/controlchan/commands/feat.rs index 7a1f5202..1b62cbbf 100644 --- a/src/server/controlchan/commands/feat.rs +++ b/src/server/controlchan/commands/feat.rs @@ -26,7 +26,8 @@ where { #[tracing_attributes::instrument] async fn handle(&self, args: CommandContext) -> Result { - let mut feat_text = vec![" SIZE", " MDTM", " UTF8"]; + let mut feat_text = vec![" SIZE", " MDTM", " UTF8", " EPSV"]; + // Add the features. According to the spec each feature line must be // indented by a space. if args.tls_configured { diff --git a/src/server/controlchan/commands/mod.rs b/src/server/controlchan/commands/mod.rs index aed4b613..aace9ecf 100644 --- a/src/server/controlchan/commands/mod.rs +++ b/src/server/controlchan/commands/mod.rs @@ -12,6 +12,7 @@ mod ccc; mod cdup; mod cwd; mod dele; +mod epsv; mod feat; mod help; mod list; @@ -52,6 +53,7 @@ pub use ccc::Ccc; pub use cdup::Cdup; pub use cwd::Cwd; pub use dele::Dele; +pub use epsv::Epsv; pub use feat::Feat; pub use help::Help; pub use list::List; diff --git a/src/server/controlchan/control_loop.rs b/src/server/controlchan/control_loop.rs index d2387d22..d53fc585 100644 --- a/src/server/controlchan/control_loop.rs +++ b/src/server/controlchan/control_loop.rs @@ -442,6 +442,7 @@ where Command::Help => Box::new(commands::Help), Command::Noop => Box::new(commands::Noop), Command::Pasv => Box::new(commands::Pasv::new()), + Command::Epsv => Box::new(commands::Epsv::new()), Command::Port { addr } => Box::new(commands::Port::new(addr)), Command::Retr { .. } => Box::new(commands::Retr), Command::Stor { .. } => Box::new(commands::Stor), diff --git a/src/server/controlchan/line_parser/parser.rs b/src/server/controlchan/line_parser/parser.rs index c80f8467..fc169133 100644 --- a/src/server/controlchan/line_parser/parser.rs +++ b/src/server/controlchan/line_parser/parser.rs @@ -87,6 +87,7 @@ where } Command::Pasv } + "EPSV" => Command::Epsv, "PORT" => { let params = parse_to_eol(cmd_params)?; if params.is_empty() { diff --git a/src/server/controlchan/line_parser/tests.rs b/src/server/controlchan/line_parser/tests.rs index bb38bc90..a59761cd 100644 --- a/src/server/controlchan/line_parser/tests.rs +++ b/src/server/controlchan/line_parser/tests.rs @@ -181,6 +181,12 @@ fn parse_pasv() { assert_eq!(parse(input), Err(ParseError::from(ParseErrorKind::InvalidCommand))); } +#[test] +fn parse_epsv() { + let input = "EPSV\r\n"; + assert_eq!(parse(input).unwrap(), Command::Epsv); +} + #[test] fn parse_port() { let input = "PORT\r\n";