From 4196cfc96abec9ca68cb36ae8d4cf8a1bcb9ff06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20L=C3=B6nnhager?= Date: Tue, 13 Dec 2022 14:40:23 +0100 Subject: [PATCH 1/7] Never block route change callback when registering IPs with ST driver --- talpid-core/src/split_tunnel/windows/mod.rs | 146 +++++++++++++------- 1 file changed, 94 insertions(+), 52 deletions(-) diff --git a/talpid-core/src/split_tunnel/windows/mod.rs b/talpid-core/src/split_tunnel/windows/mod.rs index 68030691a99b..74c15fd13e32 100644 --- a/talpid-core/src/split_tunnel/windows/mod.rs +++ b/talpid-core/src/split_tunnel/windows/mod.rs @@ -110,7 +110,6 @@ pub struct SplitTunnel { quit_event: Arc, excluded_processes: Arc>>, _route_change_callback: Option, - daemon_tx: Weak>, async_path_update_in_progress: Arc, route_manager: RouteManagerHandle, } @@ -121,10 +120,20 @@ enum Request { Stop, } type RequestResponseTx = sync_mpsc::Sender>; -type RequestTx = sync_mpsc::Sender<(Request, RequestResponseTx)>; +type RequestTx = sync_mpsc::Sender<(Request, Option)>; const REQUEST_TIMEOUT: Duration = Duration::from_secs(5); +impl Request { + fn request_name(&self) -> &'static str { + match self { + Request::SetPaths(_) => "SetPaths", + Request::RegisterIps(_) => "RegisterIps", + Request::Stop => "Stop", + } + } +} + #[derive(Default, PartialEq, Clone)] struct InterfaceAddresses { tunnel_ipv4: Option, @@ -170,8 +179,12 @@ impl SplitTunnel { ) -> Result { let excluded_processes = Arc::new(RwLock::new(HashMap::new())); - let (request_tx, handle) = - Self::spawn_request_thread(resource_dir, volume_update_rx, excluded_processes.clone())?; + let (request_tx, handle) = Self::spawn_request_thread( + resource_dir, + daemon_tx, + volume_update_rx, + excluded_processes.clone(), + )?; let (event_thread, quit_event) = Self::spawn_event_listener(handle, excluded_processes.clone())?; @@ -182,7 +195,6 @@ impl SplitTunnel { event_thread: Some(event_thread), quit_event, _route_change_callback: None, - daemon_tx, async_path_update_in_progress: Arc::new(AtomicBool::new(false)), excluded_processes, route_manager, @@ -388,6 +400,7 @@ impl SplitTunnel { fn spawn_request_thread( resource_dir: PathBuf, + daemon_tx: Weak>, volume_update_rx: mpsc::UnboundedReceiver<()>, excluded_processes: Arc>>, ) -> Result<(RequestTx, Arc), Error> { @@ -430,6 +443,7 @@ impl SplitTunnel { let mut previous_addresses = InterfaceAddresses::default(); while let Ok((request, response_tx)) = rx.recv() { + let request_name = request.request_name(); let response = match request { Request::SetPaths(paths) => { let mut monitored_paths_guard = monitored_paths.lock().unwrap(); @@ -476,21 +490,67 @@ impl SplitTunnel { } Request::Stop => { if let Err(error) = handle.reset().map_err(Error::ResetError) { - let _ = response_tx.send(Err(error)); + if let Some(response_tx) = response_tx { + let _ = response_tx.send(Err(error)); + } else { + log::error!( + "{}", + error.display_chain_with_msg( + "Failed to deinitialize split tunneling" + ) + ); + } continue; } monitored_paths.lock().unwrap().clear(); excluded_processes.write().unwrap().clear(); - let _ = response_tx.send(Ok(())); + if let Some(response_tx) = response_tx { + let _ = response_tx.send(Ok(())); + } // Stop listening to commands break; } }; - if response_tx.send(response).is_err() { - log::error!("A response could not be sent for a completed request"); + + // Handle IOCTL result + + let log_response = if let Some(response_tx) = response_tx { + if let Err(error) = response_tx.send(response) { + log::error!( + "A response could not be sent for completed request/ioctl: {}", + request_name + ); + Some(error.0) + } else { + None + } + } else { + if response.is_err() { + if let Some(daemon_tx) = daemon_tx.upgrade() { + log::debug!( + "Entering error state due to failed request/ioctl: {}", + request_name + ); + let _ = daemon_tx.unbounded_send(TunnelCommand::Block( + ErrorStateCause::SplitTunnelError, + )); + } else { + log::error!( + "Cannot handle failed request since tunnel state machine is down" + ); + } + } + Some(response) + }; + if let Some(Err(error)) = log_response { + log::error!( + "Request/ioctl failed: {}\n{}", + request_name, + error.display_chain() + ); } } @@ -548,7 +608,7 @@ impl SplitTunnel { let (response_tx, response_rx) = sync_mpsc::channel(); request_tx - .send((request, response_tx)) + .send((request, Some(response_tx))) .map_err(|_| Error::SplitTunnelDown)?; response_rx @@ -590,7 +650,7 @@ impl SplitTunnel { let wait_task = move || { request_tx - .send((request, response_tx)) + .send((request, Some(response_tx))) .map_err(|_| Error::SplitTunnelDown)?; response_rx.recv().map_err(|_| Error::SplitTunnelDown)? }; @@ -620,7 +680,6 @@ impl SplitTunnel { let context_mutex = Arc::new(Mutex::new( SplitTunnelDefaultRouteChangeHandlerContext::new( self.request_tx.clone(), - self.daemon_tx.clone(), tunnel_ipv4, tunnel_ipv6, ), @@ -646,7 +705,7 @@ impl SplitTunnel { // could deadlock if the dropped callback is invoked (see `init_context`). .map_err(|_| Error::RegisterRouteChangeCallback)?; - Self::init_context(context)?; + Self::init_context(context, &self.request_tx)?; self._route_change_callback = callback; Ok(()) @@ -654,6 +713,7 @@ impl SplitTunnel { fn init_context( mut context: MutexGuard<'_, SplitTunnelDefaultRouteChangeHandlerContext>, + request_tx: &RequestTx, ) -> Result<(), Error> { // NOTE: This should remain a separate function. Dropping the context after `callback` // causes a deadlock if `split_tunnel_default_route_change_handler` is called at the same @@ -662,7 +722,7 @@ impl SplitTunnel { // to complete. context.initialize_internet_addresses()?; - context.register_ips() + SplitTunnel::send_request_inner(request_tx, Request::RegisterIps(context.addresses.clone())) } /// Instructs the driver to stop redirecting tunnel traffic and INADDR_ANY. @@ -702,20 +762,17 @@ impl Drop for SplitTunnel { struct SplitTunnelDefaultRouteChangeHandlerContext { request_tx: RequestTx, - pub daemon_tx: Weak>, pub addresses: InterfaceAddresses, } impl SplitTunnelDefaultRouteChangeHandlerContext { pub fn new( request_tx: RequestTx, - daemon_tx: Weak>, tunnel_ipv4: Option, tunnel_ipv6: Option, ) -> Self { SplitTunnelDefaultRouteChangeHandlerContext { request_tx, - daemon_tx, addresses: InterfaceAddresses { tunnel_ipv4, tunnel_ipv6, @@ -725,13 +782,6 @@ impl SplitTunnelDefaultRouteChangeHandlerContext { } } - pub fn register_ips(&self) -> Result<(), Error> { - SplitTunnel::send_request_inner( - &self.request_tx, - Request::RegisterIps(self.addresses.clone()), - ) - } - pub fn initialize_internet_addresses(&mut self) -> Result<(), Error> { // Identify IP address that gives us Internet access let internet_ipv4 = get_best_default_route(AddressFamily::Ipv4) @@ -782,14 +832,9 @@ fn split_tunnel_default_route_change_handler( // Update the "internet interface" IP when best default route changes let mut ctx = ctx_mutex.lock().expect("ST route handler mutex poisoned"); - let daemon_tx = ctx.daemon_tx.upgrade(); - let maybe_send = move |content| { - if let Some(tx) = daemon_tx { - let _ = tx.unbounded_send(content); - } - }; + let prev_addrs = ctx.addresses.clone(); - let result = match event_type { + match event_type { Updated(default_route) | UpdatedDetails(default_route) => { match get_ip_address_for_interface(address_family, default_route.iface) { Ok(Some(ip)) => match ip { @@ -814,32 +859,29 @@ fn split_tunnel_default_route_change_handler( "Failed to obtain default route interface address" ) ); - maybe_send(TunnelCommand::Block(ErrorStateCause::SplitTunnelError)); - return; } }; - - ctx.register_ips() } // no default route - Removed => { - match address_family { - AddressFamily::Ipv4 => { - ctx.addresses.internet_ipv4 = None; - } - AddressFamily::Ipv6 => { - ctx.addresses.internet_ipv6 = None; - } + Removed => match address_family { + AddressFamily::Ipv4 => { + ctx.addresses.internet_ipv4 = None; } - ctx.register_ips() - } - }; + AddressFamily::Ipv6 => { + ctx.addresses.internet_ipv6 = None; + } + }, + } - if let Err(error) = result { - log::error!( - "{}", - error.display_chain_with_msg("Failed to register new addresses in split tunnel driver") - ); - maybe_send(TunnelCommand::Block(ErrorStateCause::SplitTunnelError)); + if prev_addrs == ctx.addresses { + return; + } + + if ctx + .request_tx + .send((Request::RegisterIps(ctx.addresses.clone()), None)) + .is_err() + { + log::error!("Split tunnel request thread is down"); } } From 836f3927204e1363102dd158106c809d98b2e52b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20L=C3=B6nnhager?= Date: Tue, 13 Dec 2022 22:44:02 +0100 Subject: [PATCH 2/7] Connections are redirected, not 'traffic' --- talpid-core/src/split_tunnel/windows/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/talpid-core/src/split_tunnel/windows/mod.rs b/talpid-core/src/split_tunnel/windows/mod.rs index 74c15fd13e32..ee6659bf21a0 100644 --- a/talpid-core/src/split_tunnel/windows/mod.rs +++ b/talpid-core/src/split_tunnel/windows/mod.rs @@ -661,7 +661,7 @@ impl SplitTunnel { }); } - /// Instructs the driver to redirect traffic from sockets bound to 0.0.0.0, ::, or the + /// Instructs the driver to redirect connections for sockets bound to 0.0.0.0, ::, or the /// tunnel addresses (if any) to the default route. pub fn set_tunnel_addresses(&mut self, metadata: Option<&TunnelMetadata>) -> Result<(), Error> { let mut tunnel_ipv4 = None; @@ -725,7 +725,7 @@ impl SplitTunnel { SplitTunnel::send_request_inner(request_tx, Request::RegisterIps(context.addresses.clone())) } - /// Instructs the driver to stop redirecting tunnel traffic and INADDR_ANY. + /// Instructs the driver to stop redirecting connections. pub fn clear_tunnel_addresses(&mut self) -> Result<(), Error> { self._route_change_callback = None; self.send_request(Request::RegisterIps(InterfaceAddresses::default())) From 2c0def32519c080ab9944c041ea22dff16da6d20 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20L=C3=B6nnhager?= Date: Mon, 2 Sep 2024 15:51:58 +0200 Subject: [PATCH 3/7] Break split tunnel monitor into more functions --- talpid-core/src/split_tunnel/windows/mod.rs | 280 +++++++++++--------- 1 file changed, 159 insertions(+), 121 deletions(-) diff --git a/talpid-core/src/split_tunnel/windows/mod.rs b/talpid-core/src/split_tunnel/windows/mod.rs index ee6659bf21a0..e91f96bc773f 100644 --- a/talpid-core/src/split_tunnel/windows/mod.rs +++ b/talpid-core/src/split_tunnel/windows/mod.rs @@ -5,7 +5,9 @@ mod volume_monitor; mod windows; use crate::{tunnel::TunnelMetadata, tunnel_state_machine::TunnelCommand}; +use driver::DeviceHandle; use futures::channel::{mpsc, oneshot}; +use path_monitor::PathMonitorHandle; use std::{ collections::HashMap, ffi::{OsStr, OsString}, @@ -422,14 +424,8 @@ impl SplitTunnel { ); std::thread::spawn(move || { - let init_fn = || { - service::install_driver_if_required(&resource_dir).map_err(Error::ServiceError)?; - driver::DeviceHandle::new() - .map(Arc::new) - .map_err(Error::InitializationError) - }; - - let handle = match init_fn() { + // Ensure that the device driver service is running and that we have a handle to it + let handle = match Self::setup_and_create_device(&resource_dir) { Ok(handle) => { let _ = init_tx.send(Ok(handle.clone())); handle @@ -440,120 +436,16 @@ impl SplitTunnel { } }; - let mut previous_addresses = InterfaceAddresses::default(); - - while let Ok((request, response_tx)) = rx.recv() { - let request_name = request.request_name(); - let response = match request { - Request::SetPaths(paths) => { - let mut monitored_paths_guard = monitored_paths.lock().unwrap(); - - let result = if !paths.is_empty() { - handle.set_config(&paths).map_err(Error::SetConfiguration) - } else { - handle.clear_config().map_err(Error::SetConfiguration) - }; - - if result.is_ok() { - if let Err(error) = path_monitor.set_paths(&paths) { - log::error!( - "{}", - error.display_chain_with_msg("Failed to update path monitor") - ); - } - *monitored_paths_guard = paths.to_vec(); - } - - result - } - Request::RegisterIps(mut ips) => { - if ips.internet_ipv4.is_none() && ips.internet_ipv6.is_none() { - ips.tunnel_ipv4 = None; - ips.tunnel_ipv6 = None; - } - if previous_addresses == ips { - Ok(()) - } else { - let result = handle - .register_ips( - ips.tunnel_ipv4, - ips.tunnel_ipv6, - ips.internet_ipv4, - ips.internet_ipv6, - ) - .map_err(Error::RegisterIps); - if result.is_ok() { - previous_addresses = ips; - } - result - } - } - Request::Stop => { - if let Err(error) = handle.reset().map_err(Error::ResetError) { - if let Some(response_tx) = response_tx { - let _ = response_tx.send(Err(error)); - } else { - log::error!( - "{}", - error.display_chain_with_msg( - "Failed to deinitialize split tunneling" - ) - ); - } - continue; - } - - monitored_paths.lock().unwrap().clear(); - excluded_processes.write().unwrap().clear(); - - if let Some(response_tx) = response_tx { - let _ = response_tx.send(Ok(())); - } - - // Stop listening to commands - break; - } - }; - - // Handle IOCTL result - - let log_response = if let Some(response_tx) = response_tx { - if let Err(error) = response_tx.send(response) { - log::error!( - "A response could not be sent for completed request/ioctl: {}", - request_name - ); - Some(error.0) - } else { - None - } - } else { - if response.is_err() { - if let Some(daemon_tx) = daemon_tx.upgrade() { - log::debug!( - "Entering error state due to failed request/ioctl: {}", - request_name - ); - let _ = daemon_tx.unbounded_send(TunnelCommand::Block( - ErrorStateCause::SplitTunnelError, - )); - } else { - log::error!( - "Cannot handle failed request since tunnel state machine is down" - ); - } - } - Some(response) - }; - if let Some(Err(error)) = log_response { - log::error!( - "Request/ioctl failed: {}\n{}", - request_name, - error.display_chain() - ); - } - } + Self::request_loop( + handle.clone(), + rx, + daemon_tx, + monitored_paths, + path_monitor.clone(), + excluded_processes, + ); + // Shut components down in a sane order drop(volume_monitor); if let Err(error) = path_monitor.shutdown() { log::error!( @@ -562,6 +454,8 @@ impl SplitTunnel { ); } + // The device handle must be dropped before stopping the service + debug_assert_eq!(Arc::strong_count(&handle), 1); drop(handle); log::debug!("Stopping ST service"); @@ -600,6 +494,150 @@ impl SplitTunnel { Ok((tx, handle)) } + /// Install the driver and create a device for it + fn setup_and_create_device(resource_dir: &Path) -> Result, Error> { + service::install_driver_if_required(resource_dir).map_err(Error::ServiceError)?; + driver::DeviceHandle::new() + .map(Arc::new) + .map_err(Error::InitializationError) + } + + fn request_loop( + handle: Arc, + cmd_rx: sync_mpsc::Receiver<(Request, Option)>, + daemon_tx: Weak>, + monitored_paths: Arc>>, + path_monitor: PathMonitorHandle, + excluded_processes: Arc>>, + ) { + let mut previous_addresses = InterfaceAddresses::default(); + + while let Ok((request, response_tx)) = cmd_rx.recv() { + let request_name = request.request_name(); + let (should_stop, response) = Self::handle_request( + request, + &handle, + &path_monitor, + &monitored_paths, + &excluded_processes, + &mut previous_addresses, + ); + + // Handle request result + let log_response = if let Some(response_tx) = response_tx { + if let Err(error) = response_tx.send(response) { + log::error!( + "A response could not be sent for completed request/ioctl: {}", + request_name + ); + Some(error.0) + } else { + None + } + } else { + if response.is_err() { + if let Some(daemon_tx) = daemon_tx.upgrade() { + log::debug!( + "Entering error state due to failed request/ioctl: {}", + request_name + ); + let _ = daemon_tx.unbounded_send(TunnelCommand::Block( + ErrorStateCause::SplitTunnelError, + )); + } else { + log::error!( + "Cannot handle failed request since tunnel state machine is down" + ); + } + } + Some(response) + }; + if let Some(Err(error)) = log_response { + log::error!( + "Request/ioctl failed: {}\n{}", + request_name, + error.display_chain() + ); + } + + // Stop request loop + if should_stop { + break; + } + } + } + + /// Handle a request to the split tunnel device + fn handle_request( + request: Request, + handle: &DeviceHandle, + path_monitor: &path_monitor::PathMonitorHandle, + monitored_paths: &Arc>>, + excluded_processes: &Arc>>, + previous_addresses: &mut InterfaceAddresses, + ) -> (bool, Result<(), Error>) { + let (should_stop, result) = match request { + Request::SetPaths(paths) => { + let mut monitored_paths_guard = monitored_paths.lock().unwrap(); + + let result = if !paths.is_empty() { + handle.set_config(&paths).map_err(Error::SetConfiguration) + } else { + handle.clear_config().map_err(Error::SetConfiguration) + }; + + if result.is_ok() { + if let Err(error) = path_monitor.set_paths(&paths) { + log::error!( + "{}", + error.display_chain_with_msg("Failed to update path monitor") + ); + } + *monitored_paths_guard = paths.to_vec(); + } + + (false, result) + } + Request::RegisterIps(mut ips) => { + if ips.internet_ipv4.is_none() && ips.internet_ipv6.is_none() { + ips.tunnel_ipv4 = None; + ips.tunnel_ipv6 = None; + } + if previous_addresses == &ips { + (false, Ok(())) + } else { + let result = handle + .register_ips( + ips.tunnel_ipv4, + ips.tunnel_ipv6, + ips.internet_ipv4, + ips.internet_ipv6, + ) + .map_err(Error::RegisterIps); + if result.is_ok() { + *previous_addresses = ips; + } + (false, result) + } + } + Request::Stop => { + if let Err(error) = handle.reset().map_err(Error::ResetError) { + // Shut down failed, so continue to live + return (false, Err(error)); + } + + // Clean up + monitored_paths.lock().unwrap().clear(); + excluded_processes.write().unwrap().clear(); + + // Stop listening to commands + (true, Ok(())) + } + }; + + (should_stop, result) + } + fn send_request(&self, request: Request) -> Result<(), Error> { Self::send_request_inner(&self.request_tx, request) } From 6542a7485681e33b7b2d199e6696edb78450e432 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20L=C3=B6nnhager?= Date: Mon, 2 Sep 2024 16:06:49 +0200 Subject: [PATCH 4/7] Encapsulate request and response channel --- talpid-core/src/split_tunnel/windows/mod.rs | 191 +++++++++++++------- 1 file changed, 123 insertions(+), 68 deletions(-) diff --git a/talpid-core/src/split_tunnel/windows/mod.rs b/talpid-core/src/split_tunnel/windows/mod.rs index e91f96bc773f..372b834e318f 100644 --- a/talpid-core/src/split_tunnel/windows/mod.rs +++ b/talpid-core/src/split_tunnel/windows/mod.rs @@ -107,7 +107,7 @@ pub enum Error { /// Manages applications whose traffic to exclude from the tunnel. pub struct SplitTunnel { runtime: tokio::runtime::Handle, - request_tx: RequestTx, + request_tx: sync_mpsc::Sender, event_thread: Option>, quit_event: Arc, excluded_processes: Arc>>, @@ -116,26 +116,55 @@ pub struct SplitTunnel { route_manager: RouteManagerHandle, } -enum Request { +/// A request to the split tunnel monitor +struct Request { + /// Request details + details: RequestDetails, + /// Whether to block if the request fails + must_succeed: bool, + /// Response channel + response_tx: Option>>, +} + +enum RequestDetails { + /// Update paths to exclude SetPaths(Vec), + /// Update default and VPN tunnel addresses RegisterIps(InterfaceAddresses), + /// Stop the split tunnel monitor Stop, } -type RequestResponseTx = sync_mpsc::Sender>; -type RequestTx = sync_mpsc::Sender<(Request, Option)>; - -const REQUEST_TIMEOUT: Duration = Duration::from_secs(5); impl Request { + fn new(details: RequestDetails) -> Self { + Request { + details, + must_succeed: false, + response_tx: None, + } + } + + fn response_tx(mut self, response_tx: sync_mpsc::Sender>) -> Self { + self.response_tx = Some(response_tx); + self + } + + fn must_succeed(mut self) -> Self { + self.must_succeed = true; + self + } + fn request_name(&self) -> &'static str { - match self { - Request::SetPaths(_) => "SetPaths", - Request::RegisterIps(_) => "RegisterIps", - Request::Stop => "Stop", + match self.details { + RequestDetails::SetPaths(_) => "SetPaths", + RequestDetails::RegisterIps(_) => "RegisterIps", + RequestDetails::Stop => "Stop", } } } +const REQUEST_TIMEOUT: Duration = Duration::from_secs(5); + #[derive(Default, PartialEq, Clone)] struct InterfaceAddresses { tunnel_ipv4: Option, @@ -405,8 +434,8 @@ impl SplitTunnel { daemon_tx: Weak>, volume_update_rx: mpsc::UnboundedReceiver<()>, excluded_processes: Arc>>, - ) -> Result<(RequestTx, Arc), Error> { - let (tx, rx): (RequestTx, _) = sync_mpsc::channel(); + ) -> Result<(sync_mpsc::Sender, Arc), Error> { + let (tx, rx): (sync_mpsc::Sender, _) = sync_mpsc::channel(); let (init_tx, init_rx) = sync_mpsc::channel(); let monitored_paths = Arc::new(Mutex::new(vec![])); @@ -502,9 +531,10 @@ impl SplitTunnel { .map_err(Error::InitializationError) } + /// Service requests to the device driver fn request_loop( handle: Arc, - cmd_rx: sync_mpsc::Receiver<(Request, Option)>, + cmd_rx: sync_mpsc::Receiver, daemon_tx: Weak>, monitored_paths: Arc>>, path_monitor: PathMonitorHandle, @@ -512,10 +542,11 @@ impl SplitTunnel { ) { let mut previous_addresses = InterfaceAddresses::default(); - while let Ok((request, response_tx)) = cmd_rx.recv() { + while let Ok(request) = cmd_rx.recv() { let request_name = request.request_name(); + let (should_stop, response) = Self::handle_request( - request, + request.details, &handle, &path_monitor, &monitored_paths, @@ -523,42 +554,13 @@ impl SplitTunnel { &mut previous_addresses, ); - // Handle request result - let log_response = if let Some(response_tx) = response_tx { - if let Err(error) = response_tx.send(response) { - log::error!( - "A response could not be sent for completed request/ioctl: {}", - request_name - ); - Some(error.0) - } else { - None - } - } else { - if response.is_err() { - if let Some(daemon_tx) = daemon_tx.upgrade() { - log::debug!( - "Entering error state due to failed request/ioctl: {}", - request_name - ); - let _ = daemon_tx.unbounded_send(TunnelCommand::Block( - ErrorStateCause::SplitTunnelError, - )); - } else { - log::error!( - "Cannot handle failed request since tunnel state machine is down" - ); - } - } - Some(response) - }; - if let Some(Err(error)) = log_response { - log::error!( - "Request/ioctl failed: {}\n{}", - request_name, - error.display_chain() - ); - } + Self::handle_request_result( + &daemon_tx, + response, + request.must_succeed, + request_name, + request.response_tx, + ); // Stop request loop if should_stop { @@ -569,7 +571,7 @@ impl SplitTunnel { /// Handle a request to the split tunnel device fn handle_request( - request: Request, + request: RequestDetails, handle: &DeviceHandle, path_monitor: &path_monitor::PathMonitorHandle, monitored_paths: &Arc>>, @@ -577,7 +579,7 @@ impl SplitTunnel { previous_addresses: &mut InterfaceAddresses, ) -> (bool, Result<(), Error>) { let (should_stop, result) = match request { - Request::SetPaths(paths) => { + RequestDetails::SetPaths(paths) => { let mut monitored_paths_guard = monitored_paths.lock().unwrap(); let result = if !paths.is_empty() { @@ -598,7 +600,7 @@ impl SplitTunnel { (false, result) } - Request::RegisterIps(mut ips) => { + RequestDetails::RegisterIps(mut ips) => { if ips.internet_ipv4.is_none() && ips.internet_ipv6.is_none() { ips.tunnel_ipv4 = None; ips.tunnel_ipv6 = None; @@ -620,7 +622,7 @@ impl SplitTunnel { (false, result) } } - Request::Stop => { + RequestDetails::Stop => { if let Err(error) = handle.reset().map_err(Error::ResetError) { // Shut down failed, so continue to live return (false, Err(error)); @@ -638,15 +640,65 @@ impl SplitTunnel { (should_stop, result) } - fn send_request(&self, request: Request) -> Result<(), Error> { + /// Handle the result of a request + fn handle_request_result( + daemon_tx: &Weak>, + result: Result<(), Error>, + must_succeed: bool, + request_name: &str, + response_tx: Option>>, + ) { + let log_request_failure = |response: &Result<(), Error>| { + if let Err(error) = response { + log::error!( + "Request/ioctl failed: {}\n{}", + request_name, + error.display_chain() + ); + } + }; + + let request_failed = result.is_err(); + + if let Some(response_tx) = response_tx { + if let Err(error) = response_tx.send(result) { + log::error!( + "A response could not be sent for completed request/ioctl: {}", + request_name + ); + log_request_failure(&error.0); + } + } else { + log_request_failure(&result); + } + + // Move to error state if the request failed but must succeed + if request_failed && must_succeed { + if let Some(daemon_tx) = daemon_tx.upgrade() { + log::debug!( + "Entering error state due to failed request/ioctl: {}", + request_name + ); + let _ = daemon_tx + .unbounded_send(TunnelCommand::Block(ErrorStateCause::SplitTunnelError)); + } else { + log::error!("Cannot handle failed request since tunnel state machine is down"); + } + } + } + + fn send_request(&self, request: RequestDetails) -> Result<(), Error> { Self::send_request_inner(&self.request_tx, request) } - fn send_request_inner(request_tx: &RequestTx, request: Request) -> Result<(), Error> { + fn send_request_inner( + request_tx: &sync_mpsc::Sender, + request: RequestDetails, + ) -> Result<(), Error> { let (response_tx, response_rx) = sync_mpsc::channel(); request_tx - .send((request, Some(response_tx))) + .send(Request::new(request).response_tx(response_tx)) .map_err(|_| Error::SplitTunnelDown)?; response_rx @@ -656,7 +708,7 @@ impl SplitTunnel { /// Set a list of applications to exclude from the tunnel. pub fn set_paths_sync>(&self, paths: &[T]) -> Result<(), Error> { - self.send_request(Request::SetPaths( + self.send_request(RequestDetails::SetPaths( paths .iter() .map(|path| path.as_ref().to_os_string()) @@ -678,7 +730,7 @@ impl SplitTunnel { return; } let (response_tx, response_rx) = sync_mpsc::channel(); - let request = Request::SetPaths( + let request = RequestDetails::SetPaths( paths .iter() .map(|path| path.as_ref().to_os_string()) @@ -688,7 +740,7 @@ impl SplitTunnel { let wait_task = move || { request_tx - .send((request, Some(response_tx))) + .send(Request::new(request).response_tx(response_tx)) .map_err(|_| Error::SplitTunnelDown)?; response_rx.recv().map_err(|_| Error::SplitTunnelDown)? }; @@ -751,7 +803,7 @@ impl SplitTunnel { fn init_context( mut context: MutexGuard<'_, SplitTunnelDefaultRouteChangeHandlerContext>, - request_tx: &RequestTx, + request_tx: &sync_mpsc::Sender, ) -> Result<(), Error> { // NOTE: This should remain a separate function. Dropping the context after `callback` // causes a deadlock if `split_tunnel_default_route_change_handler` is called at the same @@ -760,13 +812,16 @@ impl SplitTunnel { // to complete. context.initialize_internet_addresses()?; - SplitTunnel::send_request_inner(request_tx, Request::RegisterIps(context.addresses.clone())) + SplitTunnel::send_request_inner( + request_tx, + RequestDetails::RegisterIps(context.addresses.clone()), + ) } /// Instructs the driver to stop redirecting connections. pub fn clear_tunnel_addresses(&mut self) -> Result<(), Error> { self._route_change_callback = None; - self.send_request(Request::RegisterIps(InterfaceAddresses::default())) + self.send_request(RequestDetails::RegisterIps(InterfaceAddresses::default())) } /// Returns a handle used for interacting with the split tunnel module. @@ -789,7 +844,7 @@ impl Drop for SplitTunnel { // Not joining `event_thread`: It may be unresponsive. } - if let Err(error) = self.send_request(Request::Stop) { + if let Err(error) = self.send_request(RequestDetails::Stop) { log::error!( "{}", error.display_chain_with_msg("Failed to stop ST driver service") @@ -799,13 +854,13 @@ impl Drop for SplitTunnel { } struct SplitTunnelDefaultRouteChangeHandlerContext { - request_tx: RequestTx, + request_tx: sync_mpsc::Sender, pub addresses: InterfaceAddresses, } impl SplitTunnelDefaultRouteChangeHandlerContext { pub fn new( - request_tx: RequestTx, + request_tx: sync_mpsc::Sender, tunnel_ipv4: Option, tunnel_ipv6: Option, ) -> Self { @@ -917,7 +972,7 @@ fn split_tunnel_default_route_change_handler( if ctx .request_tx - .send((Request::RegisterIps(ctx.addresses.clone()), None)) + .send(Request::new(RequestDetails::RegisterIps(ctx.addresses.clone())).must_succeed()) .is_err() { log::error!("Split tunnel request thread is down"); From 6bf2e525a0d151bcfba077cb0761b97ff9b6b2f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20L=C3=B6nnhager?= Date: Mon, 2 Sep 2024 16:47:22 +0200 Subject: [PATCH 5/7] Move event handling out of split tunnel module --- talpid-core/src/split_tunnel/windows/event.rs | 213 ++++++++++++++++ talpid-core/src/split_tunnel/windows/mod.rs | 228 +----------------- 2 files changed, 216 insertions(+), 225 deletions(-) create mode 100644 talpid-core/src/split_tunnel/windows/event.rs diff --git a/talpid-core/src/split_tunnel/windows/event.rs b/talpid-core/src/split_tunnel/windows/event.rs new file mode 100644 index 000000000000..7350a1bb7772 --- /dev/null +++ b/talpid-core/src/split_tunnel/windows/event.rs @@ -0,0 +1,213 @@ +//! Handle events dispatched from win-split-tunnel. +//! +//! It follows a typical inverted-call model, in which we request the next event, and block until +//! such an event has been received, or until a quit event is signaled. + +use super::driver; +use std::{ + collections::HashMap, + io, + path::Path, + sync::{Arc, RwLock}, + time::Duration, +}; +use talpid_types::{split_tunnel::ExcludedProcess, ErrorExt}; +use talpid_windows::{io::Overlapped, sync::Event}; +use windows_sys::Win32::Foundation::ERROR_OPERATION_ABORTED; + +enum EventResult { + /// Result containing the next event. + Event(driver::EventId, driver::EventBody), + /// Quit event was signaled. + Quit, +} + +const DRIVER_EVENT_BUFFER_SIZE: usize = 2048; + +/// Spawns an event loop thread that processes events from the driver service. +pub fn spawn_listener( + handle: Arc, + excluded_processes: Arc>>, +) -> io::Result<(std::thread::JoinHandle<()>, Arc)> { + let mut event_overlapped = Overlapped::new(Some(Event::new(true, false)?))?; + + let quit_event = Arc::new(Event::new(true, false)?); + let quit_event_copy = quit_event.clone(); + + let event_thread = std::thread::spawn(move || { + log::debug!("Starting split tunnel event thread"); + let mut data_buffer = vec![]; + + loop { + // Wait until either the next event is received or the quit event is signaled. + let (event_id, event_body) = match fetch_next_event( + &handle, + &quit_event, + &mut event_overlapped, + &mut data_buffer, + ) { + Ok(EventResult::Event(event_id, event_body)) => (event_id, event_body), + Ok(EventResult::Quit) => break, + Err(error) => { + if error.raw_os_error() == Some(ERROR_OPERATION_ABORTED as i32) { + // The driver will normally abort the request if the driver state + // is reset. Give the driver service some time to recover before + // retrying. + std::thread::sleep(Duration::from_millis(500)); + } + continue; + } + }; + + handle_event(event_id, event_body, &excluded_processes); + } + + log::debug!("Stopping split tunnel event thread"); + }); + + Ok((event_thread, quit_event_copy)) +} + +fn fetch_next_event( + device: &Arc, + quit_event: &Event, + overlapped: &mut Overlapped, + data_buffer: &mut Vec, +) -> io::Result { + if unsafe { driver::wait_for_single_object(quit_event.as_raw(), Some(Duration::ZERO)) }.is_ok() + { + return Ok(EventResult::Quit); + } + + data_buffer.resize(DRIVER_EVENT_BUFFER_SIZE, 0u8); + + unsafe { + driver::device_io_control_buffer_async( + device, + driver::DriverIoctlCode::DequeEvent as u32, + None, + data_buffer.as_mut_ptr(), + u32::try_from(data_buffer.len()).expect("buffer must be smaller than u32"), + overlapped.as_mut_ptr(), + ) + } + .map_err(|error| { + log::error!( + "{}", + error.display_chain_with_msg("DeviceIoControl failed to deque event") + ); + error + })?; + + let event_objects = [ + overlapped.get_event().unwrap().as_raw(), + quit_event.as_raw(), + ]; + + let signaled_object = unsafe { driver::wait_for_multiple_objects(&event_objects[..], false) } + .map_err(|error| { + log::error!( + "{}", + error.display_chain_with_msg("wait_for_multiple_objects failed") + ); + error + })?; + + if signaled_object == quit_event.as_raw() { + // Quit event was signaled + return Ok(EventResult::Quit); + } + + let returned_bytes = driver::get_overlapped_result(device, overlapped).map_err(|error| { + if error.raw_os_error() != Some(ERROR_OPERATION_ABORTED as i32) { + log::error!( + "{}", + error.display_chain_with_msg("get_overlapped_result failed for dequeued event"), + ); + } + error + })?; + + data_buffer + .truncate(usize::try_from(returned_bytes).expect("usize must be no smaller than u32")); + + driver::parse_event_buffer(data_buffer) + .map(|(id, body)| EventResult::Event(id, body)) + .map_err(|error| { + log::error!( + "{}", + error.display_chain_with_msg("Failed to parse ST event buffer") + ); + io::Error::new(io::ErrorKind::Other, "Failed to parse ST event buffer") + }) +} + +fn handle_event( + event_id: driver::EventId, + event_body: driver::EventBody, + excluded_processes: &Arc>>, +) { + use driver::{EventBody, EventId}; + + let event_str = match &event_id { + EventId::StartSplittingProcess | EventId::ErrorStartSplittingProcess => { + "Start splitting process" + } + EventId::StopSplittingProcess | EventId::ErrorStopSplittingProcess => { + "Stop splitting process" + } + EventId::ErrorMessage => "ErrorMessage", + }; + + match event_body { + EventBody::SplittingEvent { + process_id, + reason, + image, + } => { + let mut pids = excluded_processes.write().unwrap(); + match event_id { + EventId::StartSplittingProcess => { + if let Some(prev_entry) = pids.get(&process_id) { + log::error!("PID collision: {process_id} is already in the list of excluded processes. New image: {:?}. Current image: {:?}", image, prev_entry); + } + pids.insert( + process_id, + ExcludedProcess { + pid: u32::try_from(process_id) + .expect("PID should be containable in a DWORD"), + image: Path::new(&image).to_path_buf(), + inherited: reason + .contains(driver::SplittingChangeReason::BY_INHERITANCE), + }, + ); + } + EventId::StopSplittingProcess => { + if pids.remove(&process_id).is_none() { + log::error!("Inconsistent process tree: {process_id} was not found"); + } + } + _ => (), + } + + log::trace!( + "{}:\n\tpid: {}\n\treason: {:?}\n\timage: {:?}", + event_str, + process_id, + reason, + image, + ); + } + EventBody::SplittingError { process_id, image } => { + log::error!( + "FAILED: {}:\n\tpid: {}\n\timage: {:?}", + event_str, + process_id, + image, + ); + } + EventBody::ErrorMessage { status, message } => { + log::error!("NTSTATUS {:#x}: {}", status, message.to_string_lossy()) + } + } +} diff --git a/talpid-core/src/split_tunnel/windows/mod.rs b/talpid-core/src/split_tunnel/windows/mod.rs index 372b834e318f..70087d3ecd6c 100644 --- a/talpid-core/src/split_tunnel/windows/mod.rs +++ b/talpid-core/src/split_tunnel/windows/mod.rs @@ -1,4 +1,5 @@ mod driver; +mod event; mod path_monitor; mod service; mod volume_monitor; @@ -23,13 +24,10 @@ use std::{ use talpid_routing::{get_best_default_route, CallbackHandle, EventType, RouteManagerHandle}; use talpid_types::{split_tunnel::ExcludedProcess, tunnel::ErrorStateCause, ErrorExt}; use talpid_windows::{ - io::Overlapped, net::{get_ip_address_for_interface, AddressFamily}, sync::Event, }; -use windows_sys::Win32::Foundation::ERROR_OPERATION_ABORTED; -const DRIVER_EVENT_BUFFER_SIZE: usize = 2048; const RESERVED_IP_V4: Ipv4Addr = Ipv4Addr::new(192, 0, 2, 123); /// Errors that may occur in [`SplitTunnel`]. @@ -51,18 +49,10 @@ pub enum Error { #[error("Failed to set list of excluded applications")] SetConfiguration(#[source] io::Error), - /// Failed to obtain the current driver state - #[error("Failed to obtain the driver state")] - GetState(#[source] io::Error), - /// Failed to register interface IP addresses #[error("Failed to register IP addresses for exclusions")] RegisterIps(#[source] io::Error), - /// Failed to clear interface IP addresses - #[error("Failed to clear registered IP addresses")] - ClearIps(#[source] io::Error), - /// Failed to set up the driver event loop #[error("Failed to set up the driver event loop")] EventThreadError(#[source] io::Error), @@ -79,10 +69,6 @@ pub enum Error { #[error("Failed to register default route change callback")] RegisterRouteChangeCallback, - /// Unexpected IP parsing error - #[error("Failed to parse IP address")] - IpParseError, - /// The request handling thread is stuck #[error("The ST request thread is stuck")] RequestThreadStuck, @@ -98,10 +84,6 @@ pub enum Error { /// A previous path update has not yet completed #[error("A previous update is not yet complete")] AlreadySettingPaths, - - /// Resetting in the engaged state risks leaking into the tunnel - #[error("Failed to reset driver because it is engaged")] - CannotResetEngaged, } /// Manages applications whose traffic to exclude from the tunnel. @@ -192,13 +174,6 @@ impl SplitTunnelHandle { } } -enum EventResult { - /// Result containing the next event. - Event(driver::EventId, driver::EventBody), - /// Quit event was signaled. - Quit, -} - impl SplitTunnel { /// Initialize the split tunnel device. pub fn new( @@ -217,8 +192,8 @@ impl SplitTunnel { excluded_processes.clone(), )?; - let (event_thread, quit_event) = - Self::spawn_event_listener(handle, excluded_processes.clone())?; + let (event_thread, quit_event) = event::spawn_listener(handle, excluded_processes.clone()) + .map_err(Error::EventThreadError)?; Ok(SplitTunnel { runtime, @@ -232,203 +207,6 @@ impl SplitTunnel { }) } - /// Spawns an event loop thread that processes events from the driver service. - fn spawn_event_listener( - handle: Arc, - excluded_processes: Arc>>, - ) -> Result<(std::thread::JoinHandle<()>, Arc), Error> { - let mut event_overlapped = Overlapped::new(Some( - Event::new(true, false).map_err(Error::EventThreadError)?, - )) - .map_err(Error::EventThreadError)?; - - let quit_event = Arc::new(Event::new(true, false).map_err(Error::EventThreadError)?); - let quit_event_copy = quit_event.clone(); - - let event_thread = std::thread::spawn(move || { - log::debug!("Starting split tunnel event thread"); - let mut data_buffer = vec![]; - - loop { - // Wait until either the next event is received or the quit event is signaled. - let (event_id, event_body) = match Self::fetch_next_event( - &handle, - &quit_event, - &mut event_overlapped, - &mut data_buffer, - ) { - Ok(EventResult::Event(event_id, event_body)) => (event_id, event_body), - Ok(EventResult::Quit) => break, - Err(error) => { - if error.raw_os_error() == Some(ERROR_OPERATION_ABORTED as i32) { - // The driver will normally abort the request if the driver state - // is reset. Give the driver service some time to recover before - // retrying. - std::thread::sleep(Duration::from_millis(500)); - } - continue; - } - }; - - Self::handle_event(event_id, event_body, &excluded_processes); - } - - log::debug!("Stopping split tunnel event thread"); - }); - - Ok((event_thread, quit_event_copy)) - } - - fn fetch_next_event( - device: &Arc, - quit_event: &Event, - overlapped: &mut Overlapped, - data_buffer: &mut Vec, - ) -> io::Result { - if unsafe { driver::wait_for_single_object(quit_event.as_raw(), Some(Duration::ZERO)) } - .is_ok() - { - return Ok(EventResult::Quit); - } - - data_buffer.resize(DRIVER_EVENT_BUFFER_SIZE, 0u8); - - unsafe { - driver::device_io_control_buffer_async( - device, - driver::DriverIoctlCode::DequeEvent as u32, - None, - data_buffer.as_mut_ptr(), - u32::try_from(data_buffer.len()).expect("buffer must be smaller than u32"), - overlapped.as_mut_ptr(), - ) - } - .map_err(|error| { - log::error!( - "{}", - error.display_chain_with_msg("DeviceIoControl failed to deque event") - ); - error - })?; - - let event_objects = [ - overlapped.get_event().unwrap().as_raw(), - quit_event.as_raw(), - ]; - - let signaled_object = - unsafe { driver::wait_for_multiple_objects(&event_objects[..], false) }.map_err( - |error| { - log::error!( - "{}", - error.display_chain_with_msg("wait_for_multiple_objects failed") - ); - error - }, - )?; - - if signaled_object == quit_event.as_raw() { - // Quit event was signaled - return Ok(EventResult::Quit); - } - - let returned_bytes = - driver::get_overlapped_result(device, overlapped).map_err(|error| { - if error.raw_os_error() != Some(ERROR_OPERATION_ABORTED as i32) { - log::error!( - "{}", - error.display_chain_with_msg( - "get_overlapped_result failed for dequeued event" - ), - ); - } - error - })?; - - data_buffer - .truncate(usize::try_from(returned_bytes).expect("usize must be no smaller than u32")); - - driver::parse_event_buffer(data_buffer) - .map(|(id, body)| EventResult::Event(id, body)) - .map_err(|error| { - log::error!( - "{}", - error.display_chain_with_msg("Failed to parse ST event buffer") - ); - io::Error::new(io::ErrorKind::Other, "Failed to parse ST event buffer") - }) - } - - fn handle_event( - event_id: driver::EventId, - event_body: driver::EventBody, - excluded_processes: &Arc>>, - ) { - use driver::{EventBody, EventId}; - - let event_str = match &event_id { - EventId::StartSplittingProcess | EventId::ErrorStartSplittingProcess => { - "Start splitting process" - } - EventId::StopSplittingProcess | EventId::ErrorStopSplittingProcess => { - "Stop splitting process" - } - EventId::ErrorMessage => "ErrorMessage", - }; - - match event_body { - EventBody::SplittingEvent { - process_id, - reason, - image, - } => { - let mut pids = excluded_processes.write().unwrap(); - match event_id { - EventId::StartSplittingProcess => { - if let Some(prev_entry) = pids.get(&process_id) { - log::error!("PID collision: {process_id} is already in the list of excluded processes. New image: {:?}. Current image: {:?}", image, prev_entry); - } - pids.insert( - process_id, - ExcludedProcess { - pid: u32::try_from(process_id) - .expect("PID should be containable in a DWORD"), - image: Path::new(&image).to_path_buf(), - inherited: reason - .contains(driver::SplittingChangeReason::BY_INHERITANCE), - }, - ); - } - EventId::StopSplittingProcess => { - if pids.remove(&process_id).is_none() { - log::error!("Inconsistent process tree: {process_id} was not found"); - } - } - _ => (), - } - - log::trace!( - "{}:\n\tpid: {}\n\treason: {:?}\n\timage: {:?}", - event_str, - process_id, - reason, - image, - ); - } - EventBody::SplittingError { process_id, image } => { - log::error!( - "FAILED: {}:\n\tpid: {}\n\timage: {:?}", - event_str, - process_id, - image, - ); - } - EventBody::ErrorMessage { status, message } => { - log::error!("NTSTATUS {:#x}: {}", status, message.to_string_lossy()) - } - } - } - fn spawn_request_thread( resource_dir: PathBuf, daemon_tx: Weak>, From 1532ad396f9925e77177d1897138867f178b109c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20L=C3=B6nnhager?= Date: Mon, 2 Sep 2024 17:04:49 +0200 Subject: [PATCH 6/7] Move split tunnel request handling to own module --- talpid-core/src/split_tunnel/windows/mod.rs | 319 +---------------- .../src/split_tunnel/windows/request.rs | 334 ++++++++++++++++++ 2 files changed, 341 insertions(+), 312 deletions(-) create mode 100644 talpid-core/src/split_tunnel/windows/request.rs diff --git a/talpid-core/src/split_tunnel/windows/mod.rs b/talpid-core/src/split_tunnel/windows/mod.rs index 70087d3ecd6c..7a368ffb71e3 100644 --- a/talpid-core/src/split_tunnel/windows/mod.rs +++ b/talpid-core/src/split_tunnel/windows/mod.rs @@ -1,20 +1,20 @@ mod driver; mod event; mod path_monitor; +mod request; mod service; mod volume_monitor; mod windows; use crate::{tunnel::TunnelMetadata, tunnel_state_machine::TunnelCommand}; -use driver::DeviceHandle; use futures::channel::{mpsc, oneshot}; -use path_monitor::PathMonitorHandle; +use request::{Request, RequestDetails}; use std::{ collections::HashMap, - ffi::{OsStr, OsString}, + ffi::OsStr, io, net::{IpAddr, Ipv4Addr, Ipv6Addr}, - path::{Path, PathBuf}, + path::PathBuf, sync::{ atomic::{AtomicBool, Ordering}, mpsc as sync_mpsc, Arc, Mutex, MutexGuard, RwLock, Weak, @@ -22,7 +22,7 @@ use std::{ time::Duration, }; use talpid_routing::{get_best_default_route, CallbackHandle, EventType, RouteManagerHandle}; -use talpid_types::{split_tunnel::ExcludedProcess, tunnel::ErrorStateCause, ErrorExt}; +use talpid_types::{split_tunnel::ExcludedProcess, ErrorExt}; use talpid_windows::{ net::{get_ip_address_for_interface, AddressFamily}, sync::Event, @@ -89,7 +89,7 @@ pub enum Error { /// Manages applications whose traffic to exclude from the tunnel. pub struct SplitTunnel { runtime: tokio::runtime::Handle, - request_tx: sync_mpsc::Sender, + request_tx: sync_mpsc::Sender, event_thread: Option>, quit_event: Arc, excluded_processes: Arc>>, @@ -98,53 +98,6 @@ pub struct SplitTunnel { route_manager: RouteManagerHandle, } -/// A request to the split tunnel monitor -struct Request { - /// Request details - details: RequestDetails, - /// Whether to block if the request fails - must_succeed: bool, - /// Response channel - response_tx: Option>>, -} - -enum RequestDetails { - /// Update paths to exclude - SetPaths(Vec), - /// Update default and VPN tunnel addresses - RegisterIps(InterfaceAddresses), - /// Stop the split tunnel monitor - Stop, -} - -impl Request { - fn new(details: RequestDetails) -> Self { - Request { - details, - must_succeed: false, - response_tx: None, - } - } - - fn response_tx(mut self, response_tx: sync_mpsc::Sender>) -> Self { - self.response_tx = Some(response_tx); - self - } - - fn must_succeed(mut self) -> Self { - self.must_succeed = true; - self - } - - fn request_name(&self) -> &'static str { - match self.details { - RequestDetails::SetPaths(_) => "SetPaths", - RequestDetails::RegisterIps(_) => "RegisterIps", - RequestDetails::Stop => "Stop", - } - } -} - const REQUEST_TIMEOUT: Duration = Duration::from_secs(5); #[derive(Default, PartialEq, Clone)] @@ -185,7 +138,7 @@ impl SplitTunnel { ) -> Result { let excluded_processes = Arc::new(RwLock::new(HashMap::new())); - let (request_tx, handle) = Self::spawn_request_thread( + let (request_tx, handle) = request::spawn_request_thread( resource_dir, daemon_tx, volume_update_rx, @@ -207,264 +160,6 @@ impl SplitTunnel { }) } - fn spawn_request_thread( - resource_dir: PathBuf, - daemon_tx: Weak>, - volume_update_rx: mpsc::UnboundedReceiver<()>, - excluded_processes: Arc>>, - ) -> Result<(sync_mpsc::Sender, Arc), Error> { - let (tx, rx): (sync_mpsc::Sender, _) = sync_mpsc::channel(); - let (init_tx, init_rx) = sync_mpsc::channel(); - - let monitored_paths = Arc::new(Mutex::new(vec![])); - let monitored_paths_copy = monitored_paths.clone(); - - let (monitor_tx, monitor_rx) = sync_mpsc::channel(); - - let path_monitor = path_monitor::PathMonitor::spawn(monitor_tx.clone()) - .map_err(Error::StartPathMonitor)?; - let volume_monitor = volume_monitor::VolumeMonitor::spawn( - path_monitor.clone(), - monitor_tx, - monitored_paths.clone(), - volume_update_rx, - ); - - std::thread::spawn(move || { - // Ensure that the device driver service is running and that we have a handle to it - let handle = match Self::setup_and_create_device(&resource_dir) { - Ok(handle) => { - let _ = init_tx.send(Ok(handle.clone())); - handle - } - Err(error) => { - let _ = init_tx.send(Err(error)); - return; - } - }; - - Self::request_loop( - handle.clone(), - rx, - daemon_tx, - monitored_paths, - path_monitor.clone(), - excluded_processes, - ); - - // Shut components down in a sane order - drop(volume_monitor); - if let Err(error) = path_monitor.shutdown() { - log::error!( - "{}", - error.display_chain_with_msg("Failed to shut down path monitor") - ); - } - - // The device handle must be dropped before stopping the service - debug_assert_eq!(Arc::strong_count(&handle), 1); - drop(handle); - - log::debug!("Stopping ST service"); - if let Err(error) = service::stop_driver_service() { - log::error!( - "{}", - error.display_chain_with_msg("Failed to stop ST service") - ); - } - }); - - let handle = init_rx - .recv_timeout(REQUEST_TIMEOUT) - .map_err(|_| Error::RequestThreadStuck)??; - - let handle_copy = handle.clone(); - - std::thread::spawn(move || { - while let Ok(()) = monitor_rx.recv() { - let paths = monitored_paths_copy.lock().unwrap(); - let result = if paths.len() > 0 { - log::debug!("Re-resolving excluded paths"); - handle_copy.set_config(&paths) - } else { - continue; - }; - if let Err(error) = result { - log::error!( - "{}", - error.display_chain_with_msg("Failed to update excluded paths") - ); - } - } - }); - - Ok((tx, handle)) - } - - /// Install the driver and create a device for it - fn setup_and_create_device(resource_dir: &Path) -> Result, Error> { - service::install_driver_if_required(resource_dir).map_err(Error::ServiceError)?; - driver::DeviceHandle::new() - .map(Arc::new) - .map_err(Error::InitializationError) - } - - /// Service requests to the device driver - fn request_loop( - handle: Arc, - cmd_rx: sync_mpsc::Receiver, - daemon_tx: Weak>, - monitored_paths: Arc>>, - path_monitor: PathMonitorHandle, - excluded_processes: Arc>>, - ) { - let mut previous_addresses = InterfaceAddresses::default(); - - while let Ok(request) = cmd_rx.recv() { - let request_name = request.request_name(); - - let (should_stop, response) = Self::handle_request( - request.details, - &handle, - &path_monitor, - &monitored_paths, - &excluded_processes, - &mut previous_addresses, - ); - - Self::handle_request_result( - &daemon_tx, - response, - request.must_succeed, - request_name, - request.response_tx, - ); - - // Stop request loop - if should_stop { - break; - } - } - } - - /// Handle a request to the split tunnel device - fn handle_request( - request: RequestDetails, - handle: &DeviceHandle, - path_monitor: &path_monitor::PathMonitorHandle, - monitored_paths: &Arc>>, - excluded_processes: &Arc>>, - previous_addresses: &mut InterfaceAddresses, - ) -> (bool, Result<(), Error>) { - let (should_stop, result) = match request { - RequestDetails::SetPaths(paths) => { - let mut monitored_paths_guard = monitored_paths.lock().unwrap(); - - let result = if !paths.is_empty() { - handle.set_config(&paths).map_err(Error::SetConfiguration) - } else { - handle.clear_config().map_err(Error::SetConfiguration) - }; - - if result.is_ok() { - if let Err(error) = path_monitor.set_paths(&paths) { - log::error!( - "{}", - error.display_chain_with_msg("Failed to update path monitor") - ); - } - *monitored_paths_guard = paths.to_vec(); - } - - (false, result) - } - RequestDetails::RegisterIps(mut ips) => { - if ips.internet_ipv4.is_none() && ips.internet_ipv6.is_none() { - ips.tunnel_ipv4 = None; - ips.tunnel_ipv6 = None; - } - if previous_addresses == &ips { - (false, Ok(())) - } else { - let result = handle - .register_ips( - ips.tunnel_ipv4, - ips.tunnel_ipv6, - ips.internet_ipv4, - ips.internet_ipv6, - ) - .map_err(Error::RegisterIps); - if result.is_ok() { - *previous_addresses = ips; - } - (false, result) - } - } - RequestDetails::Stop => { - if let Err(error) = handle.reset().map_err(Error::ResetError) { - // Shut down failed, so continue to live - return (false, Err(error)); - } - - // Clean up - monitored_paths.lock().unwrap().clear(); - excluded_processes.write().unwrap().clear(); - - // Stop listening to commands - (true, Ok(())) - } - }; - - (should_stop, result) - } - - /// Handle the result of a request - fn handle_request_result( - daemon_tx: &Weak>, - result: Result<(), Error>, - must_succeed: bool, - request_name: &str, - response_tx: Option>>, - ) { - let log_request_failure = |response: &Result<(), Error>| { - if let Err(error) = response { - log::error!( - "Request/ioctl failed: {}\n{}", - request_name, - error.display_chain() - ); - } - }; - - let request_failed = result.is_err(); - - if let Some(response_tx) = response_tx { - if let Err(error) = response_tx.send(result) { - log::error!( - "A response could not be sent for completed request/ioctl: {}", - request_name - ); - log_request_failure(&error.0); - } - } else { - log_request_failure(&result); - } - - // Move to error state if the request failed but must succeed - if request_failed && must_succeed { - if let Some(daemon_tx) = daemon_tx.upgrade() { - log::debug!( - "Entering error state due to failed request/ioctl: {}", - request_name - ); - let _ = daemon_tx - .unbounded_send(TunnelCommand::Block(ErrorStateCause::SplitTunnelError)); - } else { - log::error!("Cannot handle failed request since tunnel state machine is down"); - } - } - } - fn send_request(&self, request: RequestDetails) -> Result<(), Error> { Self::send_request_inner(&self.request_tx, request) } diff --git a/talpid-core/src/split_tunnel/windows/request.rs b/talpid-core/src/split_tunnel/windows/request.rs new file mode 100644 index 000000000000..4dd7c1feb7d2 --- /dev/null +++ b/talpid-core/src/split_tunnel/windows/request.rs @@ -0,0 +1,334 @@ +//! This module spawns a thread used to service request to the split tunnel device driver. +//! +//! We've chosen isolate all dealings with the device driver on a dedicated thread as we've +//! previously faced issues with other software fighting us over the global transaction lock in WFP +//! (Windows Filtering Platform). +//! +//! This design also makes the tunnel state machine relatively protected against driver failure. + +use crate::tunnel_state_machine::TunnelCommand; +use futures::channel::mpsc; +use std::{ + collections::HashMap, + ffi::OsString, + path::{Path, PathBuf}, + sync::{mpsc as sync_mpsc, Arc, Mutex, RwLock, Weak}, + time::Duration, +}; +use talpid_types::{split_tunnel::ExcludedProcess, tunnel::ErrorStateCause, ErrorExt}; + +use super::{ + driver::DeviceHandle, + path_monitor::{PathMonitor, PathMonitorHandle}, + service, + volume_monitor::VolumeMonitor, + Error, InterfaceAddresses, +}; + +const INIT_TIMEOUT: Duration = Duration::from_secs(5); + +/// A request to the split tunnel monitor +pub struct Request { + /// Request details + details: RequestDetails, + /// Whether to block if the request fails + must_succeed: bool, + /// Response channel + response_tx: Option>>, +} + +/// The particular request to send +pub enum RequestDetails { + /// Update paths to exclude + SetPaths(Vec), + /// Update default and VPN tunnel addresses + RegisterIps(InterfaceAddresses), + /// Stop the split tunnel monitor + Stop, +} + +impl Request { + pub fn new(details: RequestDetails) -> Self { + Request { + details, + must_succeed: false, + response_tx: None, + } + } + + pub fn response_tx(mut self, response_tx: sync_mpsc::Sender>) -> Self { + self.response_tx = Some(response_tx); + self + } + + pub fn must_succeed(mut self) -> Self { + self.must_succeed = true; + self + } + + pub fn request_name(&self) -> &'static str { + match self.details { + RequestDetails::SetPaths(_) => "SetPaths", + RequestDetails::RegisterIps(_) => "RegisterIps", + RequestDetails::Stop => "Stop", + } + } +} + +/// Begin servicing requests sent on the returned channel +pub fn spawn_request_thread( + resource_dir: PathBuf, + daemon_tx: Weak>, + volume_update_rx: mpsc::UnboundedReceiver<()>, + excluded_processes: Arc>>, +) -> Result<(sync_mpsc::Sender, Arc), Error> { + let (tx, rx): (sync_mpsc::Sender, _) = sync_mpsc::channel(); + let (init_tx, init_rx) = sync_mpsc::channel(); + + let monitored_paths = Arc::new(Mutex::new(vec![])); + let monitored_paths_copy = monitored_paths.clone(); + + let (monitor_tx, monitor_rx) = sync_mpsc::channel(); + + let path_monitor = PathMonitor::spawn(monitor_tx.clone()).map_err(Error::StartPathMonitor)?; + let volume_monitor = VolumeMonitor::spawn( + path_monitor.clone(), + monitor_tx, + monitored_paths.clone(), + volume_update_rx, + ); + + std::thread::spawn(move || { + // Ensure that the device driver service is running and that we have a handle to it + let handle = match setup_and_create_device(&resource_dir) { + Ok(handle) => { + let _ = init_tx.send(Ok(handle.clone())); + handle + } + Err(error) => { + let _ = init_tx.send(Err(error)); + return; + } + }; + + request_loop( + handle.clone(), + rx, + daemon_tx, + monitored_paths, + path_monitor.clone(), + excluded_processes, + ); + + // Shut components down in a sane order + drop(volume_monitor); + if let Err(error) = path_monitor.shutdown() { + log::error!( + "{}", + error.display_chain_with_msg("Failed to shut down path monitor") + ); + } + + // The device handle must be dropped before stopping the service + debug_assert_eq!(Arc::strong_count(&handle), 1); + drop(handle); + + log::debug!("Stopping ST service"); + if let Err(error) = service::stop_driver_service() { + log::error!( + "{}", + error.display_chain_with_msg("Failed to stop ST service") + ); + } + }); + + let handle = init_rx + .recv_timeout(INIT_TIMEOUT) + .map_err(|_| Error::RequestThreadStuck)??; + + let handle_copy = handle.clone(); + + std::thread::spawn(move || { + while let Ok(()) = monitor_rx.recv() { + let paths = monitored_paths_copy.lock().unwrap(); + let result = if paths.len() > 0 { + log::debug!("Re-resolving excluded paths"); + handle_copy.set_config(&paths) + } else { + continue; + }; + if let Err(error) = result { + log::error!( + "{}", + error.display_chain_with_msg("Failed to update excluded paths") + ); + } + } + }); + + Ok((tx, handle)) +} + +/// Install the driver and open a handle for it +fn setup_and_create_device(resource_dir: &Path) -> Result, Error> { + service::install_driver_if_required(resource_dir).map_err(Error::ServiceError)?; + DeviceHandle::new() + .map(Arc::new) + .map_err(Error::InitializationError) +} + +/// Service requests to the device driver +fn request_loop( + handle: Arc, + cmd_rx: sync_mpsc::Receiver, + daemon_tx: Weak>, + monitored_paths: Arc>>, + path_monitor: PathMonitorHandle, + excluded_processes: Arc>>, +) { + let mut previous_addresses = InterfaceAddresses::default(); + + while let Ok(request) = cmd_rx.recv() { + let request_name = request.request_name(); + + let (should_stop, response) = handle_request( + request.details, + &handle, + &path_monitor, + &monitored_paths, + &excluded_processes, + &mut previous_addresses, + ); + + handle_request_result( + &daemon_tx, + response, + request.must_succeed, + request_name, + request.response_tx, + ); + + // Stop request loop + if should_stop { + break; + } + } +} + +/// Handle a request to the split tunnel device +fn handle_request( + request: RequestDetails, + handle: &DeviceHandle, + path_monitor: &PathMonitorHandle, + monitored_paths: &Arc>>, + excluded_processes: &Arc>>, + previous_addresses: &mut InterfaceAddresses, +) -> (bool, Result<(), Error>) { + let (should_stop, result) = match request { + RequestDetails::SetPaths(paths) => { + let mut monitored_paths_guard = monitored_paths.lock().unwrap(); + + let result = if !paths.is_empty() { + handle.set_config(&paths).map_err(Error::SetConfiguration) + } else { + handle.clear_config().map_err(Error::SetConfiguration) + }; + + if result.is_ok() { + if let Err(error) = path_monitor.set_paths(&paths) { + log::error!( + "{}", + error.display_chain_with_msg("Failed to update path monitor") + ); + } + *monitored_paths_guard = paths.to_vec(); + } + + (false, result) + } + RequestDetails::RegisterIps(mut ips) => { + if ips.internet_ipv4.is_none() && ips.internet_ipv6.is_none() { + ips.tunnel_ipv4 = None; + ips.tunnel_ipv6 = None; + } + if previous_addresses == &ips { + (false, Ok(())) + } else { + let result = handle + .register_ips( + ips.tunnel_ipv4, + ips.tunnel_ipv6, + ips.internet_ipv4, + ips.internet_ipv6, + ) + .map_err(Error::RegisterIps); + if result.is_ok() { + *previous_addresses = ips; + } + (false, result) + } + } + RequestDetails::Stop => { + if let Err(error) = handle.reset().map_err(Error::ResetError) { + // Shut down failed, so continue to live + return (false, Err(error)); + } + + // Clean up + monitored_paths.lock().unwrap().clear(); + excluded_processes.write().unwrap().clear(); + + // Stop listening to commands + (true, Ok(())) + } + }; + + (should_stop, result) +} + +/// Handle the result of a request +fn handle_request_result( + daemon_tx: &Weak>, + result: Result<(), Error>, + must_succeed: bool, + request_name: &str, + response_tx: Option>>, +) { + let log_request_failure = |response: &Result<(), Error>| { + if let Err(error) = response { + log::error!( + "Request/ioctl failed: {}\n{}", + request_name, + error.display_chain() + ); + } + }; + + let request_failed = result.is_err(); + + if let Some(response_tx) = response_tx { + if let Err(error) = response_tx.send(result) { + log::error!( + "A response could not be sent for completed request/ioctl: {}", + request_name + ); + log_request_failure(&error.0); + } + } else { + log_request_failure(&result); + } + + // Move to error state if the request failed but must succeed + if request_failed && must_succeed { + if let Some(daemon_tx) = daemon_tx.upgrade() { + log::debug!( + "Entering error state due to failed request/ioctl: {}", + request_name + ); + let _ = + daemon_tx.unbounded_send(TunnelCommand::Block(ErrorStateCause::SplitTunnelError)); + } else { + log::error!("Cannot handle failed request since tunnel state machine is down"); + } + } +} From 6358c5f180476d2b6d798426077052f513972532 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20L=C3=B6nnhager?= Date: Mon, 2 Sep 2024 17:29:12 +0200 Subject: [PATCH 7/7] Move volume and path monitor out of request handling code --- talpid-core/src/split_tunnel/windows/mod.rs | 50 ++++++++++++++++++- .../src/split_tunnel/windows/request.rs | 44 ++-------------- 2 files changed, 53 insertions(+), 41 deletions(-) diff --git a/talpid-core/src/split_tunnel/windows/mod.rs b/talpid-core/src/split_tunnel/windows/mod.rs index 7a368ffb71e3..4194edb4b694 100644 --- a/talpid-core/src/split_tunnel/windows/mod.rs +++ b/talpid-core/src/split_tunnel/windows/mod.rs @@ -7,11 +7,13 @@ mod volume_monitor; mod windows; use crate::{tunnel::TunnelMetadata, tunnel_state_machine::TunnelCommand}; +use driver::DeviceHandle; use futures::channel::{mpsc, oneshot}; +use path_monitor::PathMonitor; use request::{Request, RequestDetails}; use std::{ collections::HashMap, - ffi::OsStr, + ffi::{OsStr, OsString}, io, net::{IpAddr, Ipv4Addr, Ipv6Addr}, path::PathBuf, @@ -27,6 +29,7 @@ use talpid_windows::{ net::{get_ip_address_for_interface, AddressFamily}, sync::Event, }; +use volume_monitor::VolumeMonitor; const RESERVED_IP_V4: Ipv4Addr = Ipv4Addr::new(192, 0, 2, 123); @@ -138,13 +141,38 @@ impl SplitTunnel { ) -> Result { let excluded_processes = Arc::new(RwLock::new(HashMap::new())); + let (refresh_paths_tx, refresh_paths_rx) = sync_mpsc::channel(); + + let path_monitor = + PathMonitor::spawn(refresh_paths_tx.clone()).map_err(Error::StartPathMonitor)?; + + let monitored_paths = Arc::new(Mutex::new(vec![])); + let volume_monitor = VolumeMonitor::spawn( + path_monitor.clone(), + refresh_paths_tx, + monitored_paths.clone(), + volume_update_rx, + ); + let (request_tx, handle) = request::spawn_request_thread( resource_dir, daemon_tx, - volume_update_rx, + path_monitor, + volume_monitor, + monitored_paths.clone(), excluded_processes.clone(), )?; + let handle_copy = Arc::downgrade(&handle); + std::thread::spawn(move || { + while let Ok(()) = refresh_paths_rx.recv() { + let Some(handle) = handle_copy.upgrade() else { + return; + }; + Self::handle_volume_monitor_update(&handle, &monitored_paths); + } + }); + let (event_thread, quit_event) = event::spawn_listener(handle, excluded_processes.clone()) .map_err(Error::EventThreadError)?; @@ -160,6 +188,24 @@ impl SplitTunnel { }) } + fn handle_volume_monitor_update( + handle: &DeviceHandle, + monitored_paths: &Arc>>, + ) { + let paths = monitored_paths.lock().unwrap(); + if paths.len() == 0 { + return; + } + + log::debug!("Re-resolving excluded paths"); + if let Err(error) = handle.set_config(&paths) { + log::error!( + "{}", + error.display_chain_with_msg("Failed to update excluded paths") + ); + } + } + fn send_request(&self, request: RequestDetails) -> Result<(), Error> { Self::send_request_inner(&self.request_tx, request) } diff --git a/talpid-core/src/split_tunnel/windows/request.rs b/talpid-core/src/split_tunnel/windows/request.rs index 4dd7c1feb7d2..bd80d0c64736 100644 --- a/talpid-core/src/split_tunnel/windows/request.rs +++ b/talpid-core/src/split_tunnel/windows/request.rs @@ -18,11 +18,8 @@ use std::{ use talpid_types::{split_tunnel::ExcludedProcess, tunnel::ErrorStateCause, ErrorExt}; use super::{ - driver::DeviceHandle, - path_monitor::{PathMonitor, PathMonitorHandle}, - service, - volume_monitor::VolumeMonitor, - Error, InterfaceAddresses, + driver::DeviceHandle, path_monitor::PathMonitorHandle, service, + volume_monitor::VolumeMonitorHandle, Error, InterfaceAddresses, }; const INIT_TIMEOUT: Duration = Duration::from_secs(5); @@ -79,25 +76,14 @@ impl Request { pub fn spawn_request_thread( resource_dir: PathBuf, daemon_tx: Weak>, - volume_update_rx: mpsc::UnboundedReceiver<()>, + path_monitor: PathMonitorHandle, + volume_monitor: VolumeMonitorHandle, + monitored_paths: Arc>>, excluded_processes: Arc>>, ) -> Result<(sync_mpsc::Sender, Arc), Error> { let (tx, rx): (sync_mpsc::Sender, _) = sync_mpsc::channel(); let (init_tx, init_rx) = sync_mpsc::channel(); - let monitored_paths = Arc::new(Mutex::new(vec![])); - let monitored_paths_copy = monitored_paths.clone(); - - let (monitor_tx, monitor_rx) = sync_mpsc::channel(); - - let path_monitor = PathMonitor::spawn(monitor_tx.clone()).map_err(Error::StartPathMonitor)?; - let volume_monitor = VolumeMonitor::spawn( - path_monitor.clone(), - monitor_tx, - monitored_paths.clone(), - volume_update_rx, - ); - std::thread::spawn(move || { // Ensure that the device driver service is running and that we have a handle to it let handle = match setup_and_create_device(&resource_dir) { @@ -146,26 +132,6 @@ pub fn spawn_request_thread( .recv_timeout(INIT_TIMEOUT) .map_err(|_| Error::RequestThreadStuck)??; - let handle_copy = handle.clone(); - - std::thread::spawn(move || { - while let Ok(()) = monitor_rx.recv() { - let paths = monitored_paths_copy.lock().unwrap(); - let result = if paths.len() > 0 { - log::debug!("Re-resolving excluded paths"); - handle_copy.set_config(&paths) - } else { - continue; - }; - if let Err(error) = result { - log::error!( - "{}", - error.display_chain_with_msg("Failed to update excluded paths") - ); - } - } - }); - Ok((tx, handle)) }