diff --git a/rtc-ice/src/agent/agent_selector.rs b/rtc-ice/src/agent/agent_selector.rs index fa659e5..966d339 100644 --- a/rtc-ice/src/agent/agent_selector.rs +++ b/rtc-ice/src/agent/agent_selector.rs @@ -115,6 +115,52 @@ impl Agent { } } + pub(crate) fn get_selected_pair(&self) -> Option { + self.selected_pair + } + + pub(crate) fn get_best_available_candidate_pair(&self) -> Option { + let mut best: Option = None; + + for (index, p) in self.checklist.iter().enumerate() { + if p.state == CandidatePairState::Failed { + continue; + } + + if let Some(best_index) = &mut best { + let b = &self.checklist[*best_index]; + if b.priority() < p.priority() { + *best_index = index; + } + } else { + best = Some(index); + } + } + + best + } + + pub(crate) fn get_best_valid_candidate_pair(&self) -> Option { + let mut best: Option = None; + + for (index, p) in self.checklist.iter().enumerate() { + if p.state != CandidatePairState::Succeeded { + continue; + } + + if let Some(best_index) = &mut best { + let b = &self.checklist[*best_index]; + if b.priority() < p.priority() { + *best_index = index; + } + } else { + best = Some(index); + } + } + + best + } + pub(crate) fn start(&mut self) { if self.is_controlling { ControllingSelector::start(self); @@ -177,7 +223,7 @@ impl ControllingSelector for Agent { let nominated_pair_is_some = self.nominated_pair.is_some(); - if self.agent_conn.get_selected_pair().is_some() { + if self.get_selected_pair().is_some() { if self.validate_selected_pair() { log::trace!("[{}]: checking keepalive", self.get_name()); self.check_keepalive(); @@ -185,17 +231,16 @@ impl ControllingSelector for Agent { } else if nominated_pair_is_some { self.nominate_pair(); } else { - let has_nominated_pair = - if let Some(index) = self.agent_conn.get_best_valid_candidate_pair() { - let p = self.agent_conn.checklist[index]; - self.is_nominatable(p.local, true) && self.is_nominatable(p.remote, false) - } else { - false - }; + let has_nominated_pair = if let Some(index) = self.get_best_valid_candidate_pair() { + let p = self.checklist[index]; + self.is_nominatable(p.local, true) && self.is_nominatable(p.remote, false) + } else { + false + }; if has_nominated_pair { - if let Some(index) = self.agent_conn.get_best_valid_candidate_pair() { - let p = &mut self.agent_conn.checklist[index]; + if let Some(index) = self.get_best_valid_candidate_pair() { + let p = &mut self.checklist[index]; log::trace!( "Nominatable pair found, nominating ({}, {})", self.local_candidates[p.local], @@ -260,10 +305,10 @@ impl ControllingSelector for Agent { remote, local ); - let selected_pair_is_none = self.agent_conn.get_selected_pair().is_none(); + let selected_pair_is_none = self.get_selected_pair().is_none(); if let Some(index) = self.find_pair(local, remote) { - let p = &mut self.agent_conn.checklist[index]; + let p = &mut self.checklist[index]; p.state = CandidatePairState::Succeeded; log::trace!( "Found valid candidate pair: {}, p.state: {}, isUseCandidate: {}, {}", @@ -293,7 +338,7 @@ impl ControllingSelector for Agent { log::trace!("controllingSelector: sendBindingSuccess"); if let Some(index) = self.find_pair(local, remote) { - let p = &self.agent_conn.checklist[index]; + let p = &self.checklist[index]; let nominated_pair_is_none = self.nominated_pair.is_none(); log::trace!( @@ -301,13 +346,13 @@ impl ControllingSelector for Agent { p, p.state, nominated_pair_is_none, - //self.agent_conn.get_selected_pair().await.is_none() //, {} + //self.get_selected_pair().await.is_none() //, {} ); if p.state == CandidatePairState::Succeeded && nominated_pair_is_none - && self.agent_conn.get_selected_pair().is_none() + && self.get_selected_pair().is_none() { - if let Some(best_pair) = self.agent_conn.get_best_available_candidate_pair() { + if let Some(best_pair) = self.get_best_available_candidate_pair() { log::trace!( "controllingSelector: getBestAvailableCandidatePair {}", best_pair @@ -339,7 +384,7 @@ impl ControlledSelector for Agent { // A lite selector should not contact candidates if self.lite { self.validate_selected_pair(); - } else if self.agent_conn.get_selected_pair().is_some() { + } else if self.get_selected_pair().is_some() { if self.validate_selected_pair() { log::trace!("[{}]: checking keepalive", self.get_name()); self.check_keepalive(); @@ -405,7 +450,7 @@ impl ControlledSelector for Agent { ); if let Some(index) = self.find_pair(local, remote) { - let p = &mut self.agent_conn.checklist[index]; + let p = &mut self.checklist[index]; p.state = CandidatePairState::Succeeded; log::trace!("Found valid candidate pair: {}", *p); } else { @@ -427,7 +472,7 @@ impl ControlledSelector for Agent { } if let Some(index) = self.find_pair(local, remote) { - let p = &self.agent_conn.checklist[index]; + let p = &self.checklist[index]; let use_candidate = m.contains(ATTR_USE_CANDIDATE); if use_candidate { // https://tools.ietf.org/html/rfc8445#section-7.3.1.5 @@ -437,7 +482,7 @@ impl ControlledSelector for Agent { // previously sent by this pair produced a successful response and // generated a valid pair (Section 7.2.5.3.2). The agent sets the // nominated flag value of the valid pair to true. - if self.agent_conn.get_selected_pair().is_none() { + if self.get_selected_pair().is_none() { self.set_selected_pair(Some(index)); } self.send_binding_success(m, local, remote); diff --git a/rtc-ice/src/agent/agent_stats.rs b/rtc-ice/src/agent/agent_stats.rs index df1066d..3d2a979 100644 --- a/rtc-ice/src/agent/agent_stats.rs +++ b/rtc-ice/src/agent/agent_stats.rs @@ -201,9 +201,8 @@ impl Default for CandidateStats { impl Agent { /// Returns a list of candidate pair stats. pub fn get_candidate_pairs_stats(&self) -> Vec { - let checklist = &self.agent_conn.checklist; - let mut res = Vec::with_capacity(checklist.len()); - for cp in checklist { + let mut res = Vec::with_capacity(self.checklist.len()); + for cp in &self.checklist { let stat = CandidatePairStats { timestamp: Instant::now(), local_candidate_id: self.local_candidates[cp.local].id(), diff --git a/rtc-ice/src/agent/agent_transport.rs b/rtc-ice/src/agent/agent_transport.rs deleted file mode 100644 index cf5064f..0000000 --- a/rtc-ice/src/agent/agent_transport.rs +++ /dev/null @@ -1,211 +0,0 @@ -use super::*; -use crate::candidate::candidate_pair::{CandidatePair, CandidatePairState}; - -impl Agent { - /// Connects to the remote agent, acting as the controlling ice agent. - pub fn connect( - &mut self, - //mut cancel_rx: mpsc::Receiver<()>, - remote_ufrag: String, - remote_pwd: String, - ) -> Result<() /*Arc*/> { - //let (on_connected_rx, agent_conn) = { - self.start_connectivity_checks(true, remote_ufrag, remote_pwd)?; - - /*let mut on_connected_rx = self.internal.on_connected_rx.lock().await; - ( - on_connected_rx.take(), - Arc::clone(&self.internal.agent_conn), - )*/ - //}; - /* - if let Some(mut on_connected_rx) = on_connected_rx { - // block until pair selected - tokio::select! { - _ = on_connected_rx.recv() => {}, - _ = cancel_rx.recv() => { - return Err(Error::ErrCanceledByCaller); - } - } - } - Ok(agent_conn)*/ - Ok(()) - } - - /// Connects to the remote agent, acting as the controlled ice agent. - pub fn accept( - &mut self, - //mut cancel_rx: mpsc::Receiver<()>, - remote_ufrag: String, - remote_pwd: String, - ) -> Result<() /*Arc*/> { - //let (on_connected_rx, agent_conn) = { - self.start_connectivity_checks(false, remote_ufrag, remote_pwd)?; - - /* let mut on_connected_rx = self.internal.on_connected_rx.lock().await; - ( - on_connected_rx.take(), - Arc::clone(&self.internal.agent_conn), - ) - }; - - if let Some(mut on_connected_rx) = on_connected_rx { - // block until pair selected - tokio::select! { - _ = on_connected_rx.recv() => {}, - _ = cancel_rx.recv() => { - return Err(Error::ErrCanceledByCaller); - } - } - } - - Ok(agent_conn)*/ - Ok(()) - } -} - -pub(crate) struct AgentConn { - pub(crate) selected_pair: Option, - pub(crate) checklist: Vec, - pub(crate) done: bool, -} - -impl AgentConn { - pub(crate) fn new() -> Self { - Self { - selected_pair: None, - checklist: vec![], - done: false, - } - } - pub(crate) fn get_selected_pair(&self) -> Option { - self.selected_pair - } - - pub(crate) fn get_best_available_candidate_pair(&self) -> Option { - let mut best: Option = None; - - for (index, p) in self.checklist.iter().enumerate() { - if p.state == CandidatePairState::Failed { - continue; - } - - if let Some(best_index) = &mut best { - let b = &self.checklist[*best_index]; - if b.priority() < p.priority() { - *best_index = index; - } - } else { - best = Some(index); - } - } - - best - } - - pub(crate) fn get_best_valid_candidate_pair(&self) -> Option { - let mut best: Option = None; - - for (index, p) in self.checklist.iter().enumerate() { - if p.state != CandidatePairState::Succeeded { - continue; - } - - if let Some(best_index) = &mut best { - let b = &self.checklist[*best_index]; - if b.priority() < p.priority() { - *best_index = index; - } - } else { - best = Some(index); - } - } - - best - } -} - -/* -#[async_trait] -impl Conn for AgentConn { - async fn connect(&self, _addr: SocketAddr) -> std::result::Result<(), util::Error> { - Err(io::Error::new(io::ErrorKind::Other, "Not applicable").into()) - } - - async fn recv(&self, buf: &mut [u8]) -> std::result::Result { - if self.done.load(Ordering::SeqCst) { - return Err(io::Error::new(io::ErrorKind::Other, "Conn is closed").into()); - } - - let n = match self.buffer.read(buf, None).await { - Ok(n) => n, - Err(err) => return Err(io::Error::new(io::ErrorKind::Other, err.to_string()).into()), - }; - self.bytes_received.fetch_add(n, Ordering::SeqCst); - - Ok(n) - } - - async fn recv_from( - &self, - buf: &mut [u8], - ) -> std::result::Result<(usize, SocketAddr), util::Error> { - if let Some(raddr) = self.remote_addr() { - let n = self.recv(buf).await?; - Ok((n, raddr)) - } else { - Err(io::Error::new(io::ErrorKind::Other, "Not applicable").into()) - } - } - - async fn send(&self, buf: &[u8]) -> std::result::Result { - if self.done.load(Ordering::SeqCst) { - return Err(io::Error::new(io::ErrorKind::Other, "Conn is closed").into()); - } - - if is_message(buf) { - return Err(util::Error::Other("ErrIceWriteStunMessage".into())); - } - - let result = if let Some(pair) = self.get_selected_pair() { - pair.write(buf).await - } else if let Some(pair) = self.get_best_available_candidate_pair().await { - pair.write(buf).await - } else { - Ok(0) - }; - - match result { - Ok(n) => { - self.bytes_sent.fetch_add(buf.len(), Ordering::SeqCst); - Ok(n) - } - Err(err) => Err(io::Error::new(io::ErrorKind::Other, err.to_string()).into()), - } - } - - async fn send_to( - &self, - _buf: &[u8], - _target: SocketAddr, - ) -> std::result::Result { - Err(io::Error::new(io::ErrorKind::Other, "Not applicable").into()) - } - - fn local_addr(&self) -> std::result::Result { - if let Some(pair) = self.get_selected_pair() { - Ok(pair.local.addr()) - } else { - Err(io::Error::new(io::ErrorKind::AddrNotAvailable, "Addr Not Available").into()) - } - } - - fn remote_addr(&self) -> Option { - self.get_selected_pair().map(|pair| pair.remote.addr()) - } - - async fn close(&self) -> std::result::Result<(), util::Error> { - Ok(()) - } -} -*/ diff --git a/rtc-ice/src/agent/agent_transport_test.rs b/rtc-ice/src/agent/agent_transport_test.rs deleted file mode 100644 index 8d4a801..0000000 --- a/rtc-ice/src/agent/agent_transport_test.rs +++ /dev/null @@ -1,133 +0,0 @@ -use util::vnet::*; -use util::Conn; -use waitgroup::WaitGroup; - -use super::agent_vnet_test::*; -use super::*; -use crate::agent::agent_transport::AgentConn; - -pub(crate) async fn pipe( - default_config0: Option, - default_config1: Option, -) -> Result<(Arc, Arc, Arc, Arc)> { - let (a_notifier, mut a_connected) = on_connected(); - let (b_notifier, mut b_connected) = on_connected(); - - let mut cfg0 = if let Some(cfg) = default_config0 { - cfg - } else { - AgentConfig::default() - }; - cfg0.urls = vec![]; - cfg0.network_types = supported_network_types(); - - let a_agent = Arc::new(Agent::new(cfg0).await?); - a_agent.on_connection_state_change(a_notifier); - - let mut cfg1 = if let Some(cfg) = default_config1 { - cfg - } else { - AgentConfig::default() - }; - cfg1.urls = vec![]; - cfg1.network_types = supported_network_types(); - - let b_agent = Arc::new(Agent::new(cfg1).await?); - b_agent.on_connection_state_change(b_notifier); - - let (a_conn, b_conn) = connect_with_vnet(&a_agent, &b_agent).await?; - - // Ensure pair selected - // Note: this assumes ConnectionStateConnected is thrown after selecting the final pair - let _ = a_connected.recv().await; - let _ = b_connected.recv().await; - - Ok((a_conn, b_conn, a_agent, b_agent)) -} - -#[tokio::test] -async fn test_remote_local_addr() -> Result<()> { - // Agent0 is behind 1:1 NAT - let nat_type0 = nat::NatType { - mode: nat::NatMode::Nat1To1, - ..Default::default() - }; - // Agent1 is behind 1:1 NAT - let nat_type1 = nat::NatType { - mode: nat::NatMode::Nat1To1, - ..Default::default() - }; - - let v = build_vnet(nat_type0, nat_type1).await?; - - let stun_server_url = Url { - scheme: SchemeType::Stun, - host: VNET_STUN_SERVER_IP.to_owned(), - port: VNET_STUN_SERVER_PORT, - proto: ProtoType::Udp, - ..Default::default() - }; - - //"Disconnected Returns nil" - { - let disconnected_conn = AgentConn::new(); - let result = disconnected_conn.local_addr(); - assert!(result.is_err(), "Disconnected Returns nil"); - } - - //"Remote/Local Pair Match between Agents" - { - let (ca, cb) = pipe_with_vnet( - &v, - AgentTestConfig { - urls: vec![stun_server_url.clone()], - ..Default::default() - }, - AgentTestConfig { - urls: vec![stun_server_url], - ..Default::default() - }, - ) - .await?; - - let a_laddr = ca.local_addr()?; - let b_laddr = cb.local_addr()?; - - // Assert addresses - assert_eq!(a_laddr.ip().to_string(), VNET_LOCAL_IPA.to_string()); - assert_eq!(b_laddr.ip().to_string(), VNET_LOCAL_IPB.to_string()); - - // Close - //ca.close().await?; - //cb.close().await?; - } - - v.close().await?; - - Ok(()) -} - -#[tokio::test] -async fn test_conn_stats() -> Result<()> { - let (ca, cb, _, _) = pipe(None, None).await?; - let na = ca.send(&[0u8; 10]).await?; - - let wg = WaitGroup::new(); - - let w = wg.worker(); - tokio::spawn(async move { - let _d = w; - - let mut buf = vec![0u8; 10]; - let nb = cb.recv(&mut buf).await?; - assert_eq!(nb, 10, "bytes received don't match"); - - Result::<()>::Ok(()) - }); - - wg.wait().await; - - assert_eq!(na, 10, "bytes sent don't match"); - - Ok(()) -} diff --git a/rtc-ice/src/agent/mod.rs b/rtc-ice/src/agent/mod.rs index 54e9a4d..47944cd 100644 --- a/rtc-ice/src/agent/mod.rs +++ b/rtc-ice/src/agent/mod.rs @@ -6,7 +6,6 @@ pub mod agent_config; pub mod agent_selector; pub mod agent_stats; -pub mod agent_transport; use agent_config::*; use std::net::{Ipv4Addr, SocketAddr}; @@ -18,7 +17,6 @@ use stun::message::*; use stun::textattrs::Username; use stun::xoraddr::*; -use crate::agent::agent_transport::*; use crate::candidate::candidate_pair::{CandidatePair, CandidatePairState}; use crate::candidate::*; use crate::rand::*; @@ -81,7 +79,10 @@ pub struct Agent { pub(crate) lite: bool, pub(crate) start_time: Instant, + pub(crate) nominated_pair: Option, + pub(crate) selected_pair: Option, + pub(crate) checklist: Vec, pub(crate) connection_state: ConnectionState, @@ -94,8 +95,6 @@ pub struct Agent { // LRU of outbound Binding request Transaction IDs pub(crate) pending_binding_requests: Vec, - pub(crate) agent_conn: AgentConn, - // the following variables won't be changed after init_with_defaults() pub(crate) insecure_skip_verify: bool, pub(crate) max_binding_requests: u16, @@ -146,7 +145,10 @@ impl Agent { lite: config.lite, start_time: Instant::now(), + nominated_pair: None, + selected_pair: None, + checklist: vec![], connection_state: ConnectionState::New, @@ -207,9 +209,6 @@ impl Agent { // LRU of outbound Binding request Transaction IDs pending_binding_requests: vec![], - // AgentConn - agent_conn: AgentConn::new(), - candidate_types, urls: config.urls.clone(), }; @@ -297,7 +296,7 @@ impl Agent { /// Returns the selected pair or none if there is none pub fn get_selected_candidate_pair(&self) -> Option { //TODO: - self.agent_conn.get_selected_pair() + self.get_selected_pair() } /// Sets the credentials of the remote agent. @@ -342,7 +341,7 @@ impl Agent { self.pending_binding_requests = vec![]; - self.agent_conn.checklist = vec![]; + self.checklist = vec![]; self.set_selected_pair(None); self.delete_all_candidates(); @@ -516,12 +515,12 @@ impl Agent { log::trace!( "[{}]: Set selected candidate pair: {:?}", self.get_name(), - self.agent_conn.checklist[index] + self.checklist[index] ); - let p = &mut self.agent_conn.checklist[index]; + let p = &mut self.checklist[index]; p.nominated = true; - self.agent_conn.selected_pair = Some(index); + self.selected_pair = Some(index); self.update_connection_state(ConnectionState::Connected); @@ -539,7 +538,7 @@ impl Agent { on_connected_tx.take(); }*/ } else { - self.agent_conn.selected_pair = None; + self.selected_pair = None; } } @@ -550,7 +549,7 @@ impl Agent { { let name = self.get_name().to_string(); - let checklist = &mut self.agent_conn.checklist; + let checklist = &mut self.checklist; if checklist.is_empty() { log::warn!( "[{}]: pingAllCandidates called with no candidate pairs. Connection is not possible yet.", @@ -593,11 +592,11 @@ impl Agent { self.remote_candidates[remote].priority(), self.is_controlling, ); - self.agent_conn.checklist.push(p); + self.checklist.push(p); } pub(crate) fn find_pair(&self, local: usize, remote: usize) -> Option { - let checklist = &self.agent_conn.checklist; + let checklist = &self.checklist; for (index, p) in checklist.iter().enumerate() { if p.local == local && p.remote == remote { return Some(index); @@ -610,10 +609,10 @@ impl Agent { /// Note: the caller should hold the agent lock. pub(crate) fn validate_selected_pair(&mut self) -> bool { let (valid, disconnected_time) = { - self.agent_conn.selected_pair.as_ref().map_or_else( + self.selected_pair.as_ref().map_or_else( || (false, Duration::from_secs(0)), |selected_pair| { - let remote = self.agent_conn.checklist[*selected_pair].remote; + let remote = self.checklist[*selected_pair].remote; let disconnected_time = Instant::now() .duration_since(self.remote_candidates[remote].last_received()); @@ -649,11 +648,10 @@ impl Agent { /// Note: the caller should hold the agent lock. pub(crate) fn check_keepalive(&mut self) { let (local, remote) = { - self.agent_conn - .selected_pair + self.selected_pair .as_ref() .map_or((None, None), |selected_pair| { - let p = &self.agent_conn.checklist[*selected_pair]; + let p = &self.checklist[*selected_pair]; (Some(p.local), Some(p.remote)) }) }; @@ -1148,7 +1146,7 @@ impl Agent { self.get_name(), //c.addr().await //from {} ); - } /*TODO: else if let Err(err) = self.agent_conn.buffer.write(buf).await { + } /*TODO: else if let Err(err) = self.buffer.write(buf).await { // NOTE This will return packetio.ErrFull if the buffer ever manages to fill up. log::warn!("[{}]: failed to write packet: {}", self.get_name(), err); }*/