diff --git a/interceptor/CHANGELOG.md b/interceptor/CHANGELOG.md index a24e19d67..a17e6b688 100644 --- a/interceptor/CHANGELOG.md +++ b/interceptor/CHANGELOG.md @@ -2,6 +2,8 @@ ## Unreleased +* Add a `max_age` property to the NACK responder interceptor. When configured with `ResponderBuilder::with_max_packet_age` packets that were originally sent more than `max_packet_age` ago will not be resent, even when requested. [#313](https://github.com/webrtc-rs/webrtc/pull/313) by [@k0nserv](https://github.com/k0nserv). + ## v0.8.1 * Further extended stats interceptors to collect stats for `RemoteOutoundRTPStats` and improve `RemoteInboundRTPStats` collection. [#282](https://github.com/webrtc-rs/webrtc/pull/282) by [@k0nserv](https://github.com/k0nserv). @@ -9,7 +11,7 @@ * Don't generate empty TWCC packets that libWebRTC will ignore. [#324](https://github.com/webrtc-rs/webrtc/pull/324) by [@k0nserv](https://github.com/k0nserv). * Increased minimum support rust version to `1.60.0`. * Increased required `webrtc-util` version to `0.7.0`. - + ## v0.8.0 diff --git a/interceptor/src/mock/mock_stream.rs b/interceptor/src/mock/mock_stream.rs index bd1cec881..0ec2f4641 100644 --- a/interceptor/src/mock/mock_stream.rs +++ b/interceptor/src/mock/mock_stream.rs @@ -210,12 +210,21 @@ impl MockStream { last } - /// written_rtp returns a channel containing rtp packets written, modified by the interceptor + /// Wait for a written RTP packet to appear after traversing interceptor chains. pub async fn written_rtp(&self) -> Option { let mut rtp_out_modified_rx = self.rtp_out_modified_rx.lock().await; rtp_out_modified_rx.recv().await } + /// Assert that a RTP packet has traversed interceptor chains. + /// + /// Like [`writte_rtp`] but does not wait. + pub async fn written_rtp_expected(&self) -> Option { + let mut rtp_out_modified_rx = self.rtp_out_modified_rx.lock().await; + + rtp_out_modified_rx.try_recv().ok() + } + /// read_rtcp returns a channel containing the rtcp batched read, modified by the interceptor pub async fn read_rtcp( &self, diff --git a/interceptor/src/nack/responder/mod.rs b/interceptor/src/nack/responder/mod.rs index 48bbfbf85..73ef9670f 100644 --- a/interceptor/src/nack/responder/mod.rs +++ b/interceptor/src/nack/responder/mod.rs @@ -17,12 +17,14 @@ use std::collections::HashMap; use std::future::Future; use std::pin::Pin; use std::sync::Arc; +use std::time::Duration; use tokio::sync::Mutex; /// GeneratorBuilder can be used to configure Responder Interceptor #[derive(Default)] pub struct ResponderBuilder { log2_size: Option, + max_packet_age: Option, } impl ResponderBuilder { @@ -32,6 +34,15 @@ impl ResponderBuilder { self.log2_size = Some(log2_size); self } + + /// with_max_packet_age sets the max age of packets that will be resent. + /// + /// When a resend is requested, packets that were first sent more than `max_packet_age` ago + /// will not be resent. + pub fn with_max_packet_age(mut self, max_packet_age: Duration) -> ResponderBuilder { + self.max_packet_age = Some(max_packet_age); + self + } } impl InterceptorBuilder for ResponderBuilder { @@ -43,6 +54,7 @@ impl InterceptorBuilder for ResponderBuilder { } else { 13 // 8192 = 1 << 13 }, + max_packet_age: self.max_packet_age, streams: Arc::new(Mutex::new(HashMap::new())), }), })) @@ -51,6 +63,7 @@ impl InterceptorBuilder for ResponderBuilder { pub struct ResponderInternal { log2_size: u8, + max_packet_age: Option, streams: Arc>>>, } @@ -58,6 +71,7 @@ impl ResponderInternal { async fn resend_packets( streams: Arc>>>, nack: TransportLayerNack, + max_packet_age: Option, ) { let stream = { let m = streams.lock().await; @@ -73,14 +87,28 @@ impl ResponderInternal { n.range(Box::new( move |seq: u16| -> Pin + Send + 'static>> { let stream3 = Arc::clone(&stream2); + Box::pin(async move { - if let Some(p) = stream3.get(seq).await { - let a = Attributes::new(); - if let Err(err) = stream3.next_rtp_writer.write(&p, &a).await { - log::warn!("failed resending nacked packet: {}", err); + let p = match stream3.get(seq).await { + None => return true, + Some(p) => p, + }; + + if let Some(max_packet_age) = max_packet_age { + let packet_age = p.age(); + let should_send = packet_age < max_packet_age; + if !should_send { + log::debug!("Not resending packet {} as it's older than the configured max age {}s. Packet was initially sent {}s ago", p.packet.header.sequence_number, max_packet_age.as_secs_f64(), packet_age.as_secs_f64()); + return true; } } + + let a = Attributes::new(); + if let Err(err) = stream3.next_rtp_writer.write(&p.packet, &a).await { + log::warn!("failed resending nacked packet: {}", err); + } + true }) }, @@ -92,6 +120,7 @@ impl ResponderInternal { pub struct ResponderRtcpReader { parent_rtcp_reader: Arc, + max_packet_age: Option, internal: Arc, } @@ -106,8 +135,9 @@ impl RTCPReader for ResponderRtcpReader { if let Some(nack) = p.as_any().downcast_ref::() { let nack = nack.clone(); let streams = Arc::clone(&self.internal.streams); + let max_packet_age = self.max_packet_age; tokio::spawn(async move { - ResponderInternal::resend_packets(streams, nack).await; + ResponderInternal::resend_packets(streams, nack, max_packet_age).await; }); } } @@ -138,6 +168,7 @@ impl Interceptor for Responder { ) -> Arc { Arc::new(ResponderRtcpReader { internal: Arc::clone(&self.internal), + max_packet_age: self.internal.max_packet_age, parent_rtcp_reader: reader, }) as Arc } diff --git a/interceptor/src/nack/responder/responder_stream.rs b/interceptor/src/nack/responder/responder_stream.rs index 86e022162..6619e1ad6 100644 --- a/interceptor/src/nack/responder/responder_stream.rs +++ b/interceptor/src/nack/responder/responder_stream.rs @@ -2,12 +2,15 @@ use crate::error::Result; use crate::nack::UINT16SIZE_HALF; use crate::{Attributes, RTPWriter}; -use async_trait::async_trait; use std::sync::Arc; +use std::time::Duration; + +use async_trait::async_trait; use tokio::sync::Mutex; +use tokio::time::Instant; struct ResponderStreamInternal { - packets: Vec>, + packets: Vec>, size: u16, last_added: u16, started: bool, @@ -26,7 +29,7 @@ impl ResponderStreamInternal { fn add(&mut self, packet: &rtp::packet::Packet) { let seq = packet.header.sequence_number; if !self.started { - self.packets[(seq % self.size) as usize] = Some(packet.clone()); + self.packets[(seq % self.size) as usize] = Some(packet.clone().into()); self.last_added = seq; self.started = true; return; @@ -43,11 +46,11 @@ impl ResponderStreamInternal { } } - self.packets[(seq % self.size) as usize] = Some(packet.clone()); + self.packets[(seq % self.size) as usize] = Some(packet.clone().into()); self.last_added = seq; } - fn get(&self, seq: u16) -> Option<&rtp::packet::Packet> { + fn get(&self, seq: u16) -> Option<&SentPacket> { let diff = self.last_added.wrapping_sub(seq); if diff >= UINT16SIZE_HALF { return None; @@ -79,7 +82,7 @@ impl ResponderStream { internal.add(pkt); } - pub(super) async fn get(&self, seq: u16) -> Option { + pub(super) async fn get(&self, seq: u16) -> Option { let internal = self.internal.lock().await; internal.get(seq).cloned() } @@ -96,6 +99,29 @@ impl RTPWriter for ResponderStream { } } +#[derive(Clone)] +/// A packet that has been sent, or at least been queued to send. +pub struct SentPacket { + pub(super) packet: rtp::packet::Packet, + // We use tokio's instant because it's mockable. + sent_at: Instant, +} + +impl SentPacket { + pub(super) fn age(&self) -> Duration { + self.sent_at.elapsed() + } +} + +impl From for SentPacket { + fn from(packet: rtp::packet::Packet) -> Self { + Self { + packet, + sent_at: Instant::now(), + } + } +} + #[cfg(test)] mod test { use super::*; @@ -127,9 +153,9 @@ mod test { let seq = start.wrapping_add(*n); if let Some(packet) = sb.get(seq) { assert_eq!( - packet.header.sequence_number, seq, + packet.packet.header.sequence_number, seq, "packet for {} returned with incorrect SequenceNumber: {}", - seq, packet.header.sequence_number + seq, packet.packet.header.sequence_number ); } else { assert!(false, "packet not found: {}", seq); @@ -144,7 +170,7 @@ mod test { assert!( false, "packet found for {}: {}", - seq, packet.header.sequence_number + seq, packet.packet.header.sequence_number ); } } diff --git a/interceptor/src/nack/responder/responder_test.rs b/interceptor/src/nack/responder/responder_test.rs index b8f308cf7..e93d3ce73 100644 --- a/interceptor/src/nack/responder/responder_test.rs +++ b/interceptor/src/nack/responder/responder_test.rs @@ -1,12 +1,11 @@ use super::*; use crate::mock::mock_stream::MockStream; use crate::stream_info::RTCPFeedback; -use crate::test::timeout_or_fail; use tokio::time::Duration; use rtcp::transport_feedbacks::transport_layer_nack::{NackPair, TransportLayerNack}; -#[tokio::test] +#[tokio::test(start_paused = true)] async fn test_responder_interceptor() -> Result<()> { let icpr: Arc = Responder::builder().with_log2_size(3).build("")?; @@ -35,9 +34,13 @@ async fn test_responder_interceptor() -> Result<()> { }) .await?; - let p = timeout_or_fail(Duration::from_millis(10), stream.written_rtp()) + // Let the packet be pulled through interceptor chains + tokio::task::yield_now().await; + + let p = stream + .written_rtp_expected() .await - .expect("A packet"); + .expect("Packet should have been written"); assert_eq!(seq_num, p.header.sequence_number); } @@ -53,24 +56,97 @@ async fn test_responder_interceptor() -> Result<()> { ], })]) .await; + tokio::time::advance(Duration::from_millis(50)).await; + // Let the NACK task do its thing + tokio::task::yield_now().await; // seq number 13 was never sent, so it can't be resent for seq_num in [11, 12, 15] { - if let Ok(r) = tokio::time::timeout(Duration::from_millis(50), stream.written_rtp()).await { - if let Some(p) = r { - assert_eq!(seq_num, p.header.sequence_number); - } else { - assert!( - false, - "seq_num {} is not sent due to channel closed", - seq_num - ); - } + let p = stream + .written_rtp_expected() + .await + .expect("Packet should have been written"); + assert_eq!(seq_num, p.header.sequence_number); + } + + let result = stream.written_rtp_expected().await; + assert!(result.is_none(), "no more rtp packets expected"); + + stream.close().await?; + + Ok(()) +} + +#[tokio::test(start_paused = true)] +async fn test_responder_interceptor_with_max_age() -> Result<()> { + let icpr: Arc = Responder::builder() + .with_log2_size(3) + .with_max_packet_age(Duration::from_millis(400)) + .build("")?; + + let stream = MockStream::new( + &StreamInfo { + ssrc: 1, + rtcp_feedback: vec![RTCPFeedback { + typ: "nack".to_owned(), + ..Default::default() + }], + ..Default::default() + }, + icpr, + ) + .await; + + for seq_num in [10, 11, 12, 14, 15] { + stream + .write_rtp(&rtp::packet::Packet { + header: rtp::header::Header { + sequence_number: seq_num, + ..Default::default() + }, + ..Default::default() + }) + .await?; + tokio::time::advance(Duration::from_millis(30)).await; + tokio::task::yield_now().await; + + let p = stream.written_rtp().await.expect("A packet"); + assert_eq!(seq_num, p.header.sequence_number); + } + + // Advance time 300ms. Packets 10 and 11 will now have been sent 450ms and 420ms ago + // respectively. + tokio::time::advance(Duration::from_millis(300)).await; + + stream + .receive_rtcp(vec![Box::new(TransportLayerNack { + media_ssrc: 1, + sender_ssrc: 2, + nacks: vec![ + NackPair { + packet_id: 10, + lost_packets: 0b10111, + }, // sequence numbers: 11, 12, 13, 15 + ], + })]) + .await; + tokio::task::yield_now().await; + + // seq number 13 was never sent and seq number 10 and 11 is too late to resend now. + for seq_num in [12, 15] { + if let Some(p) = stream.written_rtp().await { + assert_eq!(seq_num, p.header.sequence_number); } else { - assert!(false, "seq_num {} is not sent yet", seq_num); + assert!( + false, + "seq_num {} is not sent due to channel closed", + seq_num + ); } } + // Resume time + tokio::time::resume(); let result = tokio::time::timeout(Duration::from_millis(10), stream.written_rtp()).await; assert!(result.is_err(), "no more rtp packets expected");