From 4cf24606d55dc61ae1b888d8d86e90a9c70776c0 Mon Sep 17 00:00:00 2001 From: Alan Somers Date: Wed, 4 Dec 2024 14:37:46 -0700 Subject: [PATCH 1/4] Add RETR_SOCKETS It's a global that contains all of the sockets currently serving RETR. The application can use it to analyze performance on a per-socket basis. --- Cargo.toml | 2 +- src/lib.rs | 5 ++- src/server/datachan.rs | 80 +++++++++++++++++++++++++++++++++++++++--- src/server/mod.rs | 1 + 4 files changed, 82 insertions(+), 6 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b3cf73bc..3fb8338a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,7 +47,7 @@ getrandom = "0.2.15" lazy_static = "1.4.0" md-5 = "0.10.6" moka = { version = "0.12.7", default-features = false, features = ["sync"] } -nix = { version = "0.29.0", default-features = false, features = ["fs"] } +nix = { version = "0.29.0", default-features = false, features = ["fs", "net", "socket"] } prometheus = { version = "0.13.4", default-features = false } proxy-protocol = "0.5.0" rustls = "0.23.10" diff --git a/src/lib.rs b/src/lib.rs index f2b210ab..fa9d8ed0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -49,6 +49,9 @@ pub mod notification; pub(crate) mod server; pub mod storage; -pub use crate::server::ftpserver::{error::ServerError, options, Server, ServerBuilder}; +pub use crate::server::{ + ftpserver::{error::ServerError, options, Server, ServerBuilder}, + RETR_SOCKETS, +}; type BoxError = Box; diff --git a/src/server/datachan.rs b/src/server/datachan.rs index 65480916..cc58a043 100644 --- a/src/server/datachan.rs +++ b/src/server/datachan.rs @@ -11,7 +11,15 @@ use crate::{ }; use crate::server::chancomms::DataChanCmd; -use std::{path::PathBuf, sync::Arc}; +use std::{ + net::SocketAddr, + os::fd::{AsRawFd, BorrowedFd, RawFd}, + path::PathBuf, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, +}; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf}; use tokio::net::TcpStream; use tokio::sync::mpsc::{Receiver, Sender}; @@ -42,7 +50,53 @@ use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Instant; -struct MeasuringWriter { +/// Holds information about a socket processing a RETR command +#[derive(Debug)] +pub struct RetrSocket { + bytes: AtomicU64, + fd: RawFd, + peer: SocketAddr, +} + +impl RetrSocket { + /// How many bytes have been written to the socket so far? + /// + /// Note that this tracks bytes written to the socket, not sent on the wire. + pub fn bytes(&self) -> u64 { + self.bytes.load(Ordering::Relaxed) + } + + pub fn fd(&self) -> BorrowedFd<'_> { + // Safe because we always destroy the RetrSocket when the MeasuringWriter drops + #[allow(unsafe_code)] + unsafe { + BorrowedFd::borrow_raw(self.fd) + } + } + + fn new(w: &W) -> nix::Result { + let fd = w.as_raw_fd(); + let ss: nix::sys::socket::SockaddrStorage = nix::sys::socket::getpeername(fd)?; + let peer = if let Some(sin) = ss.as_sockaddr_in() { + SocketAddr::V4((*sin).into()) + } else if let Some(sin6) = ss.as_sockaddr_in6() { + SocketAddr::V6((*sin6).into()) + } else { + return Err(nix::errno::Errno::EINVAL); + }; + let bytes = Default::default(); + Ok(RetrSocket { bytes, fd, peer }) + } + + pub fn peer(&self) -> &SocketAddr { + &self.peer + } +} + +/// Collection of all sockets currently serving RETR commands +pub static RETR_SOCKETS: std::sync::RwLock> = std::sync::RwLock::new(std::collections::BTreeMap::new()); + +struct MeasuringWriter { writer: W, command: &'static str, } @@ -52,12 +106,20 @@ struct MeasuringReader { command: &'static str, } -impl AsyncWrite for MeasuringWriter { +impl AsyncWrite for MeasuringWriter { fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> std::task::Poll> { let this = self.get_mut(); let result = Pin::new(&mut this.writer).poll_write(cx, buf); if let Poll::Ready(Ok(bytes_written)) = &result { + let bw = *bytes_written as u64; + RETR_SOCKETS + .read() + .unwrap() + .get(&this.writer.as_raw_fd()) + .expect("TODO: better error handling") + .bytes + .fetch_add(bw, Ordering::Relaxed); metrics::inc_sent_bytes(*bytes_written, this.command); } @@ -87,12 +149,22 @@ impl AsyncRead for MeasuringReader { } } -impl MeasuringWriter { +impl MeasuringWriter { fn new(writer: W, command: &'static str) -> MeasuringWriter { + let retr_socket = RetrSocket::new(&writer).expect("TODO: better error handling"); + RETR_SOCKETS.write().unwrap().insert(retr_socket.fd, retr_socket); Self { writer, command } } } +impl Drop for MeasuringWriter { + fn drop(&mut self) { + if let Ok(mut guard) = RETR_SOCKETS.write() { + guard.remove(&self.writer.as_raw_fd()); + } + } +} + impl MeasuringReader { fn new(reader: R, command: &'static str) -> MeasuringReader { Self { reader, command } diff --git a/src/server/mod.rs b/src/server/mod.rs index 8e87d9c0..8c473791 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -17,4 +17,5 @@ pub(crate) use controlchan::reply::{Reply, ReplyCode}; pub(crate) use controlchan::ControlChanMiddleware; pub(crate) use controlchan::Event; pub(crate) use controlchan::{ControlChanError, ControlChanErrorKind}; +pub use datachan::RETR_SOCKETS; use session::{Session, SessionState}; From 101c5d48d6d64fc3333607c5842af01d49f5cf45 Mon Sep 17 00:00:00 2001 From: Alan Somers Date: Tue, 10 Dec 2024 08:32:16 -0700 Subject: [PATCH 2/4] Fix the build on Windows by excluding RETR_SOCKETS --- src/server/datachan.rs | 31 +++++++++++++++++++++---------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/src/server/datachan.rs b/src/server/datachan.rs index cc58a043..7a5e6b84 100644 --- a/src/server/datachan.rs +++ b/src/server/datachan.rs @@ -13,13 +13,14 @@ use crate::{ use crate::server::chancomms::DataChanCmd; use std::{ net::SocketAddr, - os::fd::{AsRawFd, BorrowedFd, RawFd}, path::PathBuf, sync::{ atomic::{AtomicU64, Ordering}, Arc, }, }; +#[cfg(unix)] +use std::os::fd::{AsRawFd, BorrowedFd, RawFd}; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf}; use tokio::net::TcpStream; use tokio::sync::mpsc::{Receiver, Sender}; @@ -51,6 +52,7 @@ use std::task::{Context, Poll}; use std::time::Instant; /// Holds information about a socket processing a RETR command +#[cfg(unix)] #[derive(Debug)] pub struct RetrSocket { bytes: AtomicU64, @@ -58,6 +60,7 @@ pub struct RetrSocket { peer: SocketAddr, } +#[cfg(unix)] impl RetrSocket { /// How many bytes have been written to the socket so far? /// @@ -94,6 +97,7 @@ impl RetrSocket { } /// Collection of all sockets currently serving RETR commands +#[cfg(unix)] pub static RETR_SOCKETS: std::sync::RwLock> = std::sync::RwLock::new(std::collections::BTreeMap::new()); struct MeasuringWriter { @@ -113,13 +117,16 @@ impl AsyncWrite for MeasuringWriter { let result = Pin::new(&mut this.writer).poll_write(cx, buf); if let Poll::Ready(Ok(bytes_written)) = &result { let bw = *bytes_written as u64; - RETR_SOCKETS - .read() - .unwrap() - .get(&this.writer.as_raw_fd()) - .expect("TODO: better error handling") - .bytes - .fetch_add(bw, Ordering::Relaxed); + #[cfg(unix)] + { + RETR_SOCKETS + .read() + .unwrap() + .get(&this.writer.as_raw_fd()) + .expect("TODO: better error handling") + .bytes + .fetch_add(bw, Ordering::Relaxed); + } metrics::inc_sent_bytes(*bytes_written, this.command); } @@ -151,12 +158,16 @@ impl AsyncRead for MeasuringReader { impl MeasuringWriter { fn new(writer: W, command: &'static str) -> MeasuringWriter { - let retr_socket = RetrSocket::new(&writer).expect("TODO: better error handling"); - RETR_SOCKETS.write().unwrap().insert(retr_socket.fd, retr_socket); + #[cfg(unix)] + { + let retr_socket = RetrSocket::new(&writer).expect("TODO: better error handling"); + RETR_SOCKETS.write().unwrap().insert(retr_socket.fd, retr_socket); + } Self { writer, command } } } +#[cfg(unix)] impl Drop for MeasuringWriter { fn drop(&mut self) { if let Ok(mut guard) = RETR_SOCKETS.write() { From 0fd4222323d5d3b05b19cd8bb011b3d126cb920b Mon Sep 17 00:00:00 2001 From: Alan Somers Date: Tue, 10 Dec 2024 08:46:11 -0700 Subject: [PATCH 3/4] try 2 to fix the build on windows --- src/server/datachan.rs | 61 +++++++++++++++++++++++++++++++----------- src/server/mod.rs | 1 + 2 files changed, 46 insertions(+), 16 deletions(-) diff --git a/src/server/datachan.rs b/src/server/datachan.rs index 7a5e6b84..d440f600 100644 --- a/src/server/datachan.rs +++ b/src/server/datachan.rs @@ -12,15 +12,15 @@ use crate::{ use crate::server::chancomms::DataChanCmd; use std::{ - net::SocketAddr, path::PathBuf, - sync::{ - atomic::{AtomicU64, Ordering}, - Arc, - }, + sync::Arc, }; #[cfg(unix)] -use std::os::fd::{AsRawFd, BorrowedFd, RawFd}; +use std::{ + net::SocketAddr, + os::fd::{AsRawFd, BorrowedFd, RawFd}, + sync::atomic::{AtomicU64, Ordering}, +}; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf}; use tokio::net::TcpStream; use tokio::sync::mpsc::{Receiver, Sender}; @@ -100,16 +100,23 @@ impl RetrSocket { #[cfg(unix)] pub static RETR_SOCKETS: std::sync::RwLock> = std::sync::RwLock::new(std::collections::BTreeMap::new()); +#[cfg(unix)] struct MeasuringWriter { writer: W, command: &'static str, } +#[cfg(not(unix))] +struct MeasuringWriter { + writer: W, + command: &'static str, +} struct MeasuringReader { reader: R, command: &'static str, } +#[cfg(unix)] impl AsyncWrite for MeasuringWriter { fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> std::task::Poll> { let this = self.get_mut(); @@ -117,16 +124,38 @@ impl AsyncWrite for MeasuringWriter { let result = Pin::new(&mut this.writer).poll_write(cx, buf); if let Poll::Ready(Ok(bytes_written)) = &result { let bw = *bytes_written as u64; - #[cfg(unix)] - { - RETR_SOCKETS - .read() - .unwrap() - .get(&this.writer.as_raw_fd()) - .expect("TODO: better error handling") - .bytes - .fetch_add(bw, Ordering::Relaxed); - } + RETR_SOCKETS + .read() + .unwrap() + .get(&this.writer.as_raw_fd()) + .expect("TODO: better error handling") + .bytes + .fetch_add(bw, Ordering::Relaxed); + metrics::inc_sent_bytes(*bytes_written, this.command); + } + + result + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + Pin::new(&mut this.writer).poll_flush(cx) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + Pin::new(&mut this.writer).poll_shutdown(cx) + } +} + +#[cfg(not(unix))] +impl AsyncWrite for MeasuringWriter { + fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> std::task::Poll> { + let this = self.get_mut(); + + let result = Pin::new(&mut this.writer).poll_write(cx, buf); + if let Poll::Ready(Ok(bytes_written)) = &result { + let bw = *bytes_written as u64; metrics::inc_sent_bytes(*bytes_written, this.command); } diff --git a/src/server/mod.rs b/src/server/mod.rs index 8c473791..2892f2ee 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -17,5 +17,6 @@ pub(crate) use controlchan::reply::{Reply, ReplyCode}; pub(crate) use controlchan::ControlChanMiddleware; pub(crate) use controlchan::Event; pub(crate) use controlchan::{ControlChanError, ControlChanErrorKind}; +#[cfg(unix)] pub use datachan::RETR_SOCKETS; use session::{Session, SessionState}; From be371d7a95c1600d826ffcb4c7dd7b8639341b9f Mon Sep 17 00:00:00 2001 From: Alan Somers Date: Tue, 10 Dec 2024 08:55:11 -0700 Subject: [PATCH 4/4] Try 3 to fix the build on Windows --- src/lib.rs | 7 +++---- src/server/datachan.rs | 21 +++++++++++---------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index fa9d8ed0..66da847b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -49,9 +49,8 @@ pub mod notification; pub(crate) mod server; pub mod storage; -pub use crate::server::{ - ftpserver::{error::ServerError, options, Server, ServerBuilder}, - RETR_SOCKETS, -}; +pub use crate::server::ftpserver::{error::ServerError, options, Server, ServerBuilder}; +#[cfg(unix)] +pub use crate::server::RETR_SOCKETS; type BoxError = Box; diff --git a/src/server/datachan.rs b/src/server/datachan.rs index d440f600..859d4fff 100644 --- a/src/server/datachan.rs +++ b/src/server/datachan.rs @@ -11,16 +11,13 @@ use crate::{ }; use crate::server::chancomms::DataChanCmd; -use std::{ - path::PathBuf, - sync::Arc, -}; #[cfg(unix)] use std::{ net::SocketAddr, os::fd::{AsRawFd, BorrowedFd, RawFd}, sync::atomic::{AtomicU64, Ordering}, }; +use std::{path::PathBuf, sync::Arc}; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf}; use tokio::net::TcpStream; use tokio::sync::mpsc::{Receiver, Sender}; @@ -106,7 +103,7 @@ struct MeasuringWriter { command: &'static str, } #[cfg(not(unix))] -struct MeasuringWriter { +struct MeasuringWriter { writer: W, command: &'static str, } @@ -185,13 +182,17 @@ impl AsyncRead for MeasuringReader { } } +#[cfg(unix)] impl MeasuringWriter { fn new(writer: W, command: &'static str) -> MeasuringWriter { - #[cfg(unix)] - { - let retr_socket = RetrSocket::new(&writer).expect("TODO: better error handling"); - RETR_SOCKETS.write().unwrap().insert(retr_socket.fd, retr_socket); - } + let retr_socket = RetrSocket::new(&writer).expect("TODO: better error handling"); + RETR_SOCKETS.write().unwrap().insert(retr_socket.fd, retr_socket); + Self { writer, command } + } +} +#[cfg(not(unix))] +impl MeasuringWriter { + fn new(writer: W, command: &'static str) -> MeasuringWriter { Self { writer, command } } }